Skip to main content

atomr_cluster/
leader.rs

1//! Leader election.
2//!
3//! The leader is the lowest-address `Up`/`Leaving` member that's reachable
4//! from the current node — deterministic given the gossip-converged
5//! membership state. This module implements that pure function plus the
6//! transition rules that fire on each gossip tick.
7//!
8//! The full state machine (`Joining → Up → Leaving → Exiting →
9//! Removed`) lives here as helpers; the active driver that wires
10//! these into the gossip loop is Phase 6.B.
11
12use atomr_core::actor::Address;
13
14use crate::member::{Member, MemberStatus};
15use crate::membership::MembershipState;
16
17/// Pick the deterministic leader from a [`MembershipState`].
18///
19/// Algorithm: among reachable members in the `Up` or `Leaving`
20/// status, return the one with the lowest `Address` (lexicographic
21/// over the `Display` form). Returns `None` if no eligible member
22/// exists.
23pub fn elect_leader(state: &MembershipState) -> Option<Address> {
24    let mut eligible: Vec<&Member> = state
25        .members
26        .iter()
27        .filter(|m| matches!(m.status, MemberStatus::Up | MemberStatus::Leaving))
28        .filter(|m| state.reachability.is_reachable(&m.address))
29        .collect();
30    eligible.sort_by_key(|a| a.address.to_string());
31    eligible.first().map(|m| m.address.clone())
32}
33
34/// Compute the next status for a member given the current convergence
35/// state. Returns `None` if no transition applies.
36///
37/// * `Joining` → `Up` once convergence is reached and this member is
38///   reachable from the leader.
39/// * `Leaving` → `Exiting` once the leader sees the leave intent.
40/// * `Exiting` → `Removed` once the leader-side cleanup completes.
41/// * `Down` → `Removed` once the gossip purge interval elapses.
42pub fn next_status(current: MemberStatus, converged: bool) -> Option<MemberStatus> {
43    match (current, converged) {
44        (MemberStatus::Joining, true) => Some(MemberStatus::Up),
45        (MemberStatus::Leaving, true) => Some(MemberStatus::Exiting),
46        (MemberStatus::Exiting, true) => Some(MemberStatus::Removed),
47        (MemberStatus::Down, _) => Some(MemberStatus::Removed),
48        _ => None,
49    }
50}
51
52/// Convergence holds when every member that this node believes is
53/// alive is also reachable. gossip uses convergence as a
54/// pre-condition for the leader's status-transition tick (the
55/// leader won't move members from `Joining → Up` while a partition
56/// is in flight).
57///
58/// Simplified vs. upstream: we don't track per-node "seen" sets yet,
59/// so this is the local-view variant. A partitioned member shows up
60/// as unreachable; once SBR (or a heartbeat recovery) resolves it,
61/// convergence holds again. `Down` members don't block convergence
62/// because they're already on the way out.
63pub fn is_converged(state: &MembershipState) -> bool {
64    state.members.iter().all(|m| {
65        if matches!(m.status, MemberStatus::Down | MemberStatus::Removed) {
66            return true;
67        }
68        state.reachability.is_reachable(&m.address)
69    })
70}
71
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a member at `addr` with the given status.
    fn member(addr: &str, status: MemberStatus) -> Member {
        let mut m = Member::new(Address::local(addr), vec![]);
        m.status = status;
        m
    }

    #[test]
    fn leader_is_lowest_address_up_member() {
        let mut s = MembershipState::new();
        s.add_or_update(member("c", MemberStatus::Up));
        s.add_or_update(member("a", MemberStatus::Up));
        s.add_or_update(member("b", MemberStatus::Up));
        assert_eq!(elect_leader(&s), Some(Address::local("a")));
    }

    #[test]
    fn leader_skips_non_up_members() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Joining));
        s.add_or_update(member("b", MemberStatus::Up));
        assert_eq!(elect_leader(&s), Some(Address::local("b")));
    }

    #[test]
    fn leaving_member_remains_eligible_for_leadership() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Leaving));
        s.add_or_update(member("b", MemberStatus::Up));
        assert_eq!(elect_leader(&s), Some(Address::local("a")));
    }

    #[test]
    fn leader_skips_exiting_and_down_members() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Exiting));
        s.add_or_update(member("b", MemberStatus::Down));
        s.add_or_update(member("c", MemberStatus::Up));
        assert_eq!(elect_leader(&s), Some(Address::local("c")));
    }

    #[test]
    fn leader_skips_unreachable_members() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Up));
        s.add_or_update(member("b", MemberStatus::Up));
        // Mark "a" unreachable from "b".
        s.reachability.unreachable(Address::local("b"), Address::local("a"));
        assert_eq!(elect_leader(&s), Some(Address::local("b")));
    }

    #[test]
    fn no_leader_when_no_eligible_members() {
        let s = MembershipState::new();
        assert_eq!(elect_leader(&s), None);
    }

    #[test]
    fn next_status_transitions() {
        assert_eq!(next_status(MemberStatus::Joining, true), Some(MemberStatus::Up));
        assert_eq!(next_status(MemberStatus::Joining, false), None);
        assert_eq!(next_status(MemberStatus::Leaving, true), Some(MemberStatus::Exiting));
        assert_eq!(next_status(MemberStatus::Exiting, true), Some(MemberStatus::Removed));
        assert_eq!(next_status(MemberStatus::Down, false), Some(MemberStatus::Removed));
        assert_eq!(next_status(MemberStatus::Up, true), None);
    }

    #[test]
    fn next_status_waits_for_convergence_except_for_down() {
        assert_eq!(next_status(MemberStatus::Leaving, false), None);
        assert_eq!(next_status(MemberStatus::Exiting, false), None);
        assert_eq!(next_status(MemberStatus::Down, true), Some(MemberStatus::Removed));
        assert_eq!(next_status(MemberStatus::Removed, true), None);
        assert_eq!(next_status(MemberStatus::Removed, false), None);
    }

    #[test]
    fn convergence_holds_when_everyone_reachable() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Up));
        s.add_or_update(member("b", MemberStatus::Joining));
        assert!(is_converged(&s));
    }

    #[test]
    fn convergence_holds_for_empty_membership() {
        let s = MembershipState::new();
        assert!(is_converged(&s));
    }

    #[test]
    fn convergence_fails_when_a_member_is_unreachable() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Up));
        s.add_or_update(member("b", MemberStatus::Up));
        s.reachability.unreachable(Address::local("a"), Address::local("b"));
        assert!(!is_converged(&s));
    }

    #[test]
    fn down_members_do_not_block_convergence() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Up));
        s.add_or_update(member("b", MemberStatus::Down));
        // b is Down so its reachability doesn't matter for convergence.
        s.reachability.unreachable(Address::local("a"), Address::local("b"));
        assert!(is_converged(&s));
    }

    #[test]
    fn removed_members_do_not_block_convergence() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Up));
        s.add_or_update(member("b", MemberStatus::Removed));
        // b is Removed so its reachability doesn't matter for convergence.
        s.reachability.unreachable(Address::local("a"), Address::local("b"));
        assert!(is_converged(&s));
    }
}
152
153// -- Distributed leader-election handover ---------------------------
154
155/// Handover event emitted by a [`LeaderHandover`] watcher when the
156/// elected leader changes between snapshots.
157/// `Cluster.LeaderChanged` event published on the cluster's domain
158/// event stream.
159#[derive(Debug, Clone, PartialEq, Eq)]
160pub struct LeaderHandoverEvent {
161    pub from: Option<Address>,
162    pub to: Option<Address>,
163}
164
165/// Watcher that detects leader transitions across successive
166/// [`MembershipState`] snapshots. Stateful: holds the previous
167/// snapshot's leader and emits a [`LeaderHandoverEvent`] only on
168/// change.
169///
170/// Wire it up by calling [`LeaderHandover::observe`] every gossip
171/// tick — a `Some(event)` return value means a handover occurred and
172/// should be published. Across-process handover comes from the
173/// cluster daemon broadcasting these events on the
174/// [`crate::ClusterEventBus`] via remote.
175#[derive(Debug, Default, Clone)]
176pub struct LeaderHandover {
177    previous: Option<Address>,
178}
179
180impl LeaderHandover {
181    pub fn new() -> Self {
182        Self::default()
183    }
184
185    /// Observe a new membership snapshot. Returns `Some(event)` if
186    /// the leader changed since the last observation; `None` if it
187    /// did not.
188    pub fn observe(&mut self, state: &MembershipState) -> Option<LeaderHandoverEvent> {
189        let next = elect_leader(state);
190        if self.previous != next {
191            let event = LeaderHandoverEvent { from: self.previous.clone(), to: next.clone() };
192            self.previous = next;
193            return Some(event);
194        }
195        None
196    }
197
198    /// The leader observed at the last call to [`Self::observe`].
199    pub fn current(&self) -> Option<&Address> {
200        self.previous.as_ref()
201    }
202}
203
#[cfg(test)]
mod handover_tests {
    use super::*;

    /// Build a member at `addr` with the given status.
    fn member(addr: &str, status: MemberStatus) -> Member {
        let mut m = Member::new(Address::local(addr), vec![]);
        m.status = status;
        m
    }

    #[test]
    fn first_observation_emits_initial_election() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Up));
        let mut h = LeaderHandover::new();
        let ev = h.observe(&s).unwrap();
        assert_eq!(ev.from, None);
        assert_eq!(ev.to, Some(Address::local("a")));
    }

    #[test]
    fn empty_cluster_emits_no_event() {
        // No eligible leader before or after: `None == None`, so no handover.
        let s = MembershipState::new();
        let mut h = LeaderHandover::new();
        assert!(h.observe(&s).is_none());
        assert_eq!(h.current(), None);
    }

    #[test]
    fn no_event_when_leader_unchanged() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Up));
        let mut h = LeaderHandover::new();
        h.observe(&s);
        assert!(h.observe(&s).is_none());
    }

    #[test]
    fn leader_leaving_triggers_handover_to_next_member() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Up));
        s.add_or_update(member("b", MemberStatus::Up));
        let mut h = LeaderHandover::new();
        h.observe(&s);
        assert_eq!(h.current(), Some(&Address::local("a")));

        // 'a' transitions to Leaving — still eligible — leader unchanged.
        let mut leaving = MembershipState::new();
        leaving.add_or_update(member("a", MemberStatus::Leaving));
        leaving.add_or_update(member("b", MemberStatus::Up));
        assert!(h.observe(&leaving).is_none());

        // 'a' transitions to Exiting (no longer eligible). Leader now b.
        let mut exiting = MembershipState::new();
        exiting.add_or_update(member("a", MemberStatus::Exiting));
        exiting.add_or_update(member("b", MemberStatus::Up));
        let ev = h.observe(&exiting).unwrap();
        assert_eq!(ev.from, Some(Address::local("a")));
        assert_eq!(ev.to, Some(Address::local("b")));
    }

    #[test]
    fn leader_becoming_unreachable_triggers_handover() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Up));
        s.add_or_update(member("b", MemberStatus::Up));
        let mut h = LeaderHandover::new();
        h.observe(&s);
        // Mark 'a' unreachable from b. Leader becomes b.
        s.reachability.unreachable(Address::local("b"), Address::local("a"));
        let ev = h.observe(&s).unwrap();
        assert_eq!(ev.from, Some(Address::local("a")));
        assert_eq!(ev.to, Some(Address::local("b")));
    }

    #[test]
    fn returning_member_reclaims_leadership() {
        let mut h = LeaderHandover::new();

        let mut both = MembershipState::new();
        both.add_or_update(member("a", MemberStatus::Up));
        both.add_or_update(member("b", MemberStatus::Up));
        h.observe(&both);

        // 'a' drops out: leadership moves to b.
        let mut only_b = MembershipState::new();
        only_b.add_or_update(member("b", MemberStatus::Up));
        let ev = h.observe(&only_b).unwrap();
        assert_eq!(ev.from, Some(Address::local("a")));
        assert_eq!(ev.to, Some(Address::local("b")));

        // 'a' is back: the lower address takes over again.
        let ev = h.observe(&both).unwrap();
        assert_eq!(ev.from, Some(Address::local("b")));
        assert_eq!(ev.to, Some(Address::local("a")));
        assert_eq!(h.current(), Some(&Address::local("a")));
    }

    #[test]
    fn no_eligible_members_emits_to_none() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Up));
        let mut h = LeaderHandover::new();
        h.observe(&s);

        // a is removed — no eligible leader remains.
        let mut empty = MembershipState::new();
        empty.add_or_update(member("a", MemberStatus::Removed));
        let ev = h.observe(&empty).unwrap();
        assert_eq!(ev.from, Some(Address::local("a")));
        assert_eq!(ev.to, None);
    }

    #[test]
    fn handover_through_full_cluster_lifecycle() {
        let mut h = LeaderHandover::new();

        // 3 members all Up: leader is a.
        let mut s1 = MembershipState::new();
        for n in ["a", "b", "c"] {
            s1.add_or_update(member(n, MemberStatus::Up));
        }
        assert_eq!(h.observe(&s1).unwrap().to, Some(Address::local("a")));

        // a leaves: leader becomes b.
        let mut s2 = MembershipState::new();
        s2.add_or_update(member("a", MemberStatus::Removed));
        for n in ["b", "c"] {
            s2.add_or_update(member(n, MemberStatus::Up));
        }
        assert_eq!(h.observe(&s2).unwrap().to, Some(Address::local("b")));

        // b leaves too: leader becomes c.
        let mut s3 = MembershipState::new();
        for n in ["a", "b"] {
            s3.add_or_update(member(n, MemberStatus::Removed));
        }
        s3.add_or_update(member("c", MemberStatus::Up));
        assert_eq!(h.observe(&s3).unwrap().to, Some(Address::local("c")));
    }
}