atomr_cluster/
sbr_runtime.rs1use std::time::{Duration, Instant};
12
13use crate::member::{Member, MemberStatus};
14use crate::membership::MembershipState;
15use crate::sbr::{DowningDecision, DowningStrategy};
16
17#[derive(Debug, Clone, PartialEq, Eq)]
19#[non_exhaustive]
20pub enum SbrAction {
21 None,
23 DownUnreachable(Vec<String>),
25 DownAll(Vec<String>),
27 DownSelf,
29}
30
31pub struct SbrRuntime<S: DowningStrategy> {
33 strategy: S,
34 stable_after: Duration,
35 unstable_since: Option<Instant>,
38}
39
40impl<S: DowningStrategy> SbrRuntime<S> {
41 pub fn new(strategy: S, stable_after: Duration) -> Self {
42 Self { strategy, stable_after, unstable_since: None }
43 }
44
45 pub fn tick(&mut self, state: &MembershipState, now: Instant) -> SbrAction {
48 let mut reachable: Vec<&Member> = Vec::new();
50 let mut unreachable: Vec<&Member> = Vec::new();
51 for m in &state.members {
52 if matches!(m.status, MemberStatus::Down | MemberStatus::Removed) {
53 continue;
54 }
55 if state.reachability.is_reachable(&m.address) {
56 reachable.push(m);
57 } else {
58 unreachable.push(m);
59 }
60 }
61
62 if unreachable.is_empty() {
63 self.unstable_since = None;
65 return SbrAction::None;
66 }
67
68 let since = *self.unstable_since.get_or_insert(now);
70 if now.duration_since(since) < self.stable_after {
71 return SbrAction::None;
72 }
73
74 match self.strategy.decide(&reachable, &unreachable) {
75 DowningDecision::Stay => SbrAction::None,
76 DowningDecision::DownUnreachable => {
77 SbrAction::DownUnreachable(unreachable.iter().map(|m| m.address.to_string()).collect())
78 }
79 DowningDecision::DownAll => {
80 SbrAction::DownAll(state.members.iter().map(|m| m.address.to_string()).collect())
81 }
82 DowningDecision::DownSelf => SbrAction::DownSelf,
83 }
84 }
85
86 pub fn is_stable(&self, now: Instant) -> bool {
89 match self.unstable_since {
90 Some(t) => now.duration_since(t) >= self.stable_after,
91 None => true, }
93 }
94}
95
96#[cfg(test)]
97mod tests {
98 use super::*;
99 use crate::sbr::KeepMajorityStrategy;
100 use atomr_core::actor::Address;
101
102 fn member(addr: &str, status: MemberStatus) -> Member {
103 let mut m = Member::new(Address::local(addr), vec![]);
104 m.status = status;
105 m
106 }
107
108 #[test]
109 fn none_when_no_partition() {
110 let mut s = MembershipState::new();
111 s.add_or_update(member("a", MemberStatus::Up));
112 s.add_or_update(member("b", MemberStatus::Up));
113 let mut rt = SbrRuntime::new(KeepMajorityStrategy, Duration::from_millis(0));
114 let r = rt.tick(&s, Instant::now());
115 assert_eq!(r, SbrAction::None);
116 assert!(rt.is_stable(Instant::now()));
117 }
118
119 #[test]
120 fn waits_for_stability_window() {
121 let mut s = MembershipState::new();
122 s.add_or_update(member("a", MemberStatus::Up));
123 s.add_or_update(member("b", MemberStatus::Up));
124 s.add_or_update(member("c", MemberStatus::Up));
125 s.reachability.unreachable(Address::local("b"), Address::local("c"));
126 let mut rt = SbrRuntime::new(KeepMajorityStrategy, Duration::from_secs(60));
127 let now = Instant::now();
128 assert_eq!(rt.tick(&s, now), SbrAction::None);
130 assert!(!rt.is_stable(now));
131 }
132
133 #[test]
134 fn fires_after_stability_window_with_majority() {
135 let mut s = MembershipState::new();
136 s.add_or_update(member("a", MemberStatus::Up));
137 s.add_or_update(member("b", MemberStatus::Up));
138 s.add_or_update(member("c", MemberStatus::Up));
139 s.reachability.unreachable(Address::local("b"), Address::local("c"));
141 let mut rt = SbrRuntime::new(KeepMajorityStrategy, Duration::from_millis(0));
142 let r = rt.tick(&s, Instant::now());
143 match r {
144 SbrAction::DownUnreachable(addrs) => {
145 assert_eq!(addrs, vec!["akka://c".to_string()]);
146 }
147 other => panic!("expected DownUnreachable, got {other:?}"),
148 }
149 }
150
151 #[test]
152 fn resets_clock_when_partition_heals() {
153 let mut s = MembershipState::new();
154 s.add_or_update(member("a", MemberStatus::Up));
155 s.add_or_update(member("b", MemberStatus::Up));
156 s.reachability.unreachable(Address::local("a"), Address::local("b"));
157 let mut rt = SbrRuntime::new(KeepMajorityStrategy, Duration::from_secs(60));
158 let now = Instant::now();
159 let _ = rt.tick(&s, now);
160 s.reachability.reachable(Address::local("a"), Address::local("b"));
162 let r = rt.tick(&s, now + Duration::from_secs(1));
163 assert_eq!(r, SbrAction::None);
164 assert!(rt.is_stable(now + Duration::from_secs(1)));
165 }
166}