Skip to main content

atomr_cluster/
membership.rs

1//! Membership state.
2
3use std::collections::BTreeSet;
4
5use atomr_core::actor::Address;
6use serde::{Deserialize, Serialize};
7
8use crate::events::ClusterEvent;
9use crate::leader::{is_converged, next_status};
10use crate::member::{Member, MemberStatus};
11use crate::reachability::Reachability;
12
/// Snapshot of cluster membership: the member list plus the
/// reachability information used to decide which members are
/// currently unreachable.
///
/// Both fields are public and the type is (de)serializable, so the
/// "one entry per address" property is maintained only by going through
/// [`MembershipState::add_or_update`] / [`MembershipState::join`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MembershipState {
    /// Members in insertion order; new addresses are appended by
    /// [`MembershipState::add_or_update`].
    pub members: Vec<Member>,
    /// Reachability data consulted by
    /// [`MembershipState::unreachable_addresses`].
    pub reachability: Reachability,
}
18
19impl MembershipState {
20    pub fn new() -> Self {
21        Self::default()
22    }
23
24    pub fn add_or_update(&mut self, m: Member) {
25        if let Some(existing) = self.members.iter_mut().find(|x| x.address == m.address) {
26            *existing = m;
27        } else {
28            self.members.push(m);
29        }
30    }
31
32    pub fn remove(&mut self, addr: &Address) {
33        self.members.retain(|m| &m.address != addr);
34    }
35
36    pub fn up_members(&self) -> Vec<&Member> {
37        self.members.iter().filter(|m| matches!(m.status, MemberStatus::Up)).collect()
38    }
39
40    pub fn unreachable_addresses(&self) -> BTreeSet<String> {
41        self.members
42            .iter()
43            .filter(|m| !self.reachability.is_reachable(&m.address))
44            .map(|m| m.address.to_string())
45            .collect()
46    }
47
48    pub fn member_count(&self) -> usize {
49        self.members.len()
50    }
51
52    /// Run the leader's per-tick transition logic against the current
53    /// state. Returns the [`ClusterEvent`]s the daemon should publish
54    /// (membership status flips, removals).
55    ///
56    /// Phase 6.C of `docs/full-port-plan.md`. Pure function — keeps
57    /// the daemon actor itself trivial: collect events, then publish
58    /// onto [`crate::events::ClusterEventBus`].
59    pub fn apply_leader_actions(&mut self) -> Vec<ClusterEvent> {
60        let converged = is_converged(self);
61        let mut events = Vec::new();
62        // First pass: compute transitions.
63        let mut transitions: Vec<(Address, MemberStatus)> = Vec::new();
64        for m in &self.members {
65            if let Some(next) = next_status(m.status, converged) {
66                transitions.push((m.address.clone(), next));
67            }
68        }
69        // Second pass: apply + emit.
70        for (addr, next) in transitions {
71            if let Some(m) = self.members.iter_mut().find(|x| x.address == addr) {
72                let prev = m.status;
73                m.status = next;
74                let updated = m.clone();
75                let evt = match next {
76                    MemberStatus::Up => ClusterEvent::MemberUp(updated.clone()),
77                    MemberStatus::Exiting => ClusterEvent::MemberExited(updated.clone()),
78                    MemberStatus::Removed => ClusterEvent::MemberRemoved(updated.clone(), prev),
79                    _ => continue,
80                };
81                events.push(evt);
82            }
83        }
84        // Drop members in `Removed` status (clean-up).
85        self.members.retain(|m| m.status != MemberStatus::Removed);
86        if converged {
87            events.push(ClusterEvent::Convergence(true));
88        }
89        events
90    }
91
92    /// Insert `m` as a `Joining` member and emit the `MemberJoined`
93    /// event for the daemon to publish.
94    pub fn join(&mut self, m: Member) -> ClusterEvent {
95        self.add_or_update(m.clone());
96        ClusterEvent::MemberJoined(m)
97    }
98
99    /// Mark `addr` as leaving. Returns the published event if the
100    /// transition was valid, `None` if no such member exists.
101    pub fn leave(&mut self, addr: &Address) -> Option<ClusterEvent> {
102        let m = self.members.iter_mut().find(|x| &x.address == addr)?;
103        if matches!(m.status, MemberStatus::Up | MemberStatus::WeaklyUp) {
104            m.status = MemberStatus::Leaving;
105            return Some(ClusterEvent::MemberLeft(m.clone()));
106        }
107        None
108    }
109
110    /// Force `addr` to `Down`. Unlike [`Self::leave`], this is the
111    /// operator-initiated terminal-down path: it accepts members in
112    /// `Up`, `WeaklyUp`, or `Leaving` status. The next leader-action
113    /// tick promotes `Down → Removed`.
114    ///
115    /// Returns the [`ClusterEvent::MemberDowned`] if a transition
116    /// occurred, or `None` if the member is unknown or already in a
117    /// terminal status (`Down`, `Exiting`, `Removed`, `Joining`).
118    pub fn down(&mut self, addr: &Address) -> Option<ClusterEvent> {
119        let m = self.members.iter_mut().find(|x| &x.address == addr)?;
120        if matches!(m.status, MemberStatus::Up | MemberStatus::WeaklyUp | MemberStatus::Leaving) {
121            m.status = MemberStatus::Down;
122            return Some(ClusterEvent::MemberDowned(m.clone()));
123        }
124        None
125    }
126}
127
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a state holding exactly one member, addressed `a`, whose
    /// status has been overridden to `status`.
    fn single_member_state(status: MemberStatus) -> MembershipState {
        let mut member = Member::new(Address::local("a"), vec![]);
        member.status = status;
        let mut state = MembershipState::new();
        state.add_or_update(member);
        state
    }

    #[test]
    fn add_and_remove() {
        let member = Member::new(Address::local("a"), vec![]);
        let addr = member.address.clone();
        let mut state = MembershipState::new();

        state.add_or_update(member);
        assert_eq!(state.member_count(), 1);

        state.remove(&addr);
        assert_eq!(state.member_count(), 0);
    }

    #[test]
    fn join_emits_member_joined() {
        let mut state = MembershipState::new();
        let joined = state.join(Member::new(Address::local("a"), vec![]));
        assert!(matches!(joined, ClusterEvent::MemberJoined(_)));
        assert_eq!(state.member_count(), 1);
    }

    #[test]
    fn leader_actions_promote_joining_to_up_when_converged() {
        let mut state = MembershipState::new();
        state.join(Member::new(Address::local("a"), vec![]));
        // A freshly joined single member is expected to count as
        // converged, so the tick should promote it Joining→Up.
        let events = state.apply_leader_actions();
        let mut promoted = Vec::new();
        for event in &events {
            if let ClusterEvent::MemberUp(m) = event {
                promoted.push(m.address.to_string());
            }
        }
        assert_eq!(promoted, vec!["akka://a".to_string()]);
    }

    #[test]
    fn leader_actions_remove_down_members() {
        let mut state = single_member_state(MemberStatus::Down);
        let _ = state.apply_leader_actions();
        assert_eq!(state.member_count(), 0);
    }

    #[test]
    fn leave_marks_up_member_as_leaving() {
        let mut state = single_member_state(MemberStatus::Up);
        let outcome = state.leave(&Address::local("a"));
        assert!(matches!(outcome, Some(ClusterEvent::MemberLeft(_))));
    }

    #[test]
    fn leave_is_noop_for_unknown_member() {
        let mut state = MembershipState::new();
        assert!(state.leave(&Address::local("nope")).is_none());
    }

    #[test]
    fn down_marks_up_member_as_down() {
        let mut state = single_member_state(MemberStatus::Up);
        let outcome = state.down(&Address::local("a"));
        assert!(matches!(outcome, Some(ClusterEvent::MemberDowned(_))));
        let stored = state.members.first().unwrap();
        assert_eq!(stored.status, MemberStatus::Down);
    }

    #[test]
    fn down_accepts_leaving_member() {
        let mut state = single_member_state(MemberStatus::Leaving);
        let outcome = state.down(&Address::local("a"));
        assert!(matches!(outcome, Some(ClusterEvent::MemberDowned(_))));
    }

    #[test]
    fn down_accepts_weakly_up_member() {
        let mut state = single_member_state(MemberStatus::WeaklyUp);
        let outcome = state.down(&Address::local("a"));
        assert!(matches!(outcome, Some(ClusterEvent::MemberDowned(_))));
    }

    #[test]
    fn down_is_noop_for_unknown_member() {
        let mut state = MembershipState::new();
        assert!(state.down(&Address::local("nope")).is_none());
    }

    #[test]
    fn down_is_noop_for_already_down_member() {
        let mut state = single_member_state(MemberStatus::Down);
        assert!(state.down(&Address::local("a")).is_none());
    }
}