phi_detector/
lib.rs

//! This is an implementation of the Phi Accrual Failure Detector.
2//!
//! To reduce the memory footprint, individual pings or intervals are not stored.
//! Instead, only a few running statistics (the interval count, their sum, and the
//! sum of squared deviations from the mean) are maintained to estimate a normal
//! distribution. This keeps both the memory footprint and the computational cost
//! of each ping constant.
7//!
//! Why does the memory footprint matter? Suppose your application communicates
//! with thousands of remote servers and you want to maintain a failure detector for
//! each of them. Spending something like 100MB on each failure detector would
//! clearly be too expensive.
11//!
12//! ```should_panic
13//! use phi_detector::PingWindow;
14//! use std::time::Duration;
15//!
//! // Create a window seeded with a single 100ms reference interval.
17//! let mut window = PingWindow::new(Duration::from_millis(100));
18//!
19//! window.add_ping(Duration::from_millis(150));
20//! window.add_ping(Duration::from_millis(80));
21//! // Now the window has intervals [100ms, 150ms, 80ms]. Average is 110ms.
22//!
23//! let normal_dist = window.normal_dist();
//! // If no ping has arrived for 5s, the server should be considered down.
25//! let phi = normal_dist.phi(Duration::from_millis(5000));
26//! if phi > 12. {
27//!     panic!("The server is down");
28//! }
29//! ```
30
31use std::time::Duration;
32
/// Convert a probability into a phi value, `phi = -log10(p)`.
///
/// Smaller probabilities give larger phi values; `p == 0` yields `+inf`.
fn phi_from_prob(x: f64) -> f64 {
    let log = x.log10();
    -log
}
36
37/// Set of recent N ping intervals.
38pub struct PingWindow {
39    n: usize,
40    sum: f64,
41    sum2: f64,
42}
43impl PingWindow {
44    /// Create a new instance with one reference interval.
45    pub fn new(interval: Duration) -> Self {
46        let sum = interval.as_millis() as f64;
47        Self {
48            n: 1,
49            sum,
50            sum2: 0.,
51        }
52    }
53
54    /// Add a new ping to the window.
55    pub fn add_ping(&mut self, du: Duration) {
56        // Window size too large is found meaningless in experiment.
57        // not only that, may harm by counting in old values. (e.g. latency change, overflow)
58        // the experiment shows the error rate saturate around n=10000.
59        if self.n == 10000 {
60            self.sum = self.sum / self.n as f64 * (self.n - 1) as f64;
61            // suppose each value has equal contribution to the variance.
62            self.sum2 = self.sum2 / self.n as f64 * (self.n - 1) as f64;
63            self.n -= 1;
64        }
65        let v = du.as_millis() as f64;
66        self.sum += v;
67        self.n += 1;
68        let mu = self.sum / self.n as f64;
69        self.sum2 += (v - mu) * (v - mu);
70    }
71
72    /// Make the current normal distribution based on the ping history.
73    pub fn normal_dist(&self) -> NormalDist {
74        let n = self.n;
75        let mu = self.sum / n as f64;
76        let sigma = f64::sqrt(self.sum2 / n as f64);
77        NormalDist { mu, sigma }
78    }
79}
80
81/// Normal distribution from the ping history.
82pub struct NormalDist {
83    mu: f64,
84    sigma: f64,
85}
86impl NormalDist {
87    /// Mean
88    pub fn mu(&self) -> Duration {
89        Duration::from_millis(self.mu as u64)
90    }
91
92    /// Standard diviation
93    pub fn sigma(&self) -> Duration {
94        Duration::from_millis(self.sigma as u64)
95    }
96
97    /// Calculate integral [x, inf]
98    /// This is a monotonically decreasing function.
99    fn integral(&self, x: f64) -> f64 {
100        // Any small sigma rounds up to 1ms which doesn't affect the behavior
101        // because it is enough small as network latency.
102        let sigma = if self.sigma < 1. { 1. } else { self.sigma };
103        let y = (x - self.mu) / sigma;
104        let e = f64::exp(-y * (1.5976 + 0.070566 * y * y));
105        let out = if x > self.mu {
106            e / (1. + e)
107        } else {
108            1. - 1. / (1. + e)
109        };
110
111        assert!(0. <= out && out <= 1.);
112        out
113    }
114
115    /// Calculate the phi from the current normal distribution
116    /// and the duration from the last ping.
117    pub fn phi(&self, elapsed: Duration) -> f64 {
118        let x = elapsed.as_millis() as f64;
119        let y = self.integral(x);
120        phi_from_prob(y)
121    }
122}
123
#[cfg(test)]
mod tests {
    use super::*;

    use proptest::prelude::*;

    /// phi stays low while pings arrive on time and eventually crosses the
    /// failure threshold once they stop. Silence is simulated rather than slept
    /// through, so the test is fast and does not depend on scheduler timing.
    #[test]
    fn test_phi_detector() {
        let mut window = PingWindow::new(Duration::from_secs(5));
        for _ in 0..100 {
            window.add_ping(Duration::from_millis(100));
        }
        let dist = window.normal_dist();

        // One regular interval after the last ping is nothing suspicious.
        assert!(dist.phi(Duration::from_millis(100)) < 1.);

        let mut prev = f64::NEG_INFINITY;
        // Simulate up to one minute of silence in 10ms steps.
        let crossed = (0..=60_000u64).step_by(10).any(|ms| {
            let phi = dist.phi(Duration::from_millis(ms));
            assert!(
                phi >= prev,
                "phi decreased at {}ms: {} -> {}",
                ms,
                prev,
                phi
            );
            prev = phi;
            phi > 10.
        });
        assert!(crossed, "phi never exceeded 10 within 60s of silence");
    }

    /// A freshly created window contains only its reference interval.
    #[test]
    fn test_values() {
        let window = PingWindow::new(Duration::from_secs(5));
        let dist = window.normal_dist();
        assert_eq!(dist.mu(), Duration::from_secs(5));
        assert_eq!(dist.sigma(), Duration::from_millis(0));
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(100000))]
        /// The tail probability stays within [0, 1] for arbitrary parameters.
        #[test]
        fn test_integral(mu: f64, sigma: f64, x: f64) {
            let dist = NormalDist { mu, sigma };
            let y = dist.integral(x);
            prop_assert!((0. ..=1.).contains(&y));
        }
    }
}