1#[macro_use]
2extern crate log;
3
4use std::collections::VecDeque;
5
6
7#[derive(Clone,Debug)]
8pub struct PhiFailureDetector {
9 min_stddev: f64,
10 history_size: usize,
11 buf: VecDeque<u64>,
12}
13
14impl PhiFailureDetector {
15 pub fn new() -> PhiFailureDetector {
16 PhiFailureDetector {
17 min_stddev: 0.01,
18 history_size: 10,
19 buf: VecDeque::new(),
20 }
21 }
22 pub fn min_stddev(self, min_stddev: f64) -> PhiFailureDetector {
23 assert!(min_stddev > 0.0, "min_stddev must be > 0.0");
24 PhiFailureDetector { min_stddev: min_stddev, ..self }
25 }
26
27 pub fn history_size(self, count: usize) -> PhiFailureDetector {
28 assert!(count > 0, "history_size must > 0");
29 PhiFailureDetector { history_size: count, ..self }
30 }
31 pub fn heartbeat(&mut self, t: u64) {
32 self.buf.push_back(t);
33 if self.buf.len() > self.history_size {
34 let _ = self.buf.pop_front();
35 }
36 }
37
38 pub fn phi(&mut self, now: u64) -> f64 {
40 if let Some(&prev_time) = self.buf.back() {
41 let p_later = self.p_later(now - prev_time);
42 trace!("diff: {:?}; p_later:{:?}", now - prev_time, p_later);
43 -p_later.log10()
44 } else {
45 0.0
46 }
47 }
48
49 fn p_later(&self, diff: u64) -> f64 {
50 let deltasum = if let (Some(&front), Some(&back)) = (self.buf.front(), self.buf.back()) {
51 back - front
52 } else {
53 0
54 };
55 let nitems = self.buf.len() - 1;
56 let mean = nitems as f64 / deltasum as f64;
57 let variance = self.buf
58 .iter()
59 .zip(self.buf.iter().skip(1))
60 .map(|(&a, &b)| b - a)
61 .map(|i| (mean - i as f64).powi(2))
62 .fold(0_f64, |acc, i| acc + i) / nitems as f64;
63
64 let stddev = variance.sqrt().max(self.min_stddev);
65 let y = (diff as f64 - mean) / stddev;
66 let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
67 let cdf = if diff as f64 > mean {
68 e / (1.0 + e)
69 } else {
70 1.0 - 1.0 / (1.0 + e)
71 };
72 trace!("diff:{:?}; mean:{:?}; stddev:{:?}; y:{:?}; e:{:?}; cdf:{:?}",
73 diff,
74 mean,
75 stddev,
76 y,
77 e,
78 cdf);
79 cdf
80 }
81}
82
83#[cfg(test)]
84mod tests {
85 extern crate env_logger;
86 use super::PhiFailureDetector;
87 #[test]
88 fn should_fail_when_no_heartbeats() {
89 env_logger::init().unwrap_or(());
90 let mut detector = PhiFailureDetector::new();
91 for t in 0..100 {
92 detector.heartbeat(t);
93 let phi = detector.phi(t);
94 debug!("at:{:?}, phi:{:?}; det: {:?}", t, phi, detector);
95 if t > 10 {
96 assert!(phi < 1.0);
97 }
98 }
99 for t in 100..110 {
100 let phi = detector.phi(t);
101 debug!("at:{:?}, phi:{:?}; det: {:?}", t, phi, detector);
102 }
103 for &t in &[110, 200, 300] {
104 let phi = detector.phi(t);
105 debug!("at:{:?}, phi:{:?}; det: {:?}", t, phi, detector);
106 assert!(phi > 1.0);
107 }
108 }
109
110 #[test]
111 fn should_recover() {
112 env_logger::init().unwrap_or(());
113 let mut detector = PhiFailureDetector::new().history_size(3);
114 for t in 0..10 {
115 detector.heartbeat(t);
116 let phi = detector.phi(t);
117 debug!("at:{:?}, phi:{:?}; det: {:?}", t, phi, detector);
118 }
119 for t in 20..30 {
120 detector.heartbeat(t);
121 let phi = detector.phi(t);
122 debug!("at:{:?}, phi:{:?}; det: {:?}", t, phi, detector);
123 if t > 10 {
124 assert!(phi < 1.0);
125 }
126 }
127 }
128}