1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#[macro_use]
extern crate log;
use std::collections::VecDeque;
#[derive(Clone,Debug)]
pub struct PhiFailureDetector {
min_stddev: f64,
history_size: usize,
buf: VecDeque<u64>,
}
impl PhiFailureDetector {
pub fn new() -> PhiFailureDetector {
PhiFailureDetector {
min_stddev: 0.01,
history_size: 10,
buf: VecDeque::new(),
}
}
pub fn min_stddev(self, min_stddev: f64) -> PhiFailureDetector {
assert!(min_stddev > 0.0, "min_stddev must be > 0.0");
PhiFailureDetector { min_stddev: min_stddev, ..self }
}
pub fn history_size(self, count: usize) -> PhiFailureDetector {
assert!(count > 0, "history_size must > 0");
PhiFailureDetector { history_size: count, ..self }
}
pub fn heartbeat(&mut self, t: u64) {
self.buf.push_back(t);
if self.buf.len() > self.history_size {
let _ = self.buf.pop_front();
}
}
pub fn phi(&mut self, now: u64) -> f64 {
if let Some(&prev_time) = self.buf.back() {
let p_later = self.p_later(now - prev_time);
trace!("diff: {:?}; p_later:{:?}", now - prev_time, p_later);
-p_later.log10()
} else {
0.0
}
}
fn p_later(&self, diff: u64) -> f64 {
let deltasum = if let (Some(&front), Some(&back)) = (self.buf.front(), self.buf.back()) {
back - front
} else {
0
};
let nitems = self.buf.len() - 1;
let mean = nitems as f64 / deltasum as f64;
let variance = self.buf
.iter()
.zip(self.buf.iter().skip(1))
.map(|(&a, &b)| b - a)
.map(|i| (mean - i as f64).powi(2))
.fold(0_f64, |acc, i| acc + i) / nitems as f64;
let stddev = variance.sqrt().max(self.min_stddev);
let y = (diff as f64 - mean) / stddev;
let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
let cdf = if diff as f64 > mean {
e / (1.0 + e)
} else {
1.0 - 1.0 / (1.0 + e)
};
trace!("diff:{:?}; mean:{:?}; stddev:{:?}; y:{:?}; e:{:?}; cdf:{:?}",
diff,
mean,
stddev,
y,
e,
cdf);
cdf
}
}
#[cfg(test)]
mod tests {
extern crate env_logger;
use super::PhiFailureDetector;
#[test]
fn should_fail_when_no_heartbeats() {
env_logger::init().unwrap_or(());
let mut detector = PhiFailureDetector::new();
for t in 0..100 {
detector.heartbeat(t);
let phi = detector.phi(t);
debug!("at:{:?}, phi:{:?}; det: {:?}", t, phi, detector);
if t > 10 {
assert!(phi < 1.0);
}
}
for t in 100..110 {
let phi = detector.phi(t);
debug!("at:{:?}, phi:{:?}; det: {:?}", t, phi, detector);
}
for &t in &[110, 200, 300] {
let phi = detector.phi(t);
debug!("at:{:?}, phi:{:?}; det: {:?}", t, phi, detector);
assert!(phi > 1.0);
}
}
#[test]
fn should_recover() {
env_logger::init().unwrap_or(());
let mut detector = PhiFailureDetector::new().history_size(3);
for t in 0..10 {
detector.heartbeat(t);
let phi = detector.phi(t);
debug!("at:{:?}, phi:{:?}; det: {:?}", t, phi, detector);
}
for t in 20..30 {
detector.heartbeat(t);
let phi = detector.phi(t);
debug!("at:{:?}, phi:{:?}; det: {:?}", t, phi, detector);
if t > 10 {
assert!(phi < 1.0);
}
}
}
}