Skip to main content

atomr_cluster/
sbr_runtime.rs

//! SBR runtime — wires a [`crate::DowningStrategy`] into a
//! [`crate::MembershipState`] and returns the resulting downing
//! actions.
//!
//! Runs on a tick with a stability deadline: once a partition has been
//! observed for at least `stable_after`, it consults the configured
//! strategy and returns the actions the leader should apply.
8
9use std::time::{Duration, Instant};
10
11use crate::member::{Member, MemberStatus};
12use crate::membership::MembershipState;
13use crate::sbr::{DowningDecision, DowningStrategy};
14
/// Outcome of a single [`SbrRuntime::tick`].
///
/// The list-carrying variants hold member addresses rendered as strings.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SbrAction {
    /// Nothing to apply: no partition, the partition is not yet stable,
    /// or the strategy decided to stay.
    None,
    /// Down every listed address — the unreachable side of the partition.
    DownUnreachable(Vec<String>),
    /// Down every listed address; the strategy returned `DownAll`
    /// (catastrophic — the whole cluster goes down).
    DownAll(Vec<String>),
    /// Down the local node: this side lost and should exit voluntarily.
    DownSelf,
}
28
/// Runtime that pairs a downing strategy with a stability deadline.
///
/// The `DowningStrategy` bound is required only by the `impl` blocks that
/// consult the strategy, not by the type itself.
#[derive(Debug)]
pub struct SbrRuntime<S> {
    /// Strategy consulted once a partition has been stable long enough.
    strategy: S,
    /// Minimum time a partition must be observed before the strategy is
    /// consulted.
    stable_after: Duration,
    /// When did we first observe a non-empty unreachable set?
    /// Reset to `None` when the unreachable set is empty.
    unstable_since: Option<Instant>,
}
37
38impl<S: DowningStrategy> SbrRuntime<S> {
39    pub fn new(strategy: S, stable_after: Duration) -> Self {
40        Self { strategy, stable_after, unstable_since: None }
41    }
42
43    /// One scheduling tick. Returns the action the leader should
44    /// apply — typically nothing, sometimes a downing list.
45    pub fn tick(&mut self, state: &MembershipState, now: Instant) -> SbrAction {
46        // Partition the members by reachability.
47        let mut reachable: Vec<&Member> = Vec::new();
48        let mut unreachable: Vec<&Member> = Vec::new();
49        for m in &state.members {
50            if matches!(m.status, MemberStatus::Down | MemberStatus::Removed) {
51                continue;
52            }
53            if state.reachability.is_reachable(&m.address) {
54                reachable.push(m);
55            } else {
56                unreachable.push(m);
57            }
58        }
59
60        if unreachable.is_empty() {
61            // Healthy — reset the stability clock.
62            self.unstable_since = None;
63            return SbrAction::None;
64        }
65
66        // First observation of a partition.
67        let since = *self.unstable_since.get_or_insert(now);
68        if now.duration_since(since) < self.stable_after {
69            return SbrAction::None;
70        }
71
72        match self.strategy.decide(&reachable, &unreachable) {
73            DowningDecision::Stay => SbrAction::None,
74            DowningDecision::DownUnreachable => {
75                SbrAction::DownUnreachable(unreachable.iter().map(|m| m.address.to_string()).collect())
76            }
77            DowningDecision::DownAll => {
78                SbrAction::DownAll(state.members.iter().map(|m| m.address.to_string()).collect())
79            }
80            DowningDecision::DownSelf => SbrAction::DownSelf,
81        }
82    }
83
84    /// `true` once the partition has been observed for at least
85    /// `stable_after`. Useful for telemetry.
86    pub fn is_stable(&self, now: Instant) -> bool {
87        match self.unstable_since {
88            Some(t) => now.duration_since(t) >= self.stable_after,
89            None => true, // healthy → trivially stable
90        }
91    }
92}
93
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sbr::KeepMajorityStrategy;
    use atomr_core::actor::Address;

    fn member(addr: &str, status: MemberStatus) -> Member {
        let mut m = Member::new(Address::local(addr), vec![]);
        m.status = status;
        m
    }

    /// Three `Up` members where `b` has observed `c` as unreachable.
    fn three_up_with_c_unreachable() -> MembershipState {
        let mut s = MembershipState::new();
        for addr in ["a", "b", "c"] {
            s.add_or_update(member(addr, MemberStatus::Up));
        }
        s.reachability.unreachable(Address::local("b"), Address::local("c"));
        s
    }

    #[test]
    fn none_when_no_partition() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Up));
        s.add_or_update(member("b", MemberStatus::Up));
        let mut rt = SbrRuntime::new(KeepMajorityStrategy, Duration::from_millis(0));
        let r = rt.tick(&s, Instant::now());
        assert_eq!(r, SbrAction::None);
        assert!(rt.is_stable(Instant::now()));
    }

    #[test]
    fn waits_for_stability_window() {
        let s = three_up_with_c_unreachable();
        let mut rt = SbrRuntime::new(KeepMajorityStrategy, Duration::from_secs(60));
        let now = Instant::now();
        // First tick starts the clock; returns None.
        assert_eq!(rt.tick(&s, now), SbrAction::None);
        assert!(!rt.is_stable(now));
        // Still inside the window.
        let almost = now + Duration::from_secs(59);
        assert_eq!(rt.tick(&s, almost), SbrAction::None);
        assert!(!rt.is_stable(almost));
        // Window elapsed: the strategy is finally consulted.
        let later = now + Duration::from_secs(60);
        assert!(rt.is_stable(later));
        assert_eq!(
            rt.tick(&s, later),
            SbrAction::DownUnreachable(vec!["akka://c".to_string()])
        );
    }

    #[test]
    fn fires_after_stability_window_with_majority() {
        let s = three_up_with_c_unreachable();
        let mut rt = SbrRuntime::new(KeepMajorityStrategy, Duration::from_millis(0));
        let r = rt.tick(&s, Instant::now());
        match r {
            SbrAction::DownUnreachable(addrs) => {
                assert_eq!(addrs, vec!["akka://c".to_string()]);
            }
            other => panic!("expected DownUnreachable, got {other:?}"),
        }
    }

    #[test]
    fn ignores_members_already_down_or_removed() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Up));
        s.add_or_update(member("b", MemberStatus::Up));
        s.add_or_update(member("c", MemberStatus::Down));
        s.add_or_update(member("d", MemberStatus::Removed));
        // Unreachability of members that are already gone is not a partition.
        s.reachability.unreachable(Address::local("a"), Address::local("c"));
        s.reachability.unreachable(Address::local("a"), Address::local("d"));
        let mut rt = SbrRuntime::new(KeepMajorityStrategy, Duration::from_millis(0));
        let now = Instant::now();
        assert_eq!(rt.tick(&s, now), SbrAction::None);
        assert!(rt.is_stable(now));
    }

    #[test]
    fn resets_clock_when_partition_heals() {
        let mut s = MembershipState::new();
        s.add_or_update(member("a", MemberStatus::Up));
        s.add_or_update(member("b", MemberStatus::Up));
        s.reachability.unreachable(Address::local("a"), Address::local("b"));
        let mut rt = SbrRuntime::new(KeepMajorityStrategy, Duration::from_secs(60));
        let now = Instant::now();
        let _ = rt.tick(&s, now);
        // Heal partition.
        s.reachability.reachable(Address::local("a"), Address::local("b"));
        let r = rt.tick(&s, now + Duration::from_secs(1));
        assert_eq!(r, SbrAction::None);
        assert!(rt.is_stable(now + Duration::from_secs(1)));
    }
}