Skip to main content

atomr_remote/
phi_accrual.rs

//! Phi accrual failure detector. Straight port of the math from an existing
//! implementation, which itself ports
//! Hayashibara's algorithm.
4
5use std::collections::VecDeque;
6use std::time::{Duration, Instant};
7
8use parking_lot::Mutex;
9
10use crate::failure_detector::FailureDetector;
11
12pub struct PhiAccrualFailureDetector {
13    threshold: f64,
14    max_samples: usize,
15    min_std_deviation: Duration,
16    acceptable_heartbeat_pause: Duration,
17    first_heartbeat_estimate: Duration,
18    inner: Mutex<Inner>,
19}
20
/// Mutable heartbeat bookkeeping kept behind the detector's mutex.
struct Inner {
    /// Gaps between consecutive heartbeats, in milliseconds, oldest first.
    history: VecDeque<f64>,
    /// Arrival time of the most recent heartbeat; `None` before the first
    /// heartbeat and after a reset.
    last_heartbeat: Option<Instant>,
}
25
26impl PhiAccrualFailureDetector {
27    pub fn new(
28        threshold: f64,
29        max_samples: usize,
30        min_std_deviation: Duration,
31        acceptable_heartbeat_pause: Duration,
32        first_heartbeat_estimate: Duration,
33    ) -> Self {
34        Self {
35            threshold,
36            max_samples,
37            min_std_deviation,
38            acceptable_heartbeat_pause,
39            first_heartbeat_estimate,
40            inner: Mutex::new(Inner { history: VecDeque::new(), last_heartbeat: None }),
41        }
42    }
43
44    pub fn phi(&self) -> f64 {
45        let i = self.inner.lock();
46        let Some(last) = i.last_heartbeat else { return 0.0 };
47        let time_diff_ms = last.elapsed().as_millis() as f64;
48        let (mean, std_dev) = if i.history.is_empty() {
49            let m = self.first_heartbeat_estimate.as_millis() as f64;
50            (m, (m * 0.25).max(self.min_std_deviation.as_millis() as f64))
51        } else {
52            let n = i.history.len() as f64;
53            let mean = i.history.iter().sum::<f64>() / n;
54            let var = i.history.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / n;
55            (mean, var.sqrt().max(self.min_std_deviation.as_millis() as f64))
56        };
57        let adjusted = time_diff_ms - mean - self.acceptable_heartbeat_pause.as_millis() as f64;
58        let y = adjusted / std_dev;
59        let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
60        if adjusted > 0.0 {
61            -(e / (1.0 + e)).log10()
62        } else {
63            -(1.0 - 1.0 / (1.0 + e)).log10()
64        }
65    }
66}
67
68impl FailureDetector for PhiAccrualFailureDetector {
69    fn is_available(&self) -> bool {
70        self.phi() < self.threshold
71    }
72
73    fn is_monitoring(&self) -> bool {
74        self.inner.lock().last_heartbeat.is_some()
75    }
76
77    fn heartbeat(&self) {
78        let now = Instant::now();
79        let mut i = self.inner.lock();
80        if let Some(prev) = i.last_heartbeat {
81            let diff = now.duration_since(prev).as_millis() as f64;
82            i.history.push_back(diff);
83            if i.history.len() > self.max_samples {
84                i.history.pop_front();
85            }
86        }
87        i.last_heartbeat = Some(now);
88    }
89
90    fn reset(&self) {
91        let mut i = self.inner.lock();
92        i.history.clear();
93        i.last_heartbeat = None;
94    }
95
96    fn since_last_heartbeat(&self) -> Option<Duration> {
97        self.inner.lock().last_heartbeat.map(|t| t.elapsed())
98    }
99}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// A short burst of closely spaced heartbeats must leave the detector
    /// both monitoring and reporting the peer as available.
    #[test]
    fn available_after_recent_heartbeats() {
        let threshold = 8.0;
        let max_samples = 100;
        let min_std_deviation = Duration::from_millis(100);
        let acceptable_pause = Duration::from_secs(3);
        let first_estimate = Duration::from_secs(1);
        let detector = PhiAccrualFailureDetector::new(
            threshold,
            max_samples,
            min_std_deviation,
            acceptable_pause,
            first_estimate,
        );

        let gap = Duration::from_millis(5);
        (0..5).for_each(|_| {
            detector.heartbeat();
            std::thread::sleep(gap);
        });

        assert!(detector.is_available());
        assert!(detector.is_monitoring());
    }
}
121}