phi_detector/lib.rs
1//! This is an implementation of Phi Accrual Failure Detector.
2//!
//! To reduce the memory footprint, individual pings or intervals aren't stored;
//! only two running sums (plus a sample count) needed to compute a normal distribution are maintained.
//! This reduces not only the memory footprint to a constant
//! but also the computational cost of each ping to constant time.
7//!
//! Why does the memory footprint matter? Suppose your application communicates
//! with thousands of remote servers and you want to maintain a failure detector for each server.
//! Clearly, spending 100MB on a failure detector per server is too expensive.
11//!
12//! ```should_panic
13//! use phi_detector::PingWindow;
14//! use std::time::Duration;
15//!
//! // Create a window with a reference interval of 100ms.
17//! let mut window = PingWindow::new(Duration::from_millis(100));
18//!
19//! window.add_ping(Duration::from_millis(150));
20//! window.add_ping(Duration::from_millis(80));
21//! // Now the window has intervals [100ms, 150ms, 80ms]. Average is 110ms.
22//!
23//! let normal_dist = window.normal_dist();
//! // If nothing has been heard from the server for 5s, it should be considered failed.
25//! let phi = normal_dist.phi(Duration::from_millis(5000));
26//! if phi > 12. {
27//! panic!("The server is down");
28//! }
29//! ```
30
31use std::time::Duration;
32
/// Convert a probability into a phi value: the negated base-10 logarithm.
///
/// A probability of `1.0` maps to zero, `0.1` to one, and `0.0` to
/// positive infinity.
fn phi_from_prob(x: f64) -> f64 {
    let log = x.log10();
    -log
}
36
37/// Set of recent N ping intervals.
38pub struct PingWindow {
39 n: usize,
40 sum: f64,
41 sum2: f64,
42}
43impl PingWindow {
44 /// Create a new instance with one reference interval.
45 pub fn new(interval: Duration) -> Self {
46 let sum = interval.as_millis() as f64;
47 Self {
48 n: 1,
49 sum,
50 sum2: 0.,
51 }
52 }
53
54 /// Add a new ping to the window.
55 pub fn add_ping(&mut self, du: Duration) {
56 // Window size too large is found meaningless in experiment.
57 // not only that, may harm by counting in old values. (e.g. latency change, overflow)
58 // the experiment shows the error rate saturate around n=10000.
59 if self.n == 10000 {
60 self.sum = self.sum / self.n as f64 * (self.n - 1) as f64;
61 // suppose each value has equal contribution to the variance.
62 self.sum2 = self.sum2 / self.n as f64 * (self.n - 1) as f64;
63 self.n -= 1;
64 }
65 let v = du.as_millis() as f64;
66 self.sum += v;
67 self.n += 1;
68 let mu = self.sum / self.n as f64;
69 self.sum2 += (v - mu) * (v - mu);
70 }
71
72 /// Make the current normal distribution based on the ping history.
73 pub fn normal_dist(&self) -> NormalDist {
74 let n = self.n;
75 let mu = self.sum / n as f64;
76 let sigma = f64::sqrt(self.sum2 / n as f64);
77 NormalDist { mu, sigma }
78 }
79}
80
81/// Normal distribution from the ping history.
82pub struct NormalDist {
83 mu: f64,
84 sigma: f64,
85}
86impl NormalDist {
87 /// Mean
88 pub fn mu(&self) -> Duration {
89 Duration::from_millis(self.mu as u64)
90 }
91
92 /// Standard diviation
93 pub fn sigma(&self) -> Duration {
94 Duration::from_millis(self.sigma as u64)
95 }
96
97 /// Calculate integral [x, inf]
98 /// This is a monotonically decreasing function.
99 fn integral(&self, x: f64) -> f64 {
100 // Any small sigma rounds up to 1ms which doesn't affect the behavior
101 // because it is enough small as network latency.
102 let sigma = if self.sigma < 1. { 1. } else { self.sigma };
103 let y = (x - self.mu) / sigma;
104 let e = f64::exp(-y * (1.5976 + 0.070566 * y * y));
105 let out = if x > self.mu {
106 e / (1. + e)
107 } else {
108 1. - 1. / (1. + e)
109 };
110
111 assert!(0. <= out && out <= 1.);
112 out
113 }
114
115 /// Calculate the phi from the current normal distribution
116 /// and the duration from the last ping.
117 pub fn phi(&self, elapsed: Duration) -> f64 {
118 let x = elapsed.as_millis() as f64;
119 let y = self.integral(x);
120 phi_from_prob(y)
121 }
122}
123
#[cfg(test)]
mod tests {
    use super::*;

    use std::time::Instant;

    /// Phi must eventually exceed the threshold as time passes without a ping.
    #[tokio::test]
    async fn test_phi_detector() {
        let mut window = PingWindow::new(Duration::from_secs(5));
        for _ in 0..100 {
            window.add_ping(Duration::from_millis(100));
        }
        // The window no longer changes, so the distribution is computed once.
        let dist = window.normal_dist();
        let last_ping = Instant::now();
        loop {
            let elapsed = last_ping.elapsed();
            if dist.phi(elapsed) > 10. {
                break;
            }
            // Fail instead of hanging forever if phi never crosses the threshold.
            assert!(
                elapsed < Duration::from_secs(30),
                "phi never exceeded the threshold"
            );
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }

    /// A fresh window reports its reference interval as the mean and no spread.
    #[test]
    fn test_values() {
        let window = PingWindow::new(Duration::from_secs(5));
        let dist = window.normal_dist();
        assert_eq!(dist.mu(), Duration::from_secs(5));
        assert_eq!(dist.sigma(), Duration::from_millis(0));
    }

    use proptest::prelude::*;
    proptest! {
        #![proptest_config(ProptestConfig::with_cases(100000))]
        /// The integral stays a valid probability for generated parameters.
        #[test]
        fn test_integral(mu: f64, sigma: f64, x: f64) {
            let dist = NormalDist { mu, sigma };
            let y = dist.integral(x);
            assert!(0. <= y && y <= 1.);
        }
    }
}