Skip to main content

atomr_cluster/
sbr_runtime.rs

1//! SBR runtime — wires a [`crate::DowningStrategy`] into a
2//! [`crate::MembershipState`] and emits the resulting downing
3//! actions.
4//!
5//! Phase 6.F of `docs/full-port-plan.md`. Akka.NET parity:
6//! `SBR.SplitBrainResolverWorker` — runs on a tick with a stability
7//! deadline; if the partition has been observed for at least
8//! `stable_after`, it consults the configured strategy and returns
9//! the actions the leader should apply.
10
11use std::time::{Duration, Instant};
12
13use crate::member::{Member, MemberStatus};
14use crate::membership::MembershipState;
15use crate::sbr::{DowningDecision, DowningStrategy};
16
/// Outcome of a single [`SbrRuntime::tick`] call: what, if anything,
/// the leader should down.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum SbrAction {
    /// Nothing to do: either the partition has not been stable for
    /// long enough yet, or the strategy decided to stay put.
    None,
    /// Down the listed addresses, i.e. the members on the unreachable
    /// side of the partition.
    DownUnreachable(Vec<String>),
    /// Down every listed member (catastrophic: the strategy chose
    /// `DownAll`).
    DownAll(Vec<String>),
    /// Down the local node: this side lost and exits voluntarily.
    DownSelf,
}
30
31/// Runtime that pairs a strategy with a stability deadline.
32pub struct SbrRuntime<S: DowningStrategy> {
33    strategy: S,
34    stable_after: Duration,
35    /// When did we first observe a non-empty unreachable set?
36    /// Reset to `None` when the unreachable set is empty.
37    unstable_since: Option<Instant>,
38}
39
40impl<S: DowningStrategy> SbrRuntime<S> {
41    pub fn new(strategy: S, stable_after: Duration) -> Self {
42        Self { strategy, stable_after, unstable_since: None }
43    }
44
45    /// One scheduling tick. Returns the action the leader should
46    /// apply — typically nothing, sometimes a downing list.
47    pub fn tick(&mut self, state: &MembershipState, now: Instant) -> SbrAction {
48        // Partition the members by reachability.
49        let mut reachable: Vec<&Member> = Vec::new();
50        let mut unreachable: Vec<&Member> = Vec::new();
51        for m in &state.members {
52            if matches!(m.status, MemberStatus::Down | MemberStatus::Removed) {
53                continue;
54            }
55            if state.reachability.is_reachable(&m.address) {
56                reachable.push(m);
57            } else {
58                unreachable.push(m);
59            }
60        }
61
62        if unreachable.is_empty() {
63            // Healthy — reset the stability clock.
64            self.unstable_since = None;
65            return SbrAction::None;
66        }
67
68        // First observation of a partition.
69        let since = *self.unstable_since.get_or_insert(now);
70        if now.duration_since(since) < self.stable_after {
71            return SbrAction::None;
72        }
73
74        match self.strategy.decide(&reachable, &unreachable) {
75            DowningDecision::Stay => SbrAction::None,
76            DowningDecision::DownUnreachable => {
77                SbrAction::DownUnreachable(unreachable.iter().map(|m| m.address.to_string()).collect())
78            }
79            DowningDecision::DownAll => {
80                SbrAction::DownAll(state.members.iter().map(|m| m.address.to_string()).collect())
81            }
82            DowningDecision::DownSelf => SbrAction::DownSelf,
83        }
84    }
85
86    /// `true` once the partition has been observed for at least
87    /// `stable_after`. Useful for telemetry.
88    pub fn is_stable(&self, now: Instant) -> bool {
89        match self.unstable_since {
90            Some(t) => now.duration_since(t) >= self.stable_after,
91            None => true, // healthy → trivially stable
92        }
93    }
94}
95
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sbr::KeepMajorityStrategy;
    use atomr_core::actor::Address;

    /// Builds a member at the local address `addr` with the given status.
    fn member(addr: &str, status: MemberStatus) -> Member {
        let mut built = Member::new(Address::local(addr), vec![]);
        built.status = status;
        built
    }

    /// Membership state holding one `Up` member per name, added in order.
    fn all_up(names: &[&str]) -> MembershipState {
        let mut state = MembershipState::new();
        for &name in names {
            state.add_or_update(member(name, MemberStatus::Up));
        }
        state
    }

    #[test]
    fn none_when_no_partition() {
        let state = all_up(&["a", "b"]);
        let mut runtime = SbrRuntime::new(KeepMajorityStrategy, Duration::from_millis(0));
        let action = runtime.tick(&state, Instant::now());
        assert_eq!(action, SbrAction::None);
        assert!(runtime.is_stable(Instant::now()));
    }

    #[test]
    fn waits_for_stability_window() {
        let mut state = all_up(&["a", "b", "c"]);
        state.reachability.unreachable(Address::local("b"), Address::local("c"));
        let mut runtime = SbrRuntime::new(KeepMajorityStrategy, Duration::from_secs(60));
        let start = Instant::now();
        // The first tick only starts the stability clock; nothing is downed.
        assert_eq!(runtime.tick(&state, start), SbrAction::None);
        assert!(!runtime.is_stable(start));
    }

    #[test]
    fn fires_after_stability_window_with_majority() {
        let mut state = all_up(&["a", "b", "c"]);
        // Mark c as unreachable.
        state.reachability.unreachable(Address::local("b"), Address::local("c"));
        // A zero window means the very first tick may act.
        let mut runtime = SbrRuntime::new(KeepMajorityStrategy, Duration::from_millis(0));
        match runtime.tick(&state, Instant::now()) {
            SbrAction::DownUnreachable(addrs) => {
                assert_eq!(addrs, vec!["akka://c".to_string()]);
            }
            other => panic!("expected DownUnreachable, got {other:?}"),
        }
    }

    #[test]
    fn resets_clock_when_partition_heals() {
        let mut state = all_up(&["a", "b"]);
        state.reachability.unreachable(Address::local("a"), Address::local("b"));
        let mut runtime = SbrRuntime::new(KeepMajorityStrategy, Duration::from_secs(60));
        let start = Instant::now();
        let _ = runtime.tick(&state, start);
        // Heal the partition, then tick again one second later.
        state.reachability.reachable(Address::local("a"), Address::local("b"));
        let later = start + Duration::from_secs(1);
        assert_eq!(runtime.tick(&state, later), SbrAction::None);
        assert!(runtime.is_stable(later));
    }
}