phi_accrual/
lib.rs

1#[macro_use]
2extern crate log;
3
4use std::collections::VecDeque;
5
6
7#[derive(Clone,Debug)]
8pub struct PhiFailureDetector {
9    min_stddev: f64,
10    history_size: usize,
11    buf: VecDeque<u64>,
12}
13
14impl PhiFailureDetector {
15    pub fn new() -> PhiFailureDetector {
16        PhiFailureDetector {
17            min_stddev: 0.01,
18            history_size: 10,
19            buf: VecDeque::new(),
20        }
21    }
22    pub fn min_stddev(self, min_stddev: f64) -> PhiFailureDetector {
23        assert!(min_stddev > 0.0, "min_stddev must be > 0.0");
24        PhiFailureDetector { min_stddev: min_stddev, ..self }
25    }
26
27    pub fn history_size(self, count: usize) -> PhiFailureDetector {
28        assert!(count > 0, "history_size must > 0");
29        PhiFailureDetector { history_size: count, ..self }
30    }
31    pub fn heartbeat(&mut self, t: u64) {
32        self.buf.push_back(t);
33        if self.buf.len() > self.history_size {
34            let _ = self.buf.pop_front();
35        }
36    }
37
38    /// def ϕ(Tnow ) = − log10(Plater (Tnow − Tlast))
39    pub fn phi(&mut self, now: u64) -> f64 {
40        if let Some(&prev_time) = self.buf.back() {
41            let p_later = self.p_later(now - prev_time);
42            trace!("diff: {:?}; p_later:{:?}", now - prev_time, p_later);
43            -p_later.log10()
44        } else {
45            0.0
46        }
47    }
48
49    fn p_later(&self, diff: u64) -> f64 {
50        let deltasum = if let (Some(&front), Some(&back)) = (self.buf.front(), self.buf.back()) {
51            back - front
52        } else {
53            0
54        };
55        let nitems = self.buf.len() - 1;
56        let mean = nitems as f64 / deltasum as f64;
57        let variance = self.buf
58                           .iter()
59                           .zip(self.buf.iter().skip(1))
60                           .map(|(&a, &b)| b - a)
61                           .map(|i| (mean - i as f64).powi(2))
62                           .fold(0_f64, |acc, i| acc + i) / nitems as f64;
63
64        let stddev = variance.sqrt().max(self.min_stddev);
65        let y = (diff as f64 - mean) / stddev;
66        let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
67        let cdf = if diff as f64 > mean {
68            e / (1.0 + e)
69        } else {
70            1.0 - 1.0 / (1.0 + e)
71        };
72        trace!("diff:{:?}; mean:{:?}; stddev:{:?}; y:{:?}; e:{:?}; cdf:{:?}",
73               diff,
74               mean,
75               stddev,
76               y,
77               e,
78               cdf);
79        cdf
80    }
81}
82
83#[cfg(test)]
84mod tests {
85    extern crate env_logger;
86    use super::PhiFailureDetector;
87    #[test]
88    fn should_fail_when_no_heartbeats() {
89        env_logger::init().unwrap_or(());
90        let mut detector = PhiFailureDetector::new();
91        for t in 0..100 {
92            detector.heartbeat(t);
93            let phi = detector.phi(t);
94            debug!("at:{:?}, phi:{:?}; det: {:?}", t, phi, detector);
95            if t > 10 {
96                assert!(phi < 1.0);
97            }
98        }
99        for t in 100..110 {
100            let phi = detector.phi(t);
101            debug!("at:{:?}, phi:{:?}; det: {:?}", t, phi, detector);
102        }
103        for &t in &[110, 200, 300] {
104            let phi = detector.phi(t);
105            debug!("at:{:?}, phi:{:?}; det: {:?}", t, phi, detector);
106            assert!(phi > 1.0);
107        }
108    }
109
110    #[test]
111    fn should_recover() {
112        env_logger::init().unwrap_or(());
113        let mut detector = PhiFailureDetector::new().history_size(3);
114        for t in 0..10 {
115            detector.heartbeat(t);
116            let phi = detector.phi(t);
117            debug!("at:{:?}, phi:{:?}; det: {:?}", t, phi, detector);
118        }
119        for t in 20..30 {
120            detector.heartbeat(t);
121            let phi = detector.phi(t);
122            debug!("at:{:?}, phi:{:?}; det: {:?}", t, phi, detector);
123            if t > 10 {
124                assert!(phi < 1.0);
125            }
126        }
127    }
128}