phi-detector 0.4.0

An Implementation of Phi Accrual Failure Detector
Documentation
//! This is an implementation of Phi Accrual Failure Detector.
//!
//! To reduce the memory footprint, individual pings (intervals) are not stored;
//! only the running sums needed to estimate a normal distribution are kept.
//! This reduces not only the memory footprint but also the computational cost
//! of each ping to a constant.
//!
//! Why does the memory footprint matter? Suppose your application communicates
//! with thousands of remote servers and you want to maintain a failure detector for each one.
//! Spending something like 100MB on the failure detector of every server is clearly too expensive.
//!
//! ```should_panic
//! use phi_detector::PingWindow;
//! use std::time::Duration;
//!
//! // Create a window with an initial interval of 100ms.
//! let mut window = PingWindow::new(Duration::from_millis(100));
//!
//! window.add_ping(Duration::from_millis(150));
//! window.add_ping(Duration::from_millis(80));
//! // Now the window has intervals [100ms, 150ms, 80ms]. Average is 110ms.
//!
//! let normal_dist = window.normal_dist();
//! // If the server is down for 5s, it should be failure.
//! let phi = normal_dist.phi(Duration::from_millis(5000));
//! if phi > 12. {
//!     panic!("The server is down");
//! }
//! ```

use std::time::Duration;

/// Convert a tail probability `p` into a phi value, i.e. `-log10(p)`.
///
/// A probability of `0.0` maps to `f64::INFINITY`.
fn phi_from_prob(x: f64) -> f64 {
    let log = x.log10();
    -log
}

/// Set of recent N ping intervals.
///
/// Individual intervals are not stored. The window keeps only the number of
/// samples, their sum, and the sum of squared deviations from the mean
/// (Welford's `M2`). That is enough to derive a normal distribution with
/// constant memory and constant time per ping. All values are in milliseconds.
#[derive(Debug, Clone)]
pub struct PingWindow {
    /// Number of samples currently accounted for (capped at `MAX_SAMPLES`).
    n: usize,
    /// Sum of the sampled intervals, in milliseconds.
    sum: f64,
    /// Sum of squared deviations from the running mean, in milliseconds squared.
    sum2: f64,
}
impl PingWindow {
    /// Upper bound on the effective window size.
    ///
    /// A very large window was found to be meaningless in experiments. Worse,
    /// it can do harm by keeping old values in the estimate (e.g. after a
    /// latency change) and by letting the accumulators grow without bound.
    /// The experiments showed the error rate saturating around n = 10000.
    const MAX_SAMPLES: usize = 10000;

    /// Create a new instance with one reference interval.
    ///
    /// The reference interval seeds the mean; the variance starts at zero.
    pub fn new(interval: Duration) -> Self {
        Self {
            n: 1,
            sum: interval.as_millis() as f64,
            sum2: 0.,
        }
    }

    /// Add a new ping interval to the window.
    ///
    /// Once the window is full, both accumulators are first scaled by
    /// `(n - 1) / n`. This keeps the current mean and variance unchanged while
    /// making room for the new sample, so old samples gradually lose weight.
    pub fn add_ping(&mut self, du: Duration) {
        if self.n >= Self::MAX_SAMPLES {
            self.sum = self.sum / self.n as f64 * (self.n - 1) as f64;
            // Suppose each value contributes equally to the variance.
            self.sum2 = self.sum2 / self.n as f64 * (self.n - 1) as f64;
            self.n -= 1;
        }
        let v = du.as_millis() as f64;
        // Welford's online update: the accumulator must grow by
        // (v - old_mean) * (v - new_mean). Using (v - new_mean)^2 alone
        // underestimates the variance by a factor of (n - 1) / n per step.
        let mu_old = self.sum / self.n as f64;
        self.sum += v;
        self.n += 1;
        let mu_new = self.sum / self.n as f64;
        self.sum2 += (v - mu_old) * (v - mu_new);
    }

    /// Make the current normal distribution based on the ping history.
    ///
    /// The standard deviation is the population one, `sqrt(sum2 / n)`.
    pub fn normal_dist(&self) -> NormalDist {
        let n = self.n as f64;
        let mu = self.sum / n;
        // Clamp so a tiny negative accumulator from floating-point rounding
        // can never turn sigma into NaN.
        let sigma = (self.sum2 / n).max(0.).sqrt();
        NormalDist { mu, sigma }
    }
}

/// Normal distribution fitted to the ping history.
///
/// `mu` and `sigma` are expressed in milliseconds.
pub struct NormalDist {
    mu: f64,
    sigma: f64,
}
impl NormalDist {
    /// Mean, truncated to whole milliseconds.
    pub fn mu(&self) -> Duration {
        let millis = self.mu as u64;
        Duration::from_millis(millis)
    }

    /// Standard deviation, truncated to whole milliseconds.
    pub fn sigma(&self) -> Duration {
        let millis = self.sigma as u64;
        Duration::from_millis(millis)
    }

    /// Approximate the upper-tail probability, i.e. the integral of the
    /// density over `[x, inf)`.
    ///
    /// Uses the logistic approximation `1 / (1 + exp(1.5976 y + 0.070566 y^3))`
    /// with `y = (x - mu) / sigma`. The result decreases monotonically in `x`.
    ///
    /// # Panics
    /// Panics if the result falls outside `[0, 1]` (e.g. when an input is NaN).
    fn integral(&self, x: f64) -> f64 {
        // Sigmas below 1ms are treated as 1ms. That is still small compared
        // with network latency, and it avoids dividing by (nearly) zero.
        let sigma = if self.sigma < 1. { 1. } else { self.sigma };
        let y = (x - self.mu) / sigma;
        let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
        let denom = 1. + e;
        // Both branches equal e / (1 + e) mathematically. Below the mean `e`
        // may overflow to infinity: e / (1 + e) would then be NaN, while
        // 1 - 1 / (1 + e) correctly yields 1.
        let prob = if x > self.mu {
            e / denom
        } else {
            1. - 1. / denom
        };

        assert!((0.0..=1.0).contains(&prob));
        prob
    }

    /// Compute phi for the time elapsed since the last ping.
    ///
    /// phi is `-log10` of the probability that the next ping arrives later
    /// than `elapsed`; larger values mean the peer is more likely down.
    pub fn phi(&self, elapsed: Duration) -> f64 {
        let tail = self.integral(elapsed.as_millis() as f64);
        phi_from_prob(tail)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// One 5s reference interval followed by 100 regular 100ms pings.
    fn regular_window() -> PingWindow {
        let mut window = PingWindow::new(Duration::from_secs(5));
        for _ in 0..100 {
            window.add_ping(Duration::from_millis(100));
        }
        window
    }

    // Evaluate phi at increasing elapsed times instead of sleeping and
    // sampling the wall clock. This keeps the test fast and deterministic,
    // and it cannot hang if phi never crosses the threshold.
    #[test]
    fn test_phi_detector() {
        let dist = regular_window().normal_dist();
        let mut prev = f64::NEG_INFINITY;
        let mut crossed = false;
        for ms in (0..=60_000u64).step_by(10) {
            let phi = dist.phi(Duration::from_millis(ms));
            assert!(phi >= prev, "phi must not decrease as time passes");
            prev = phi;
            if phi > 10. {
                crossed = true;
                break;
            }
        }
        assert!(crossed, "phi should exceed 10 after a long silence");
    }

    #[test]
    fn test_values() {
        let dist = PingWindow::new(Duration::from_secs(5)).normal_dist();
        assert_eq!(dist.mu(), Duration::from_secs(5));
        assert_eq!(dist.sigma(), Duration::from_millis(0));
    }

    #[test]
    fn test_mean() {
        let mut window = PingWindow::new(Duration::from_millis(100));
        window.add_ping(Duration::from_millis(150));
        window.add_ping(Duration::from_millis(80));
        assert_eq!(window.normal_dist().mu(), Duration::from_millis(110));
    }

    #[test]
    fn test_constant_pings_have_no_spread() {
        let mut window = PingWindow::new(Duration::from_millis(100));
        for _ in 0..1000 {
            window.add_ping(Duration::from_millis(100));
        }
        let dist = window.normal_dist();
        assert_eq!(dist.mu(), Duration::from_millis(100));
        assert_eq!(dist.sigma(), Duration::from_millis(0));
    }

    use proptest::prelude::*;
    proptest! {
        #![proptest_config(ProptestConfig::with_cases(100000))]
        #[test]
        fn test_integral(mu: f64, sigma: f64, x: f64) {
            let dist = NormalDist { mu, sigma };
            let y = dist.integral(x);
            assert!(0. <= y && y <= 1.);
        }
    }
}