phi_accrual_detector/
lib.rs

1//!
2//! This is a pluggable implementation of the Phi Accrual Failure Detector.
3//!
//! The simplest way to use it in your system is shown in `examples/monitor.rs`:
5//!
6//! ```rust
//!use phi_accrual_detector::{Detector, PhiInteraction};
8//!use async_trait::async_trait;
9//!use std::sync::{Arc};
10//!use chrono::{DateTime, Local};
11//!
12//!struct Monitor {
13//!    detector: Arc<Detector>,
14//!}
15//!
16//!#[async_trait]
17//!trait MonitorInteraction {
18//!    // For inserting heartbeat arrival time
19//!    async fn ping(&self);
20//!    // For calculating suspicion level
21//!    async fn suspicion(&self) -> f64;
22//!}
23//!
24//!#[async_trait]
25//!impl MonitorInteraction for Monitor {
26//!     async fn ping(&self) {
27//!         let current_time = Local::now();
28//!         self.detector.insert(current_time).await.expect("Some panic occurred");
29//!    }
30//!
31//!    async fn suspicion(&self) -> f64 {
32//!        let current_time = Local::now();
33//!        let last_arrived_at = self.detector.last_arrived_at().await.expect("Some panic occurred");
//!        // You can choose an acceptable threshold for phi (e.g. 0.5) beyond which you take action.
35//!        let phi = self.detector.phi(current_time).await.unwrap();
36//!        phi
37//!    }
38//!}
39//!
40//! fn main() {
41//!   let detector = Arc::new(Detector::new(1000));
42//!   let monitor = Monitor { detector: Arc::clone(&detector) };
43//! }
44//! ```
45//!
//! The example above implements a `Monitor` struct that can be used to interact with the
//! `Detector` struct. However, if you want to give your process some leeway to recover from
//! a failure, or to account for network latency, you can set an acceptable pause duration during
//! which the detector will not raise suspicion. You can tweak the detector in the example above
//! like this:
51//!
52//! ```rust
53//! use phi_accrual_detector::{Detector};
54//! use async_trait::async_trait;
55//! use std::sync::{Arc};
56//! use chrono::{TimeDelta};
57//!
58//! struct Monitor {
59//!    detector: Arc<Detector>,
60//! }
61//!
62//! // implementation and traits remain the same.
63//!
64//! fn main() {
65//!   let detector = Arc::new(Detector::with_acceptable_pause(1000, TimeDelta::milliseconds(1000)));
66//!   let monitor = Monitor { detector: Arc::clone(&detector) };
67//! }
68//! ```
69//!
70use std::error::Error;
71use std::ops::Sub;
72use std::sync::{Arc};
73use tokio::sync::{RwLock, RwLockReadGuard};
74use async_trait::async_trait;
75use libm::{erf, log10};
76use chrono::{DateTime, Local, TimeDelta};
77
78/// Statistics of last window_length intervals
79#[derive(Clone, Debug)]
80pub struct Statistics {
81    arrival_intervals: Vec<u64>,
82    last_arrived_at: DateTime<Local>,
83    window_length: u32,
84    n: u32,
85}
86
87/// Detector meant for abstraction over Statistics
88#[derive(Debug)]
89pub struct Detector {
90    statistics: RwLock<Statistics>,
91    acceptable_pause: TimeDelta,
92}
93
94impl Detector {
95    /// New Detector instance with window_length. Recommended window_length is < 10000
96    pub fn new(window_length: u32) -> Self {
97        Detector {
98            statistics: RwLock::new(Statistics::new(window_length)),
99            acceptable_pause: TimeDelta::milliseconds(0),
100        }
101    }
102
103    /// New Detector instance with acceptable heartbeat pause duration.
104    pub fn with_acceptable_pause(window_length: u32, acceptable_pause: TimeDelta) -> Self {
105        Detector {
106            statistics: RwLock::new(Statistics::new(window_length)),
107            acceptable_pause,
108        }
109    }
110}
111
112impl Statistics {
113    /// New Statistics instance with window_length.
114    pub fn new(window_length: u32) -> Self {
115        Self {
116            arrival_intervals: vec![],
117            last_arrived_at: Local::now(),
118            window_length,
119            n: 0,
120        }
121    }
122
123    /// Insert heartbeat arrival time in window.
124    pub fn insert(&mut self, arrived_at: DateTime<Local>) {
125
126        // insert first element
127        if self.n == 0 {
128            self.last_arrived_at = arrived_at;
129            self.n += 1;
130            return;
131        }
132
133
134        if self.n - 1 == self.window_length {
135            self.arrival_intervals.remove(0);
136            self.n -= 1;
137        }
138        if self.n != 0 {
139            let arrival_interval = arrived_at.sub(self.last_arrived_at).num_milliseconds() as u64;
140            self.arrival_intervals.push(arrival_interval);
141        }
142        self.last_arrived_at = arrived_at;
143        self.n += 1;
144    }
145}
146
147/// PhiCore trait for mean and variance calculation
148#[async_trait]
149trait PhiCore {
150    /// Calculate mean with existing stats.
151    async fn mean_with_stats<'a>(&self, stats: Arc<RwLockReadGuard<'a, Statistics>>) -> Result<f64, Box<dyn Error>>;
152
153    /// Calculate variance and mean with existing stats.
154    async fn variance_and_mean(&self) -> Result<(f64, f64), Box<dyn Error>>;
155}
156
157/// PhiInteraction trait for Detector
158#[async_trait]
159pub trait PhiInteraction {
160    /// Insertion of heartbeat arrival time.
161    async fn insert(&self, arrived_at: DateTime<Local>) -> Result<(), Box<dyn Error>>;
162
163    /// Trait for phi for implementing struct
164    async fn phi(&self, t: DateTime<Local>) -> Result<f64, Box<dyn Error>>;
165
166    /// Last arrival time of heartbeat
167    async fn last_arrived_at(&self) -> Result<DateTime<Local>, Box<dyn Error>>;
168}
169
170/// Implementation of PhiCore for Detector
171#[async_trait]
172impl PhiCore for Detector {
173    async fn mean_with_stats<'a>(&self, stats: Arc<RwLockReadGuard<'a, Statistics>>) -> Result<f64, Box<dyn Error>> {
174        let mut mean: f64 = 0.;
175        let len = &stats.arrival_intervals.len();
176        for v in &stats.arrival_intervals {
177            mean += *v as f64 / *len as f64;
178        }
179        Ok(mean)
180    }
181
182    async fn variance_and_mean(&self) -> Result<(f64, f64), Box<dyn Error>> {
183        let mut variance: f64 = 0.;
184        let stats = Arc::new(self.statistics.read().await);
185        let mu = self.mean_with_stats(Arc::clone(&stats)).await?;
186        let len = &stats.arrival_intervals.len();
187        for v in &stats.arrival_intervals {
188            let val = ((*v as f64 - mu) * (*v as f64 - mu)) / *len as f64;
189            variance += val;
190        }
191        Ok((variance, mu))
192    }
193}
194
195/// Cumulative distribution function for normal distribution
196fn normal_cdf(t: f64, mu: f64, sigma: f64) -> f64 {
197    if sigma == 0. {
198        return if t == mu {
199            1.
200        } else {
201            0.
202        };
203    }
204
205    let z = (t - mu) / sigma;
206    0.5 + 0.5 * (erf(z))
207}
208
209/// Implementation of PhiInteraction for Detector
210#[async_trait]
211impl PhiInteraction for Detector {
212    async fn insert(&self, arrived_at: DateTime<Local>) -> Result<(), Box<dyn Error>> {
213        let mut stats = self.statistics.write().await;
214        stats.insert(arrived_at);
215        Ok(())
216    }
217
218    async fn phi(&self, t: DateTime<Local>) -> Result<f64, Box<dyn Error>> {
219        let (sigma_sq, mu) = self.variance_and_mean().await?;
220        let sigma = sigma_sq.sqrt();
221        let last_arrived_at = self.last_arrived_at().await?;
222        let time_diff = t.sub(last_arrived_at).sub(self.acceptable_pause);
223        let ft = normal_cdf(time_diff.num_milliseconds() as f64, mu, sigma);
224        let phi = -log10(1. - ft);
225        Ok(phi)
226    }
227
228    async fn last_arrived_at(&self) -> Result<DateTime<Local>, Box<dyn Error>> {
229        Ok(self.statistics.read().await.last_arrived_at)
230    }
231}
232
#[cfg(test)]
mod tests {
    use std::ops::Add;
    use chrono::{Duration, Local, TimeDelta};
    use tokio::sync::RwLock;
    use crate::{Detector, PhiCore, PhiInteraction, Statistics};

    /// Feeds irregular intervals through a window of 10, checks mean and variance of the
    /// retained intervals, then checks that phi grows as time passes without a heartbeat.
    #[tokio::test]
    async fn test_variant_mean_and_variance_combo_calculation() {
        let mut stats = Statistics::new(10);
        let mut curr_time = Local::now();
        stats.insert(curr_time);
        // Eleven intervals go into a window of ten, so the first one (1630) is evicted.
        let expect_vals = [1630, 4421, 1514, 216, 231, 931, 4182, 102, 104, 241, 5132];
        for interval_ms in expect_vals {
            curr_time = curr_time.add(Duration::milliseconds(interval_ms));
            stats.insert(curr_time);
        }
        let detector = Detector {
            statistics: RwLock::new(stats),
            acceptable_pause: TimeDelta::milliseconds(0),
        };
        let (mut variance, mut mean) = detector.variance_and_mean().await.unwrap();
        // Round to two decimals so the comparison is insensitive to float noise.
        mean = (mean * 100.0).round() * 0.01;
        variance = (variance * 100.0).round() * 0.01;
        assert_eq!(1707.4, mean);
        assert_eq!(3755791.64, variance);

        let mut suspicion_level: Vec<f64> = vec![];
        for _ in 1..10 {
            curr_time = curr_time.add(Duration::milliseconds(250));
            suspicion_level.push(detector.phi(curr_time).await.unwrap());
        }
        println!("suspicion -> {:?}", suspicion_level);
        for i in 1..suspicion_level.len() {
            assert!(suspicion_level[i] > suspicion_level[i - 1]);
        }
    }

    /// Sends eleven heartbeats exactly 10 ms apart and checks the zero-variance path.
    #[tokio::test]
    async fn test_constant_phi_with_constant_pings_calculation() {
        let detector = Detector {
            statistics: RwLock::new(Statistics::new(10)),
            acceptable_pause: TimeDelta::milliseconds(0),
        };
        let mut curr_time = Local::now();
        for _ in 0..=10 {
            // Surface an insertion failure instead of silently discarding the result.
            detector.insert(curr_time).await.unwrap();
            curr_time = curr_time.add(Duration::milliseconds(10));
        }
        let (mut variance, mut mean) = detector.variance_and_mean().await.unwrap();
        mean = (mean * 100.0).round() * 0.01;
        variance = (variance * 100.0).round() * 0.01;
        assert_eq!(10., mean);
        assert_eq!(0., variance);
        curr_time = curr_time.add(Duration::milliseconds(10));
        assert_eq!(0., detector.phi(curr_time).await.unwrap());
    }
}