atomr_cluster/
sbr_runtime.rs1use std::time::{Duration, Instant};
10
11use crate::member::{Member, MemberStatus};
12use crate::membership::MembershipState;
13use crate::sbr::{DowningDecision, DowningStrategy};
14
15#[derive(Debug, Clone, PartialEq, Eq)]
17#[non_exhaustive]
18pub enum SbrAction {
19 None,
21 DownUnreachable(Vec<String>),
23 DownAll(Vec<String>),
25 DownSelf,
27}
28
29pub struct SbrRuntime<S: DowningStrategy> {
31 strategy: S,
32 stable_after: Duration,
33 unstable_since: Option<Instant>,
36}
37
38impl<S: DowningStrategy> SbrRuntime<S> {
39 pub fn new(strategy: S, stable_after: Duration) -> Self {
40 Self { strategy, stable_after, unstable_since: None }
41 }
42
43 pub fn tick(&mut self, state: &MembershipState, now: Instant) -> SbrAction {
46 let mut reachable: Vec<&Member> = Vec::new();
48 let mut unreachable: Vec<&Member> = Vec::new();
49 for m in &state.members {
50 if matches!(m.status, MemberStatus::Down | MemberStatus::Removed) {
51 continue;
52 }
53 if state.reachability.is_reachable(&m.address) {
54 reachable.push(m);
55 } else {
56 unreachable.push(m);
57 }
58 }
59
60 if unreachable.is_empty() {
61 self.unstable_since = None;
63 return SbrAction::None;
64 }
65
66 let since = *self.unstable_since.get_or_insert(now);
68 if now.duration_since(since) < self.stable_after {
69 return SbrAction::None;
70 }
71
72 match self.strategy.decide(&reachable, &unreachable) {
73 DowningDecision::Stay => SbrAction::None,
74 DowningDecision::DownUnreachable => {
75 SbrAction::DownUnreachable(unreachable.iter().map(|m| m.address.to_string()).collect())
76 }
77 DowningDecision::DownAll => {
78 SbrAction::DownAll(state.members.iter().map(|m| m.address.to_string()).collect())
79 }
80 DowningDecision::DownSelf => SbrAction::DownSelf,
81 }
82 }
83
84 pub fn is_stable(&self, now: Instant) -> bool {
87 match self.unstable_since {
88 Some(t) => now.duration_since(t) >= self.stable_after,
89 None => true, }
91 }
92}
93
94#[cfg(test)]
95mod tests {
96 use super::*;
97 use crate::sbr::KeepMajorityStrategy;
98 use atomr_core::actor::Address;
99
100 fn member(addr: &str, status: MemberStatus) -> Member {
101 let mut m = Member::new(Address::local(addr), vec![]);
102 m.status = status;
103 m
104 }
105
106 #[test]
107 fn none_when_no_partition() {
108 let mut s = MembershipState::new();
109 s.add_or_update(member("a", MemberStatus::Up));
110 s.add_or_update(member("b", MemberStatus::Up));
111 let mut rt = SbrRuntime::new(KeepMajorityStrategy, Duration::from_millis(0));
112 let r = rt.tick(&s, Instant::now());
113 assert_eq!(r, SbrAction::None);
114 assert!(rt.is_stable(Instant::now()));
115 }
116
117 #[test]
118 fn waits_for_stability_window() {
119 let mut s = MembershipState::new();
120 s.add_or_update(member("a", MemberStatus::Up));
121 s.add_or_update(member("b", MemberStatus::Up));
122 s.add_or_update(member("c", MemberStatus::Up));
123 s.reachability.unreachable(Address::local("b"), Address::local("c"));
124 let mut rt = SbrRuntime::new(KeepMajorityStrategy, Duration::from_secs(60));
125 let now = Instant::now();
126 assert_eq!(rt.tick(&s, now), SbrAction::None);
128 assert!(!rt.is_stable(now));
129 }
130
131 #[test]
132 fn fires_after_stability_window_with_majority() {
133 let mut s = MembershipState::new();
134 s.add_or_update(member("a", MemberStatus::Up));
135 s.add_or_update(member("b", MemberStatus::Up));
136 s.add_or_update(member("c", MemberStatus::Up));
137 s.reachability.unreachable(Address::local("b"), Address::local("c"));
139 let mut rt = SbrRuntime::new(KeepMajorityStrategy, Duration::from_millis(0));
140 let r = rt.tick(&s, Instant::now());
141 match r {
142 SbrAction::DownUnreachable(addrs) => {
143 assert_eq!(addrs, vec!["akka://c".to_string()]);
144 }
145 other => panic!("expected DownUnreachable, got {other:?}"),
146 }
147 }
148
149 #[test]
150 fn resets_clock_when_partition_heals() {
151 let mut s = MembershipState::new();
152 s.add_or_update(member("a", MemberStatus::Up));
153 s.add_or_update(member("b", MemberStatus::Up));
154 s.reachability.unreachable(Address::local("a"), Address::local("b"));
155 let mut rt = SbrRuntime::new(KeepMajorityStrategy, Duration::from_secs(60));
156 let now = Instant::now();
157 let _ = rt.tick(&s, now);
158 s.reachability.reachable(Address::local("a"), Address::local("b"));
160 let r = rt.tick(&s, now + Duration::from_secs(1));
161 assert_eq!(r, SbrAction::None);
162 assert!(rt.is_stable(now + Duration::from_secs(1)));
163 }
164}