use std::collections::VecDeque;
use std::time::{Duration, Instant};
pub const DEFAULT_WINDOW_SIZE: usize = 100;
pub const DEFAULT_THRESHOLD: f64 = 8.0;
pub const DEFAULT_MIN_MEAN_MS: f64 = 1_000.0;
#[derive(Debug, Clone)]
pub struct PhiAccrual {
intervals: VecDeque<f64>,
window_size: usize,
last_heartbeat: Option<Instant>,
threshold: f64,
min_mean_ms: f64,
}
impl PhiAccrual {
#[must_use]
pub fn new(window_size: usize, threshold: f64) -> Self {
Self {
intervals: VecDeque::with_capacity(window_size.max(1)),
window_size: window_size.max(1),
last_heartbeat: None,
threshold,
min_mean_ms: DEFAULT_MIN_MEAN_MS,
}
}
#[must_use]
pub fn with_min_mean(mut self, min_mean: Duration) -> Self {
self.min_mean_ms = duration_to_ms(min_mean);
self
}
#[must_use]
pub fn sample_count(&self) -> usize {
self.intervals.len()
}
#[must_use]
pub fn threshold(&self) -> f64 {
self.threshold
}
pub fn set_threshold(&mut self, threshold: f64) {
self.threshold = threshold;
}
#[must_use]
pub fn last_heartbeat(&self) -> Option<Instant> {
self.last_heartbeat
}
pub fn record_heartbeat(&mut self, now: Instant) {
if let Some(prev) = self.last_heartbeat {
let dt = now.saturating_duration_since(prev);
let dt_ms = duration_to_ms(dt);
if self.intervals.len() == self.window_size {
self.intervals.pop_front();
}
self.intervals.push_back(dt_ms);
}
self.last_heartbeat = Some(now);
}
pub fn reset(&mut self) {
self.intervals.clear();
self.last_heartbeat = None;
}
#[must_use]
pub fn phi(&self, now: Instant) -> f64 {
let Some(last) = self.last_heartbeat else {
return 0.0;
};
if self.intervals.is_empty() {
return 0.0;
}
let elapsed_ms = duration_to_ms(now.saturating_duration_since(last));
let mean_ms = self.mean_interval_ms();
if mean_ms <= 0.0 {
return 0.0;
}
elapsed_ms / (mean_ms * std::f64::consts::LN_10)
}
#[must_use]
pub fn is_suspect(&self, now: Instant) -> bool {
self.phi(now) > self.threshold
}
#[must_use]
#[allow(
clippy::cast_precision_loss,
reason = "window length is bounded by window_size (default 100), well below f64 mantissa"
)]
pub fn mean_interval_ms(&self) -> f64 {
if self.intervals.is_empty() {
return self.min_mean_ms;
}
let raw = self.intervals.iter().sum::<f64>() / self.intervals.len() as f64;
if raw < self.min_mean_ms {
self.min_mean_ms
} else {
raw
}
}
}
impl Default for PhiAccrual {
fn default() -> Self {
Self::new(DEFAULT_WINDOW_SIZE, DEFAULT_THRESHOLD)
}
}
fn duration_to_ms(d: Duration) -> f64 {
d.as_secs_f64() * 1_000.0
}
#[cfg(test)]
mod tests {
use super::*;
fn t(secs: u64, ms: u64) -> Instant {
static ANCHOR: std::sync::OnceLock<Instant> = std::sync::OnceLock::new();
let a = ANCHOR.get_or_init(Instant::now);
*a + Duration::from_secs(secs) + Duration::from_millis(ms)
}
#[test]
fn no_heartbeat_means_phi_zero() {
let fd = PhiAccrual::default();
assert!(fd.phi(Instant::now()).abs() < f64::EPSILON);
assert!(!fd.is_suspect(Instant::now()));
}
#[test]
fn single_heartbeat_means_phi_zero() {
let mut fd = PhiAccrual::default();
fd.record_heartbeat(t(0, 0));
assert!(fd.phi(t(60, 0)).abs() < f64::EPSILON);
}
#[test]
fn steady_heartbeats_then_silence_raises_phi() {
let mut fd = PhiAccrual::new(20, 8.0).with_min_mean(Duration::from_millis(100));
for i in 0..5 {
fd.record_heartbeat(t(i, 0));
}
assert_eq!(fd.sample_count(), 4);
assert!(fd.phi(t(4, 10)) < 0.5);
let phi30 = fd.phi(t(34, 0));
assert!(
phi30 > 8.0,
"expected phi >> 8 after 30s silence on 1Hz cadence, got {phi30}"
);
assert!(fd.is_suspect(t(34, 0)));
assert!(!fd.is_suspect(t(5, 100)));
}
#[test]
fn high_jitter_relaxes_suspicion() {
let mut steady = PhiAccrual::new(50, 8.0).with_min_mean(Duration::from_millis(50));
let mut jittery = PhiAccrual::new(50, 8.0).with_min_mean(Duration::from_millis(50));
for i in 0..50 {
steady.record_heartbeat(t(0, i * 100));
}
let mut elapsed = 0u64;
for i in 0..50 {
jittery.record_heartbeat(t(0, elapsed));
elapsed += if i % 2 == 0 { 50 } else { 950 };
}
let probe = Duration::from_millis(1_000);
let last_steady = steady.last_heartbeat().unwrap();
let last_jittery = jittery.last_heartbeat().unwrap();
let phi_steady = steady.phi(last_steady + probe);
let phi_jittery = jittery.phi(last_jittery + probe);
assert!(
phi_jittery < phi_steady,
"jittery phi ({phi_jittery}) should be less than steady phi ({phi_steady})"
);
}
#[test]
fn window_eviction_keeps_size_bounded() {
let mut fd = PhiAccrual::new(5, 8.0);
for i in 0..20 {
fd.record_heartbeat(t(i, 0));
}
assert_eq!(fd.sample_count(), 5);
}
#[test]
fn reset_clears_state() {
let mut fd = PhiAccrual::default();
for i in 0..10 {
fd.record_heartbeat(t(i, 0));
}
assert!(fd.sample_count() > 0);
fd.reset();
assert_eq!(fd.sample_count(), 0);
assert!(fd.last_heartbeat().is_none());
assert!(fd.phi(Instant::now()).abs() < f64::EPSILON);
}
#[test]
fn clock_going_backwards_is_handled() {
let mut fd = PhiAccrual::default();
fd.record_heartbeat(t(10, 0));
fd.record_heartbeat(t(5, 0));
assert_eq!(fd.sample_count(), 1);
}
#[test]
fn min_mean_clamp_prevents_runaway_phi() {
let mut fd = PhiAccrual::new(10, 8.0);
for i in 0..10 {
fd.record_heartbeat(t(0, i)); }
let phi_after_1s = fd.phi(t(1, 0));
assert!(
phi_after_1s < 1.0,
"min_mean clamp should hold phi below 1.0 here, got {phi_after_1s}"
);
}
}