Skip to main content

phi_accrual_failure_detector/
lib.rs

1use std::{
2    cell::RefCell,
3    marker::PhantomData,
4    sync::RwLock,
5    time::{Duration, Instant},
6};
7
8#[derive(Debug, thiserror::Error)]
9pub enum Error {
10    #[error("Threshold must be > 0")]
11    Threshold,
12
13    #[error("Max sample size must be > 0")]
14    MaxSampleSize,
15
16    #[error("Min standard deviation must be > 0")]
17    MinStdDeviation,
18
19    #[error("First heartbeat estimate must be > 0")]
20    FirstHeartbeatEstimate,
21}
22
23/// [`FailureDetector`] for single-threaded environments.
24pub type UnsyncDetector = FailureDetector<UnsyncState<DefaultClock>>;
25
26/// [`FailureDetector`] for multi-threaded environments.
27pub type SyncDetector = FailureDetector<SyncState<DefaultClock>>;
28
29/// [`FailureDetector`] builder.
30pub struct Builder<S: sealed::State> {
31    config: Config,
32    clock: S::Clock,
33    _marker: PhantomData<S>,
34}
35
36impl<S: sealed::State<Clock = DefaultClock>> Builder<S> {
37    pub fn new() -> Self {
38        Self {
39            config: Default::default(),
40            clock: DefaultClock,
41            _marker: PhantomData,
42        }
43    }
44}
45
46impl Default for Builder<UnsyncState<DefaultClock>> {
47    fn default() -> Self {
48        Self::new()
49    }
50}
51
52impl<S: sealed::State> Builder<S> {
53    /// Threshold for considering the monitored resource unavailable.
54    ///
55    /// A low threshold is prone to generate many wrong suspicions but ensures a
56    /// quick detection in the event of a real crash. Conversely, a high
57    /// threshold generates fewer mistakes but needs more time to detect actual
58    /// crashes.
59    ///
60    /// Default: 8.0
61    pub fn threshold(mut self, threshold: f64) -> Self {
62        self.config.threshold = threshold;
63        self
64    }
65
66    /// Number of samples to use for calculation of mean and standard deviation
67    /// of inter-arrival times.
68    ///
69    /// Default: 100
70    pub fn max_sample_size(mut self, max_sample_size: usize) -> Self {
71        self.config.max_sample_size = max_sample_size;
72        self
73    }
74
75    /// Minimum standard deviation to use for the normal distribution used when
76    /// calculating phi. Too low standard deviation might result in too much
77    /// sensitivity for sudden, but normal, deviations in heartbeat inter
78    /// arrival times.
79    ///
80    /// Default: 100ms
81    pub fn min_std_deviation(mut self, min_std_deviation: Duration) -> Self {
82        self.config.min_std_deviation = min_std_deviation;
83        self
84    }
85
86    /// Duration corresponding to number of potentially lost/delayed heartbeats
87    /// that will be accepted before considering it to be an anomaly. This
88    /// margin is important to be able to survive sudden, occasional, pauses in
89    /// heartbeat   arrivals, due to for example garbage collect or network
90    /// drop.
91    ///
92    /// Default: 3s
93    pub fn acceptable_heartbeat_pause(mut self, acceptable_heartbeat_pause: Duration) -> Self {
94        self.config.acceptable_heartbeat_pause = acceptable_heartbeat_pause;
95        self
96    }
97
98    /// Bootstrap the stats with heartbeats that corresponds to to this
99    /// duration, with a with rather high standard deviation (since environment
100    /// is unknown in the beginning).
101    ///
102    /// Default: 1s
103    pub fn first_heartbeat_estimate(mut self, first_heartbeat_estimate: Duration) -> Self {
104        self.config.first_heartbeat_estimate = first_heartbeat_estimate;
105        self
106    }
107
108    /// Use [`RwLock`] internally to make the detector [`Sync`].
109    pub fn sync(self) -> Builder<SyncState<S::Clock>> {
110        self.state::<SyncState<S::Clock>>()
111    }
112
113    /// Use [`RefCell`] internally instead of [`RwLock`] for slightly better
114    /// performance.
115    pub fn unsync(self) -> Builder<UnsyncState<S::Clock>> {
116        self.state::<UnsyncState<S::Clock>>()
117    }
118
119    /// Provide an alternative implementation of [`Clock`].
120    ///
121    /// Default: [`DefaultClock`]
122    pub fn clock<T: Clock>(self, clock: T) -> Builder<S::WithClock<T>> {
123        Builder {
124            config: self.config,
125            clock,
126            _marker: PhantomData,
127        }
128    }
129
130    /// Builds an instance of [`Detector`].
131    ///
132    /// Returns an [`Error`] if some configuration parameters are incorrect.
133    pub fn build(self) -> Result<FailureDetector<S>, Error> {
134        let config = self.config;
135
136        if config.threshold <= 0. {
137            return Err(Error::Threshold);
138        }
139
140        if config.max_sample_size == 0 {
141            return Err(Error::MaxSampleSize);
142        }
143
144        if config.min_std_deviation.is_zero() {
145            return Err(Error::MinStdDeviation);
146        }
147
148        if config.first_heartbeat_estimate.is_zero() {
149            return Err(Error::FirstHeartbeatEstimate);
150        }
151
152        let mean = config.first_heartbeat_estimate.as_millis() as f64;
153        let std_deviation = mean / 4.;
154
155        let threshold = config.threshold;
156        let acceptable_heartbeat_pause = config.acceptable_heartbeat_pause.as_millis() as f64;
157        let min_std_deviation = config.min_std_deviation.as_millis() as f64;
158
159        let mut history = HeartbeatHistory::new(config.max_sample_size);
160        history.add(mean - std_deviation);
161        history.add(mean + std_deviation);
162
163        let state = DetectorState {
164            threshold,
165            acceptable_heartbeat_pause,
166            min_std_deviation,
167            history,
168            last_timestamp: None,
169        };
170
171        Ok(FailureDetector {
172            state: state.into(),
173            clock: self.clock,
174        })
175    }
176
177    fn state<T: sealed::State<Clock = S::Clock>>(self) -> Builder<T> {
178        Builder {
179            config: self.config,
180            clock: self.clock,
181            _marker: PhantomData,
182        }
183    }
184}
185
/// Raw, not yet validated, detector parameters collected by [`Builder`].
struct Config {
    threshold: f64,
    max_sample_size: usize,
    min_std_deviation: Duration,
    acceptable_heartbeat_pause: Duration,
    first_heartbeat_estimate: Duration,
}

impl Default for Config {
    /// Default values: threshold 8.0, 100 samples, 100ms minimum standard
    /// deviation, 3s acceptable pause and a 1s first heartbeat estimate.
    fn default() -> Self {
        let first_heartbeat_estimate = Duration::from_millis(1_000);
        let acceptable_heartbeat_pause = Duration::from_millis(3_000);
        let min_std_deviation = Duration::from_millis(100);

        Config {
            first_heartbeat_estimate,
            acceptable_heartbeat_pause,
            min_std_deviation,
            max_sample_size: 100,
            threshold: 8.0,
        }
    }
}
205
206struct DetectorState<C: Clock> {
207    threshold: f64,
208    acceptable_heartbeat_pause: f64,
209    min_std_deviation: f64,
210    history: HeartbeatHistory,
211    last_timestamp: Option<C::Timestamp>,
212}
213
214impl<C: Clock> DetectorState<C> {
215    fn heartbeat(&mut self, timestamp: C::Timestamp) {
216        if let (Some(last_timestamp), true) = (
217            &self.last_timestamp,
218            self.is_available_for_timestamp(&timestamp),
219        ) {
220            self.history.add(C::elapsed_ms(last_timestamp, &timestamp));
221        }
222
223        self.last_timestamp = Some(timestamp);
224    }
225
226    fn is_available_for_timestamp(&self, timestamp: &C::Timestamp) -> bool {
227        self.phi_for_timestamp(timestamp) < self.threshold
228    }
229
230    fn phi_for_timestamp(&self, timestamp: &C::Timestamp) -> f64 {
231        let Some(last_timestamp) = &self.last_timestamp else {
232            // No heartbeats received yet.
233            return 0.0;
234        };
235
236        let time_diff = C::elapsed_ms(last_timestamp, timestamp);
237        let mean = self.history.mean() + self.acceptable_heartbeat_pause;
238        let std_deviation = self.history.std_deviation().max(self.min_std_deviation);
239
240        let y = (time_diff - mean) / std_deviation;
241        let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
242
243        if time_diff > mean {
244            -(e / (1.0 + e)).log10()
245        } else {
246            -(1.0 - 1.0 / (1.0 + e)).log10()
247        }
248    }
249}
250
251/// Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al.
252/// as defined in their paper: <https://oneofus.la/have-emacs-will-hack/files/HDY04.pdf>
253///
254/// The suspicion level of failure is given by a value called φ (phi). The basic
255/// idea of the φ failure detector is to express the value of φ on a scale that
256/// is dynamically adjusted to reflect current network conditions. A
257/// configurable threshold is used to decide if φ is considered to be a failure.
258///
259/// The value of φ is calculated as: `φ = -log10(1 - F(timeSinceLastHeartbeat)`
260/// where `F` is the cumulative distribution function of a normal distribution
261/// with mean and standard deviation estimated from historical heartbeat
262/// inter-arrival times.
263pub struct FailureDetector<S: sealed::State> {
264    state: S,
265    clock: S::Clock,
266}
267
268impl<S: sealed::State<Clock = DefaultClock>> FailureDetector<S> {
269    pub fn builder() -> Builder<S> {
270        Builder::new()
271    }
272}
273
274impl<S: sealed::State<Clock = DefaultClock>> Default for FailureDetector<S> {
275    fn default() -> Self {
276        // Safe unwrap with default parameters.
277        Self::builder().build().unwrap()
278    }
279}
280
/// Operations offered by every [`FailureDetector`] variant.
pub trait Detector {
    /// Records that a heartbeat from the monitored resource has just arrived,
    /// updating the detector's internal state.
    fn heartbeat(&self);

    /// Current suspicion level φ; larger values mean the resource is more
    /// likely to have failed.
    ///
    /// A resource that has never sent a heartbeat is treated as healthy.
    fn phi(&self) -> f64;

    /// `true` when the resource is currently considered up and healthy,
    /// `false` otherwise.
    fn is_available(&self) -> bool;

    /// `true` once at least one heartbeat has been received, i.e. the
    /// detector has started monitoring the resource.
    fn is_monitoring(&self) -> bool;
}
300
301/// A [`FailureDetector`] state wrapper based on [`RefCell`] for single-threaded
302/// access.
303pub struct UnsyncState<C: Clock>(RefCell<DetectorState<C>>);
304
305impl<C: Clock> sealed::State for UnsyncState<C> {
306    type Clock = C;
307    type WithClock<T: Clock> = UnsyncState<T>;
308}
309
310impl<C: Clock> From<DetectorState<C>> for UnsyncState<C> {
311    fn from(inner: DetectorState<C>) -> Self {
312        Self(RefCell::new(inner))
313    }
314}
315
316impl<C: Clock> Detector for FailureDetector<UnsyncState<C>> {
317    fn heartbeat(&self) {
318        self.state.0.borrow_mut().heartbeat(self.clock.timestamp());
319    }
320
321    fn phi(&self) -> f64 {
322        self.state
323            .0
324            .borrow()
325            .phi_for_timestamp(&self.clock.timestamp())
326    }
327
328    fn is_available(&self) -> bool {
329        self.state
330            .0
331            .borrow()
332            .is_available_for_timestamp(&self.clock.timestamp())
333    }
334
335    fn is_monitoring(&self) -> bool {
336        self.state.0.borrow().last_timestamp.is_some()
337    }
338}
339
340/// A [`FailureDetector`] state wrapper based on [`RwLock`] for multi-threaded
341/// access.
342pub struct SyncState<C: Clock>(RwLock<DetectorState<C>>);
343
344impl<C: Clock> sealed::State for SyncState<C> {
345    type Clock = C;
346    type WithClock<T: Clock> = SyncState<T>;
347}
348
349impl<C: Clock> From<DetectorState<C>> for SyncState<C> {
350    fn from(inner: DetectorState<C>) -> Self {
351        Self(RwLock::new(inner))
352    }
353}
354
355impl<C: Clock> Detector for FailureDetector<SyncState<C>> {
356    fn heartbeat(&self) {
357        self.state
358            .0
359            .write()
360            .unwrap()
361            .heartbeat(self.clock.timestamp());
362    }
363
364    fn phi(&self) -> f64 {
365        self.state
366            .0
367            .read()
368            .unwrap()
369            .phi_for_timestamp(&self.clock.timestamp())
370    }
371
372    fn is_available(&self) -> bool {
373        self.state
374            .0
375            .read()
376            .unwrap()
377            .is_available_for_timestamp(&self.clock.timestamp())
378    }
379
380    fn is_monitoring(&self) -> bool {
381        self.state.0.read().unwrap().last_timestamp.is_some()
382    }
383}
384
385mod sealed {
386    use super::*;
387
388    #[allow(private_bounds)]
389    pub trait State: From<DetectorState<Self::Clock>> {
390        type Clock: Clock;
391        type WithClock<T: Clock>: State<Clock = T>;
392    }
393}
394
/// Source of time used by a [`FailureDetector`].
pub trait Clock {
    /// Point-in-time value produced by [`Clock::timestamp`].
    type Timestamp;

    /// Returns current time.
    fn timestamp(&self) -> Self::Timestamp;

    /// Returns time elapsed between two timestamps.
    fn elapsed(before: &Self::Timestamp, after: &Self::Timestamp) -> Duration;

    /// Returns time elapsed between two timestamps in fractional milliseconds.
    ///
    /// Sub-millisecond precision is kept (e.g. 1.5ms stays `1.5`) so that
    /// short heartbeat intervals are not truncated to whole milliseconds.
    fn elapsed_ms(before: &Self::Timestamp, after: &Self::Timestamp) -> f64 {
        Self::elapsed(before, after).as_nanos() as f64 / 1_000_000.0
    }
}
408
409/// The default clock implementation based on using [`std::time::Instant`].
410pub struct DefaultClock;
411
412impl Clock for DefaultClock {
413    type Timestamp = Instant;
414
415    fn timestamp(&self) -> Self::Timestamp {
416        Instant::now()
417    }
418
419    fn elapsed(before: &Self::Timestamp, after: &Self::Timestamp) -> Duration {
420        if before > after {
421            Duration::ZERO
422        } else {
423            after.duration_since(*before)
424        }
425    }
426}
427
428/// Holds the heartbeat statistics for a specific node Address. It is capped by
429/// the number of samples specified in `max_sample_size`.
430///
431/// The stats (`mean`, `variance`, `std_deviation`) are not defined for empty
432/// [`HeartbeatHistory`].
433struct HeartbeatHistory {
434    intervals: RingBuffer<f64>,
435    interval_sum: f64,
436    squared_interval_sum: f64,
437}
438
439impl HeartbeatHistory {
440    fn new(max_sample_size: usize) -> Self {
441        assert!(max_sample_size > 0);
442
443        Self {
444            intervals: RingBuffer::new(max_sample_size),
445            interval_sum: 0.,
446            squared_interval_sum: 0.,
447        }
448    }
449
450    fn mean(&self) -> f64 {
451        self.interval_sum / self.intervals.len() as f64
452    }
453
454    fn variance(&self) -> f64 {
455        self.squared_interval_sum / self.intervals.len() as f64 - pow2(self.mean())
456    }
457
458    fn std_deviation(&self) -> f64 {
459        self.variance().sqrt()
460    }
461
462    fn add(&mut self, interval: f64) {
463        self.interval_sum += interval;
464        self.squared_interval_sum += pow2(interval);
465
466        if let Some(oldest) = self.intervals.push(interval) {
467            self.interval_sum -= oldest;
468            self.squared_interval_sum -= pow2(oldest);
469        }
470    }
471}
472
473#[inline]
474fn pow2(x: f64) -> f64 {
475    x * x
476}
477
/// Fixed-capacity ring buffer that only allows for pushing values. Once full,
/// every push overwrites the oldest value and returns it.
#[derive(Clone)]
struct RingBuffer<T> {
    data: Vec<T>,
    capacity: usize,
    /// Slot holding the oldest value, i.e. the one the next push overwrites
    /// once the buffer is full. Stays 0 while the buffer is filling up.
    oldest: usize,
}

impl<T> RingBuffer<T> {
    /// Creates an empty buffer that holds at most `capacity` values.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is 0.
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        Self {
            data: Vec::with_capacity(capacity),
            capacity,
            oldest: 0,
        }
    }

    /// Appends `item`, returning the evicted oldest value if the buffer was
    /// already full.
    fn push(&mut self, item: T) -> Option<T> {
        if self.data.len() < self.capacity {
            self.data.push(item);
            return None;
        }

        let evicted = std::mem::replace(&mut self.data[self.oldest], item);
        // Keeping a wrapped index (rather than an ever-growing push counter)
        // means it can never overflow.
        self.oldest = (self.oldest + 1) % self.capacity;

        Some(evicted)
    }

    /// Number of values currently stored; never exceeds the capacity.
    ///
    /// Callers divide running sums by this, so it must count stored values,
    /// not the total number of pushes.
    fn len(&self) -> usize {
        self.data.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn validate_history(history: &HeartbeatHistory, intervals: &[f64]) {
        let (sum, sum_squared) = intervals
            .iter()
            .fold((0.0, 0.0), |(sum, sum_squared), interval| {
                (sum + interval, sum_squared + interval * interval)
            });

        assert_eq!(history.intervals.len(), intervals.len());
        assert_eq!(history.interval_sum, sum);
        assert_eq!(history.squared_interval_sum, sum_squared);
        // Divide by the expected window size, not by the history's own
        // bookkeeping, so that a wrong sample count is caught.
        assert_eq!(history.mean(), sum / intervals.len() as f64);
    }

    #[test]
    fn heartbeat_history() {
        let sample_size = 30;
        let intervals = (1..100).map(|i| i as f64).collect::<Vec<_>>();
        let mut history = HeartbeatHistory::new(sample_size);

        for (idx, interval) in intervals.iter().enumerate() {
            let end_idx = idx + 1;
            let start_idx = end_idx.max(sample_size) - sample_size;

            history.add(*interval);
            validate_history(&history, &intervals[start_idx..end_idx]);
        }
    }

    #[test]
    fn ring_buffer() {
        let mut buf = RingBuffer::new(3);

        assert_eq!(buf.len(), 0);
        assert_eq!(buf.push(1), None);
        assert_eq!(buf.len(), 1);
        assert_eq!(buf.push(2), None);
        assert_eq!(buf.len(), 2);
        assert_eq!(buf.push(3), None);
        assert_eq!(buf.len(), 3);
        assert_eq!(buf.push(4), Some(1));
        assert_eq!(buf.len(), 3);
        assert_eq!(buf.push(5), Some(2));
        assert_eq!(buf.len(), 3);
        assert_eq!(buf.push(6), Some(3));
        assert_eq!(buf.len(), 3);
        assert_eq!(buf.push(7), Some(4));
        assert_eq!(buf.len(), 3);
    }

    #[test]
    fn ring_buffer_single_slot() {
        let mut buf = RingBuffer::new(1);

        assert_eq!(buf.push('a'), None);
        assert_eq!(buf.push('b'), Some('a'));
        assert_eq!(buf.push('c'), Some('b'));
        assert_eq!(buf.len(), 1);
    }

    fn ensure_sync<T: Sync>() {}

    #[test]
    fn ensure_bounds() {
        ensure_sync::<SyncDetector>();
        let _: SyncDetector = UnsyncDetector::builder().sync().build().unwrap();
        let _: UnsyncDetector = SyncDetector::builder().unsync().build().unwrap();
    }
}