Skip to main content

atomr_cluster/
membership.rs

//! Cluster membership state. Akka.NET reference: `Cluster/MembershipState.cs`.
2
3use std::collections::BTreeSet;
4
5use atomr_core::actor::Address;
6use serde::{Deserialize, Serialize};
7
8use crate::events::ClusterEvent;
9use crate::leader::{is_converged, next_status};
10use crate::member::{Member, MemberStatus};
11use crate::reachability::Reachability;
12
13#[derive(Debug, Clone, Default, Serialize, Deserialize)]
14pub struct MembershipState {
15    pub members: Vec<Member>,
16    pub reachability: Reachability,
17}
18
19impl MembershipState {
20    pub fn new() -> Self {
21        Self::default()
22    }
23
24    pub fn add_or_update(&mut self, m: Member) {
25        if let Some(existing) = self.members.iter_mut().find(|x| x.address == m.address) {
26            *existing = m;
27        } else {
28            self.members.push(m);
29        }
30    }
31
32    pub fn remove(&mut self, addr: &Address) {
33        self.members.retain(|m| &m.address != addr);
34    }
35
36    pub fn up_members(&self) -> Vec<&Member> {
37        self.members.iter().filter(|m| matches!(m.status, MemberStatus::Up)).collect()
38    }
39
40    pub fn unreachable_addresses(&self) -> BTreeSet<String> {
41        self.members
42            .iter()
43            .filter(|m| !self.reachability.is_reachable(&m.address))
44            .map(|m| m.address.to_string())
45            .collect()
46    }
47
48    pub fn member_count(&self) -> usize {
49        self.members.len()
50    }
51
52    /// Run the leader's per-tick transition logic against the current
53    /// state. Returns the [`ClusterEvent`]s the daemon should publish
54    /// (membership status flips, removals).
55    ///
56    /// Phase 6.C of `docs/full-port-plan.md`. Pure function — keeps
57    /// the daemon actor itself trivial: collect events, then publish
58    /// onto [`crate::events::ClusterEventBus`].
59    pub fn apply_leader_actions(&mut self) -> Vec<ClusterEvent> {
60        let converged = is_converged(self);
61        let mut events = Vec::new();
62        // First pass: compute transitions.
63        let mut transitions: Vec<(Address, MemberStatus)> = Vec::new();
64        for m in &self.members {
65            if let Some(next) = next_status(m.status, converged) {
66                transitions.push((m.address.clone(), next));
67            }
68        }
69        // Second pass: apply + emit.
70        for (addr, next) in transitions {
71            if let Some(m) = self.members.iter_mut().find(|x| x.address == addr) {
72                let prev = m.status;
73                m.status = next;
74                let updated = m.clone();
75                let evt = match next {
76                    MemberStatus::Up => ClusterEvent::MemberUp(updated.clone()),
77                    MemberStatus::Exiting => ClusterEvent::MemberExited(updated.clone()),
78                    MemberStatus::Removed => ClusterEvent::MemberRemoved(updated.clone(), prev),
79                    _ => continue,
80                };
81                events.push(evt);
82            }
83        }
84        // Drop members in `Removed` status (clean-up).
85        self.members.retain(|m| m.status != MemberStatus::Removed);
86        if converged {
87            events.push(ClusterEvent::Convergence(true));
88        }
89        events
90    }
91
92    /// Insert `m` as a `Joining` member and emit the `MemberJoined`
93    /// event for the daemon to publish.
94    pub fn join(&mut self, m: Member) -> ClusterEvent {
95        self.add_or_update(m.clone());
96        ClusterEvent::MemberJoined(m)
97    }
98
99    /// Mark `addr` as leaving. Returns the published event if the
100    /// transition was valid, `None` if no such member exists.
101    pub fn leave(&mut self, addr: &Address) -> Option<ClusterEvent> {
102        let m = self.members.iter_mut().find(|x| &x.address == addr)?;
103        if matches!(m.status, MemberStatus::Up | MemberStatus::WeaklyUp) {
104            m.status = MemberStatus::Leaving;
105            return Some(ClusterEvent::MemberLeft(m.clone()));
106        }
107        None
108    }
109}
110
#[cfg(test)]
mod tests {
    //! Unit tests for [`MembershipState`]: basic bookkeeping, join/leave
    //! events, and the leader's per-tick transitions.

    use super::*;

    #[test]
    fn add_and_remove() {
        let mut s = MembershipState::new();
        let m = Member::new(Address::local("a"), vec![]);
        s.add_or_update(m.clone());
        assert_eq!(s.member_count(), 1);
        s.remove(&m.address);
        assert_eq!(s.member_count(), 0);
    }

    #[test]
    fn add_or_update_replaces_entry_with_same_address() {
        let mut s = MembershipState::new();
        s.add_or_update(Member::new(Address::local("a"), vec![]));
        let mut updated = Member::new(Address::local("a"), vec![]);
        updated.status = MemberStatus::Up;
        s.add_or_update(updated);
        // Same address: the entry is overwritten, not duplicated.
        assert_eq!(s.member_count(), 1);
        assert!(matches!(s.members[0].status, MemberStatus::Up));
    }

    #[test]
    fn join_emits_member_joined() {
        let mut s = MembershipState::new();
        let evt = s.join(Member::new(Address::local("a"), vec![]));
        assert!(matches!(evt, ClusterEvent::MemberJoined(_)));
        assert_eq!(s.member_count(), 1);
    }

    #[test]
    fn leader_actions_promote_joining_to_up_when_converged() {
        let mut s = MembershipState::new();
        s.join(Member::new(Address::local("a"), vec![]));
        // Converged because every member is reachable; Joining→Up transitions.
        let events = s.apply_leader_actions();
        let names: Vec<_> = events
            .iter()
            .filter_map(|e| match e {
                ClusterEvent::MemberUp(m) => Some(m.address.to_string()),
                _ => None,
            })
            .collect();
        assert_eq!(names, vec!["akka://a".to_string()]);
    }

    #[test]
    fn leader_actions_remove_down_members() {
        let mut s = MembershipState::new();
        let mut m = Member::new(Address::local("a"), vec![]);
        m.status = MemberStatus::Down;
        s.add_or_update(m);
        let events = s.apply_leader_actions();
        assert_eq!(s.member_count(), 0);
        // The removal must be announced, carrying the previous status.
        assert!(events
            .iter()
            .any(|e| matches!(e, ClusterEvent::MemberRemoved(_, MemberStatus::Down))));
    }

    #[test]
    fn leave_marks_up_member_as_leaving() {
        let mut s = MembershipState::new();
        let mut m = Member::new(Address::local("a"), vec![]);
        m.status = MemberStatus::Up;
        s.add_or_update(m);
        let evt = s.leave(&Address::local("a"));
        assert!(matches!(evt, Some(ClusterEvent::MemberLeft(_))));
        // The stored member itself must have been flipped, not just the event.
        assert!(matches!(s.members[0].status, MemberStatus::Leaving));
    }

    #[test]
    fn leave_is_noop_for_unknown_member() {
        let mut s = MembershipState::new();
        let evt = s.leave(&Address::local("nope"));
        assert!(evt.is_none());
    }

    #[test]
    fn leave_is_noop_for_member_that_is_not_up() {
        let mut s = MembershipState::new();
        let mut m = Member::new(Address::local("a"), vec![]);
        m.status = MemberStatus::Down;
        s.add_or_update(m);
        // Known member, but not Up/WeaklyUp: no event and no status change.
        assert!(s.leave(&Address::local("a")).is_none());
        assert!(matches!(s.members[0].status, MemberStatus::Down));
    }
}