atomr_cluster/
membership.rs1use std::collections::BTreeSet;
4
5use atomr_core::actor::Address;
6use serde::{Deserialize, Serialize};
7
8use crate::events::ClusterEvent;
9use crate::leader::{is_converged, next_status};
10use crate::member::{Member, MemberStatus};
11use crate::reachability::Reachability;
12
13#[derive(Debug, Clone, Default, Serialize, Deserialize)]
14pub struct MembershipState {
15 pub members: Vec<Member>,
16 pub reachability: Reachability,
17}
18
19impl MembershipState {
20 pub fn new() -> Self {
21 Self::default()
22 }
23
24 pub fn add_or_update(&mut self, m: Member) {
25 if let Some(existing) = self.members.iter_mut().find(|x| x.address == m.address) {
26 *existing = m;
27 } else {
28 self.members.push(m);
29 }
30 }
31
32 pub fn remove(&mut self, addr: &Address) {
33 self.members.retain(|m| &m.address != addr);
34 }
35
36 pub fn up_members(&self) -> Vec<&Member> {
37 self.members.iter().filter(|m| matches!(m.status, MemberStatus::Up)).collect()
38 }
39
40 pub fn unreachable_addresses(&self) -> BTreeSet<String> {
41 self.members
42 .iter()
43 .filter(|m| !self.reachability.is_reachable(&m.address))
44 .map(|m| m.address.to_string())
45 .collect()
46 }
47
48 pub fn member_count(&self) -> usize {
49 self.members.len()
50 }
51
52 pub fn apply_leader_actions(&mut self) -> Vec<ClusterEvent> {
60 let converged = is_converged(self);
61 let mut events = Vec::new();
62 let mut transitions: Vec<(Address, MemberStatus)> = Vec::new();
64 for m in &self.members {
65 if let Some(next) = next_status(m.status, converged) {
66 transitions.push((m.address.clone(), next));
67 }
68 }
69 for (addr, next) in transitions {
71 if let Some(m) = self.members.iter_mut().find(|x| x.address == addr) {
72 let prev = m.status;
73 m.status = next;
74 let updated = m.clone();
75 let evt = match next {
76 MemberStatus::Up => ClusterEvent::MemberUp(updated.clone()),
77 MemberStatus::Exiting => ClusterEvent::MemberExited(updated.clone()),
78 MemberStatus::Removed => ClusterEvent::MemberRemoved(updated.clone(), prev),
79 _ => continue,
80 };
81 events.push(evt);
82 }
83 }
84 self.members.retain(|m| m.status != MemberStatus::Removed);
86 if converged {
87 events.push(ClusterEvent::Convergence(true));
88 }
89 events
90 }
91
92 pub fn join(&mut self, m: Member) -> ClusterEvent {
95 self.add_or_update(m.clone());
96 ClusterEvent::MemberJoined(m)
97 }
98
99 pub fn leave(&mut self, addr: &Address) -> Option<ClusterEvent> {
102 let m = self.members.iter_mut().find(|x| &x.address == addr)?;
103 if matches!(m.status, MemberStatus::Up | MemberStatus::WeaklyUp) {
104 m.status = MemberStatus::Leaving;
105 return Some(ClusterEvent::MemberLeft(m.clone()));
106 }
107 None
108 }
109}
110
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a member at the local address `name`, passing an empty vector
    /// as the second `Member::new` argument.
    fn local_member(name: &'static str) -> Member {
        Member::new(Address::local(name), vec![])
    }

    /// Builds a state holding a single member at local address "a" whose
    /// status has been forced to `status`.
    fn state_with_status(status: MemberStatus) -> MembershipState {
        let mut member = local_member("a");
        member.status = status;
        let mut state = MembershipState::new();
        state.add_or_update(member);
        state
    }

    #[test]
    fn add_and_remove() {
        let member = local_member("a");
        let mut state = MembershipState::new();

        state.add_or_update(member.clone());
        assert_eq!(state.member_count(), 1);

        state.remove(&member.address);
        assert_eq!(state.member_count(), 0);
    }

    #[test]
    fn join_emits_member_joined() {
        let mut state = MembershipState::new();
        let event = state.join(local_member("a"));

        assert!(matches!(event, ClusterEvent::MemberJoined(_)));
        assert_eq!(state.member_count(), 1);
    }

    #[test]
    fn leader_actions_promote_joining_to_up_when_converged() {
        let mut state = MembershipState::new();
        let _ = state.join(local_member("a"));

        // Keep only the addresses carried by `MemberUp` events.
        let mut promoted = Vec::new();
        for event in state.apply_leader_actions() {
            if let ClusterEvent::MemberUp(member) = event {
                promoted.push(member.address.to_string());
            }
        }

        assert_eq!(promoted, vec!["akka://a".to_string()]);
    }

    #[test]
    fn leader_actions_remove_down_members() {
        let mut state = state_with_status(MemberStatus::Down);
        let _ = state.apply_leader_actions();

        assert_eq!(state.member_count(), 0);
    }

    #[test]
    fn leave_marks_up_member_as_leaving() {
        let mut state = state_with_status(MemberStatus::Up);
        let event = state.leave(&Address::local("a"));

        assert!(matches!(event, Some(ClusterEvent::MemberLeft(_))));
    }

    #[test]
    fn leave_is_noop_for_unknown_member() {
        let mut state = MembershipState::new();

        assert!(state.leave(&Address::local("nope")).is_none());
    }
}