atomr_remote/
phi_accrual.rs1use std::collections::VecDeque;
6use std::time::{Duration, Instant};
7
8use parking_lot::Mutex;
9
10use crate::failure_detector::FailureDetector;
11
12pub struct PhiAccrualFailureDetector {
13 threshold: f64,
14 max_samples: usize,
15 min_std_deviation: Duration,
16 acceptable_heartbeat_pause: Duration,
17 first_heartbeat_estimate: Duration,
18 inner: Mutex<Inner>,
19}
20
/// Mutable detector state guarded by `PhiAccrualFailureDetector::inner`.
struct Inner {
    /// Gaps between consecutive heartbeats, in milliseconds, oldest first.
    history: VecDeque<f64>,
    /// Arrival time of the latest heartbeat; `None` before the first one or after a reset.
    last_heartbeat: Option<Instant>,
}
25
26impl PhiAccrualFailureDetector {
27 pub fn new(
28 threshold: f64,
29 max_samples: usize,
30 min_std_deviation: Duration,
31 acceptable_heartbeat_pause: Duration,
32 first_heartbeat_estimate: Duration,
33 ) -> Self {
34 Self {
35 threshold,
36 max_samples,
37 min_std_deviation,
38 acceptable_heartbeat_pause,
39 first_heartbeat_estimate,
40 inner: Mutex::new(Inner { history: VecDeque::new(), last_heartbeat: None }),
41 }
42 }
43
44 pub fn phi(&self) -> f64 {
45 let i = self.inner.lock();
46 let Some(last) = i.last_heartbeat else { return 0.0 };
47 let time_diff_ms = last.elapsed().as_millis() as f64;
48 let (mean, std_dev) = if i.history.is_empty() {
49 let m = self.first_heartbeat_estimate.as_millis() as f64;
50 (m, (m * 0.25).max(self.min_std_deviation.as_millis() as f64))
51 } else {
52 let n = i.history.len() as f64;
53 let mean = i.history.iter().sum::<f64>() / n;
54 let var = i.history.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / n;
55 (mean, var.sqrt().max(self.min_std_deviation.as_millis() as f64))
56 };
57 let adjusted = time_diff_ms - mean - self.acceptable_heartbeat_pause.as_millis() as f64;
58 let y = adjusted / std_dev;
59 let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
60 if adjusted > 0.0 {
61 -(e / (1.0 + e)).log10()
62 } else {
63 -(1.0 - 1.0 / (1.0 + e)).log10()
64 }
65 }
66}
67
68impl FailureDetector for PhiAccrualFailureDetector {
69 fn is_available(&self) -> bool {
70 self.phi() < self.threshold
71 }
72
73 fn is_monitoring(&self) -> bool {
74 self.inner.lock().last_heartbeat.is_some()
75 }
76
77 fn heartbeat(&self) {
78 let now = Instant::now();
79 let mut i = self.inner.lock();
80 if let Some(prev) = i.last_heartbeat {
81 let diff = now.duration_since(prev).as_millis() as f64;
82 i.history.push_back(diff);
83 if i.history.len() > self.max_samples {
84 i.history.pop_front();
85 }
86 }
87 i.last_heartbeat = Some(now);
88 }
89
90 fn reset(&self) {
91 let mut i = self.inner.lock();
92 i.history.clear();
93 i.last_heartbeat = None;
94 }
95
96 fn since_last_heartbeat(&self) -> Option<Duration> {
97 self.inner.lock().last_heartbeat.map(|t| t.elapsed())
98 }
99}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// Detector with generous timing so the short sleeps in these tests never trip it.
    fn detector_with_samples(max_samples: usize) -> PhiAccrualFailureDetector {
        PhiAccrualFailureDetector::new(
            8.0,
            max_samples,
            Duration::from_millis(100),
            Duration::from_secs(3),
            Duration::from_secs(1),
        )
    }

    #[test]
    fn available_after_recent_heartbeats() {
        let d = detector_with_samples(100);
        for _ in 0..5 {
            d.heartbeat();
            std::thread::sleep(Duration::from_millis(5));
        }
        assert!(d.is_available());
        assert!(d.is_monitoring());
    }

    #[test]
    fn not_monitoring_before_first_heartbeat() {
        let d = detector_with_samples(100);
        assert!(!d.is_monitoring());
        assert_eq!(d.phi(), 0.0);
        assert!(d.is_available());
        assert!(d.since_last_heartbeat().is_none());
    }

    #[test]
    fn reset_stops_monitoring() {
        let d = detector_with_samples(100);
        d.heartbeat();
        d.heartbeat();
        assert!(d.is_monitoring());
        d.reset();
        assert!(!d.is_monitoring());
        assert_eq!(d.phi(), 0.0);
        assert!(d.inner.lock().history.is_empty());
    }

    #[test]
    fn history_is_bounded_by_max_samples() {
        let d = detector_with_samples(3);
        for _ in 0..10 {
            d.heartbeat();
        }
        assert_eq!(d.inner.lock().history.len(), 3);
    }
}