phi_accrual_failure_detector/
lib.rs1use std::{
2 cell::RefCell,
3 marker::PhantomData,
4 sync::RwLock,
5 time::{Duration, Instant},
6};
7
8#[derive(Debug, thiserror::Error)]
9pub enum Error {
10 #[error("Threshold must be > 0")]
11 Threshold,
12
13 #[error("Max sample size must be > 0")]
14 MaxSampleSize,
15
16 #[error("Min standard deviation must be > 0")]
17 MinStdDeviation,
18
19 #[error("First heartbeat estimate must be > 0")]
20 FirstHeartbeatEstimate,
21}
22
23pub type UnsyncDetector = FailureDetector<UnsyncState<DefaultClock>>;
25
26pub type SyncDetector = FailureDetector<SyncState<DefaultClock>>;
28
29pub struct Builder<S: sealed::State> {
31 config: Config,
32 clock: S::Clock,
33 _marker: PhantomData<S>,
34}
35
36impl<S: sealed::State<Clock = DefaultClock>> Builder<S> {
37 pub fn new() -> Self {
38 Self {
39 config: Default::default(),
40 clock: DefaultClock,
41 _marker: PhantomData,
42 }
43 }
44}
45
46impl Default for Builder<UnsyncState<DefaultClock>> {
47 fn default() -> Self {
48 Self::new()
49 }
50}
51
52impl<S: sealed::State> Builder<S> {
53 pub fn threshold(mut self, threshold: f64) -> Self {
62 self.config.threshold = threshold;
63 self
64 }
65
66 pub fn max_sample_size(mut self, max_sample_size: usize) -> Self {
71 self.config.max_sample_size = max_sample_size;
72 self
73 }
74
75 pub fn min_std_deviation(mut self, min_std_deviation: Duration) -> Self {
82 self.config.min_std_deviation = min_std_deviation;
83 self
84 }
85
86 pub fn acceptable_heartbeat_pause(mut self, acceptable_heartbeat_pause: Duration) -> Self {
94 self.config.acceptable_heartbeat_pause = acceptable_heartbeat_pause;
95 self
96 }
97
98 pub fn first_heartbeat_estimate(mut self, first_heartbeat_estimate: Duration) -> Self {
104 self.config.first_heartbeat_estimate = first_heartbeat_estimate;
105 self
106 }
107
108 pub fn sync(self) -> Builder<SyncState<S::Clock>> {
110 self.state::<SyncState<S::Clock>>()
111 }
112
113 pub fn unsync(self) -> Builder<UnsyncState<S::Clock>> {
116 self.state::<UnsyncState<S::Clock>>()
117 }
118
119 pub fn clock<T: Clock>(self, clock: T) -> Builder<S::WithClock<T>> {
123 Builder {
124 config: self.config,
125 clock,
126 _marker: PhantomData,
127 }
128 }
129
130 pub fn build(self) -> Result<FailureDetector<S>, Error> {
134 let config = self.config;
135
136 if config.threshold <= 0. {
137 return Err(Error::Threshold);
138 }
139
140 if config.max_sample_size == 0 {
141 return Err(Error::MaxSampleSize);
142 }
143
144 if config.min_std_deviation.is_zero() {
145 return Err(Error::MinStdDeviation);
146 }
147
148 if config.first_heartbeat_estimate.is_zero() {
149 return Err(Error::FirstHeartbeatEstimate);
150 }
151
152 let mean = config.first_heartbeat_estimate.as_millis() as f64;
153 let std_deviation = mean / 4.;
154
155 let threshold = config.threshold;
156 let acceptable_heartbeat_pause = config.acceptable_heartbeat_pause.as_millis() as f64;
157 let min_std_deviation = config.min_std_deviation.as_millis() as f64;
158
159 let mut history = HeartbeatHistory::new(config.max_sample_size);
160 history.add(mean - std_deviation);
161 history.add(mean + std_deviation);
162
163 let state = DetectorState {
164 threshold,
165 acceptable_heartbeat_pause,
166 min_std_deviation,
167 history,
168 last_timestamp: None,
169 };
170
171 Ok(FailureDetector {
172 state: state.into(),
173 clock: self.clock,
174 })
175 }
176
177 fn state<T: sealed::State<Clock = S::Clock>>(self) -> Builder<T> {
178 Builder {
179 config: self.config,
180 clock: self.clock,
181 _marker: PhantomData,
182 }
183 }
184}
185
/// Raw settings gathered by [`Builder`]; validated only in `Builder::build`.
struct Config {
    /// Phi value at or above which the resource is reported unavailable.
    threshold: f64,
    /// Capacity of the heartbeat interval history.
    max_sample_size: usize,
    /// Lower bound for the estimated interval standard deviation.
    min_std_deviation: Duration,
    /// Margin added to the mean interval before phi is computed.
    acceptable_heartbeat_pause: Duration,
    /// Interval estimate used to seed the history.
    first_heartbeat_estimate: Duration,
}
193
194impl Default for Config {
195 fn default() -> Self {
196 Self {
197 threshold: 8.0,
198 max_sample_size: 100,
199 min_std_deviation: Duration::from_millis(100),
200 acceptable_heartbeat_pause: Duration::from_secs(3),
201 first_heartbeat_estimate: Duration::from_secs(1),
202 }
203 }
204}
205
206struct DetectorState<C: Clock> {
207 threshold: f64,
208 acceptable_heartbeat_pause: f64,
209 min_std_deviation: f64,
210 history: HeartbeatHistory,
211 last_timestamp: Option<C::Timestamp>,
212}
213
214impl<C: Clock> DetectorState<C> {
215 fn heartbeat(&mut self, timestamp: C::Timestamp) {
216 if let (Some(last_timestamp), true) = (
217 &self.last_timestamp,
218 self.is_available_for_timestamp(×tamp),
219 ) {
220 self.history.add(C::elapsed_ms(last_timestamp, ×tamp));
221 }
222
223 self.last_timestamp = Some(timestamp);
224 }
225
226 fn is_available_for_timestamp(&self, timestamp: &C::Timestamp) -> bool {
227 self.phi_for_timestamp(timestamp) < self.threshold
228 }
229
230 fn phi_for_timestamp(&self, timestamp: &C::Timestamp) -> f64 {
231 let Some(last_timestamp) = &self.last_timestamp else {
232 return 0.0;
234 };
235
236 let time_diff = C::elapsed_ms(last_timestamp, timestamp);
237 let mean = self.history.mean() + self.acceptable_heartbeat_pause;
238 let std_deviation = self.history.std_deviation().max(self.min_std_deviation);
239
240 let y = (time_diff - mean) / std_deviation;
241 let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
242
243 if time_diff > mean {
244 -(e / (1.0 + e)).log10()
245 } else {
246 -(1.0 - 1.0 / (1.0 + e)).log10()
247 }
248 }
249}
250
251pub struct FailureDetector<S: sealed::State> {
264 state: S,
265 clock: S::Clock,
266}
267
268impl<S: sealed::State<Clock = DefaultClock>> FailureDetector<S> {
269 pub fn builder() -> Builder<S> {
270 Builder::new()
271 }
272}
273
274impl<S: sealed::State<Clock = DefaultClock>> Default for FailureDetector<S> {
275 fn default() -> Self {
276 Self::builder().build().unwrap()
278 }
279}
280
/// Operations shared by every detector flavor.
pub trait Detector {
    /// Records a heartbeat from the monitored resource, stamped with the
    /// detector's clock.
    fn heartbeat(&self);

    /// Current suspicion level (phi). This crate's implementations return
    /// `0.0` until the first heartbeat is recorded.
    fn phi(&self) -> f64;

    /// `true` while phi is below the configured threshold.
    fn is_available(&self) -> bool;

    /// `true` once at least one heartbeat has been recorded.
    fn is_monitoring(&self) -> bool;
}
300
301pub struct UnsyncState<C: Clock>(RefCell<DetectorState<C>>);
304
305impl<C: Clock> sealed::State for UnsyncState<C> {
306 type Clock = C;
307 type WithClock<T: Clock> = UnsyncState<T>;
308}
309
310impl<C: Clock> From<DetectorState<C>> for UnsyncState<C> {
311 fn from(inner: DetectorState<C>) -> Self {
312 Self(RefCell::new(inner))
313 }
314}
315
316impl<C: Clock> Detector for FailureDetector<UnsyncState<C>> {
317 fn heartbeat(&self) {
318 self.state.0.borrow_mut().heartbeat(self.clock.timestamp());
319 }
320
321 fn phi(&self) -> f64 {
322 self.state
323 .0
324 .borrow()
325 .phi_for_timestamp(&self.clock.timestamp())
326 }
327
328 fn is_available(&self) -> bool {
329 self.state
330 .0
331 .borrow()
332 .is_available_for_timestamp(&self.clock.timestamp())
333 }
334
335 fn is_monitoring(&self) -> bool {
336 self.state.0.borrow().last_timestamp.is_some()
337 }
338}
339
340pub struct SyncState<C: Clock>(RwLock<DetectorState<C>>);
343
344impl<C: Clock> sealed::State for SyncState<C> {
345 type Clock = C;
346 type WithClock<T: Clock> = SyncState<T>;
347}
348
349impl<C: Clock> From<DetectorState<C>> for SyncState<C> {
350 fn from(inner: DetectorState<C>) -> Self {
351 Self(RwLock::new(inner))
352 }
353}
354
355impl<C: Clock> Detector for FailureDetector<SyncState<C>> {
356 fn heartbeat(&self) {
357 self.state
358 .0
359 .write()
360 .unwrap()
361 .heartbeat(self.clock.timestamp());
362 }
363
364 fn phi(&self) -> f64 {
365 self.state
366 .0
367 .read()
368 .unwrap()
369 .phi_for_timestamp(&self.clock.timestamp())
370 }
371
372 fn is_available(&self) -> bool {
373 self.state
374 .0
375 .read()
376 .unwrap()
377 .is_available_for_timestamp(&self.clock.timestamp())
378 }
379
380 fn is_monitoring(&self) -> bool {
381 self.state.0.read().unwrap().last_timestamp.is_some()
382 }
383}
384
385mod sealed {
386 use super::*;
387
388 #[allow(private_bounds)]
389 pub trait State: From<DetectorState<Self::Clock>> {
390 type Clock: Clock;
391 type WithClock<T: Clock>: State<Clock = T>;
392 }
393}
394
/// Time source for a detector.
///
/// Implement it to drive a detector from something other than
/// [`DefaultClock`] (for example a controllable clock).
pub trait Clock {
    /// Point in time produced by this clock.
    type Timestamp;

    /// Current time.
    fn timestamp(&self) -> Self::Timestamp;

    /// Time that passed from `before` to `after`.
    fn elapsed(before: &Self::Timestamp, after: &Self::Timestamp) -> Duration;

    /// [`Clock::elapsed`] as whole milliseconds in an `f64`; any
    /// sub-millisecond remainder is truncated.
    fn elapsed_ms(before: &Self::Timestamp, after: &Self::Timestamp) -> f64 {
        let span = Self::elapsed(before, after);
        span.as_millis() as f64
    }
}
408
409pub struct DefaultClock;
411
412impl Clock for DefaultClock {
413 type Timestamp = Instant;
414
415 fn timestamp(&self) -> Self::Timestamp {
416 Instant::now()
417 }
418
419 fn elapsed(before: &Self::Timestamp, after: &Self::Timestamp) -> Duration {
420 if before > after {
421 Duration::ZERO
422 } else {
423 after.duration_since(*before)
424 }
425 }
426}
427
428struct HeartbeatHistory {
434 intervals: RingBuffer<f64>,
435 interval_sum: f64,
436 squared_interval_sum: f64,
437}
438
439impl HeartbeatHistory {
440 fn new(max_sample_size: usize) -> Self {
441 assert!(max_sample_size > 0);
442
443 Self {
444 intervals: RingBuffer::new(max_sample_size),
445 interval_sum: 0.,
446 squared_interval_sum: 0.,
447 }
448 }
449
450 fn mean(&self) -> f64 {
451 self.interval_sum / self.intervals.len() as f64
452 }
453
454 fn variance(&self) -> f64 {
455 self.squared_interval_sum / self.intervals.len() as f64 - pow2(self.mean())
456 }
457
458 fn std_deviation(&self) -> f64 {
459 self.variance().sqrt()
460 }
461
462 fn add(&mut self, interval: f64) {
463 self.interval_sum += interval;
464 self.squared_interval_sum += pow2(interval);
465
466 if let Some(oldest) = self.intervals.push(interval) {
467 self.interval_sum -= oldest;
468 self.squared_interval_sum -= pow2(oldest);
469 }
470 }
471}
472
/// Returns `value` multiplied by itself.
#[inline]
fn pow2(value: f64) -> f64 {
    value * value
}
477
/// Fixed-capacity FIFO buffer that overwrites its oldest element once full.
#[derive(Clone)]
struct RingBuffer<T> {
    /// Stored elements; grows up to `capacity`, then is overwritten in place.
    data: Vec<T>,
    /// Maximum number of elements retained.
    capacity: usize,
    /// Index in `data` of the oldest element, i.e. the slot overwritten by
    /// the next push once the buffer is full. Always `< capacity`.
    cursor: usize,
}

impl<T> RingBuffer<T> {
    /// Creates an empty buffer holding at most `capacity` elements.
    ///
    /// # Panics
    ///
    /// If `capacity` is zero.
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        Self {
            data: Vec::with_capacity(capacity),
            capacity,
            cursor: 0,
        }
    }

    /// Appends `item`. If the buffer was already full, the oldest element is
    /// replaced and returned; otherwise `None` is returned.
    fn push(&mut self, item: T) -> Option<T> {
        if self.data.len() < self.capacity {
            self.data.push(item);
            return None;
        }

        let evicted = std::mem::replace(&mut self.data[self.cursor], item);
        // Keep the cursor bounded instead of counting pushes forever.
        self.cursor = (self.cursor + 1) % self.capacity;
        Some(evicted)
    }

    /// Number of elements currently stored (never more than `capacity`).
    ///
    /// This must be the stored count, not the number of pushes:
    /// `HeartbeatHistory` divides its running sums by it to get the mean and
    /// variance of the window.
    fn len(&self) -> usize {
        self.data.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `history` holds exactly the window `intervals`.
    fn validate_history(history: &HeartbeatHistory, intervals: &[f64]) {
        let (sum, sum_squared) = intervals
            .iter()
            .fold((0.0, 0.0), |(sum, sum_squared), interval| {
                (sum + interval, sum_squared + interval * interval)
            });

        assert_eq!(history.intervals.len(), intervals.len());
        assert_eq!(history.interval_sum, sum);
        assert_eq!(history.squared_interval_sum, sum_squared);
        // Divide by the expected window size, not `history.intervals.len()`,
        // so that a wrong sample count is caught.
        assert_eq!(history.mean(), sum / intervals.len() as f64);
    }

    #[test]
    fn heartbeat_history() {
        let sample_size = 30;
        let intervals = (1..100).map(|i| i as f64).collect::<Vec<_>>();
        let mut history = HeartbeatHistory::new(sample_size);

        for (idx, interval) in intervals.iter().enumerate() {
            let end_idx = idx + 1;
            let start_idx = end_idx.max(sample_size) - sample_size;

            history.add(*interval);
            validate_history(&history, &intervals[start_idx..end_idx]);
        }
    }

    #[test]
    fn heartbeat_history_statistics_after_wraparound() {
        let mut history = HeartbeatHistory::new(2);
        history.add(10.0);
        history.add(20.0);
        history.add(30.0); // evicts 10.0; window is [20.0, 30.0]

        assert_eq!(history.mean(), 25.0);
        assert_eq!(history.variance(), 25.0);
        assert_eq!(history.std_deviation(), 5.0);
    }

    #[test]
    fn ring_buffer() {
        let mut buf = RingBuffer::new(3);

        assert_eq!(buf.len(), 0);
        assert_eq!(buf.push(1), None);
        assert_eq!(buf.len(), 1);
        assert_eq!(buf.push(2), None);
        assert_eq!(buf.len(), 2);
        assert_eq!(buf.push(3), None);
        assert_eq!(buf.len(), 3);
        // Once full, each push evicts the oldest element and the length
        // stays at capacity.
        assert_eq!(buf.push(4), Some(1));
        assert_eq!(buf.len(), 3);
        assert_eq!(buf.push(5), Some(2));
        assert_eq!(buf.len(), 3);
        assert_eq!(buf.push(6), Some(3));
        assert_eq!(buf.len(), 3);
        assert_eq!(buf.push(7), Some(4));
        assert_eq!(buf.len(), 3);
    }

    fn ensure_sync<T: Sync>() {}

    #[test]
    fn ensure_bounds() {
        ensure_sync::<SyncDetector>();
        let _: SyncDetector = UnsyncDetector::builder().sync().build().unwrap();
        let _: UnsyncDetector = SyncDetector::builder().unsync().build().unwrap();
    }
}
576}