use std::collections::VecDeque;
use std::time::{Duration, Instant};
use noxu_sync::RwLock;
/// Approximates the standard normal cumulative distribution function Φ(x).
///
/// The constants match the five-term polynomial approximation given as
/// formula 26.2.17 in Abramowitz & Stegun: with `t = 1 / (1 + 0.2316419·|x|)`
/// the mass in the tail beyond `|x|` is `pdf(x) · poly(t)`.
///
/// The tail mass is computed first and only subtracted from 1.0 on the side
/// where the result is close to 1. For negative `x` the tail is returned
/// directly, so very small probabilities keep their precision instead of
/// being rounded to exactly 0.0 by a `1.0 - (1.0 - tail)` round trip.
fn phi_cdf(x: f64) -> f64 {
    let t = 1.0 / (1.0 + 0.2316419 * x.abs());
    let poly = t
        * (0.319_381_53
            + t * (-0.356_563_78
                + t * (1.781_477_94
                    + t * (-1.821_255_978 + t * 1.330_274_429))));
    // Standard normal density at x (symmetric, so the sign of x is irrelevant).
    let density = (-0.5 * x * x).exp() / (2.0 * std::f64::consts::PI).sqrt();
    let tail = density * poly;
    if x >= 0.0 {
        1.0 - tail
    } else {
        tail
    }
}

/// Probability that a value drawn from `N(mean, std_dev²)` exceeds `u`.
///
/// Returns `1.0` when `std_dev` is zero or negative: in that case no z-score
/// can be formed, and the function reports "certainly later".
///
/// The upper tail `1 - Φ(z)` is evaluated as `Φ(-z)`. Both are the same
/// quantity, but the latter avoids subtracting two nearly equal numbers when
/// `z` is large, so the result stays positive (and meaningful) far into the
/// tail instead of collapsing to 0.0 once `Φ(z)` rounds to 1.0.
fn p_later(u: f64, mean: f64, std_dev: f64) -> f64 {
    if std_dev <= 0.0 {
        return 1.0;
    }
    let z = (u - mean) / std_dev;
    phi_cdf(-z)
}
/// Failure detector that scores how suspicious the current silence is.
///
/// It keeps a bounded window of observed heartbeat inter-arrival times and
/// the instant of the most recent heartbeat; `phi()` turns the time elapsed
/// since that instant into a suspicion value, and `is_available()` compares
/// that value against `threshold`.
pub struct PhiAccrualDetector {
/// Recent inter-arrival intervals in seconds, oldest at the front.
samples: RwLock<VecDeque<f64>>,
/// Maximum number of intervals retained in `samples`.
window_size: usize,
/// Instant of the most recent heartbeat; `None` until the first one is recorded.
last_heartbeat: RwLock<Option<Instant>>,
/// Suspicion level at or above which `is_available()` reports `false`.
threshold: f64,
}
impl PhiAccrualDetector {
    /// Creates a detector with no recorded heartbeats.
    ///
    /// * `threshold` - suspicion value at or above which
    ///   [`is_available`](Self::is_available) returns `false`.
    /// * `window_size` - maximum number of inter-arrival intervals retained.
    ///   With `0`, every recorded interval is evicted immediately, so the
    ///   detector never accumulates statistics.
    pub fn new(threshold: f64, window_size: usize) -> Self {
        Self {
            // One spare slot: a new sample is pushed before the oldest is evicted.
            samples: RwLock::new(VecDeque::with_capacity(window_size + 1)),
            window_size,
            last_heartbeat: RwLock::new(None),
            threshold,
        }
    }

    /// Records a heartbeat arriving now.
    ///
    /// The first call only stores the timestamp. Every later call appends the
    /// time since the previous heartbeat (in seconds) to the sample window,
    /// evicting the oldest sample once the window exceeds `window_size`.
    pub fn record_heartbeat(&self) {
        let now = Instant::now();
        // The timestamp lock is held across the sample update so the interval
        // pushed below always corresponds to the timestamp being replaced.
        let mut last = self.last_heartbeat.write();
        if let Some(prev) = *last {
            let interval = now.duration_since(prev).as_secs_f64();
            let mut samples = self.samples.write();
            samples.push_back(interval);
            if samples.len() > self.window_size {
                samples.pop_front();
            }
        }
        *last = Some(now);
    }

    /// Returns the current suspicion level.
    ///
    /// The value is `-log10(p)`, where `p` is the probability (under a normal
    /// model fitted to the sample window) that a heartbeat would still arrive
    /// later than the time already elapsed since the last one. `p` is floored
    /// at `f64::MIN_POSITIVE` so the logarithm stays finite.
    ///
    /// Returns `0.0` when no heartbeat has been recorded yet or when fewer
    /// than two intervals are available.
    pub fn phi(&self) -> f64 {
        let last_opt = *self.last_heartbeat.read();
        let last = match last_opt {
            Some(t) => t,
            None => return 0.0,
        };
        let (mean, std_dev) = match self.interval_stats() {
            Some(stats) => stats,
            None => return 0.0,
        };
        let elapsed = last.elapsed().as_secs_f64();
        let p = p_later(elapsed, mean, std_dev).max(f64::MIN_POSITIVE);
        -p.log10()
    }

    /// Returns `true` while the suspicion level is strictly below the threshold.
    pub fn is_available(&self) -> bool {
        self.phi() < self.threshold
    }

    /// Returns the configured suspicion threshold.
    pub fn threshold(&self) -> f64 {
        self.threshold
    }

    /// Returns the number of intervals currently held in the sample window.
    pub fn sample_count(&self) -> usize {
        self.samples.read().len()
    }

    /// Returns the mean inter-arrival interval in seconds, or `None` when
    /// fewer than two intervals have been recorded.
    pub fn mean_interval(&self) -> Option<f64> {
        self.interval_stats().map(|(mean, _)| mean)
    }

    /// Returns the population standard deviation of the inter-arrival
    /// intervals in seconds, or `None` when fewer than two intervals have
    /// been recorded.
    pub fn stddev_interval(&self) -> Option<f64> {
        self.interval_stats().map(|(_, std_dev)| std_dev)
    }

    /// Suggests a timeout of `mean + k * stddev`, limited to 50 ms ..= 5 s.
    ///
    /// Returns `fallback` when fewer than two intervals have been recorded,
    /// or when the estimate is not a number (for example `k` is NaN, or `k`
    /// is infinite while the observed spread is zero). Such a value cannot be
    /// clamped and would make `Duration::from_secs_f64` panic.
    pub fn suggested_phase_timeout(
        &self,
        k: f64,
        fallback: Duration,
    ) -> Duration {
        const MIN_TIMEOUT_SECS: f64 = 0.05;
        const MAX_TIMEOUT_SECS: f64 = 5.0;

        // Mean and spread are taken from one snapshot of the window so they
        // always describe the same set of samples.
        let (mean, std_dev) = match self.interval_stats() {
            Some(stats) => stats,
            None => return fallback,
        };
        let estimate = mean + k * std_dev;
        if estimate.is_nan() {
            return fallback;
        }
        // Infinite estimates are fine here: clamp pulls them to a bound.
        Duration::from_secs_f64(estimate.clamp(MIN_TIMEOUT_SECS, MAX_TIMEOUT_SECS))
    }

    /// Computes `(mean, population standard deviation)` of the sample window
    /// under a single read lock; `None` when fewer than two samples exist.
    fn interval_stats(&self) -> Option<(f64, f64)> {
        let samples = self.samples.read();
        if samples.len() < 2 {
            return None;
        }
        let n = samples.len() as f64;
        let mean = samples.iter().sum::<f64>() / n;
        let variance =
            samples.iter().map(|&x| (x - mean).powi(2)).sum::<f64>() / n;
        Some((mean, variance.sqrt()))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// Sleeps the current thread for `ms` milliseconds.
    fn pause_ms(ms: u64) {
        thread::sleep(Duration::from_millis(ms));
    }

    /// Records `beats` heartbeats on `detector`, pausing `gap_ms` after each.
    fn pump(detector: &PhiAccrualDetector, beats: usize, gap_ms: u64) {
        (0..beats).for_each(|_| {
            detector.record_heartbeat();
            pause_ms(gap_ms);
        });
    }

    /// A detector with no history reports zero suspicion.
    #[test]
    fn test_phi_zero_with_no_heartbeats() {
        let d = PhiAccrualDetector::new(8.0, 200);
        assert_eq!(d.phi(), 0.0);
        assert!(d.is_available());
    }

    /// One interval is not enough to estimate a distribution.
    #[test]
    fn test_phi_zero_with_one_sample() {
        let d = PhiAccrualDetector::new(8.0, 200);
        d.record_heartbeat();
        assert_eq!(d.phi(), 0.0);
        d.record_heartbeat();
        assert_eq!(d.sample_count(), 1);
        assert_eq!(d.phi(), 0.0);
    }

    /// Right after a heartbeat the suspicion level is low.
    #[test]
    fn test_phi_available_just_after_heartbeat() {
        let d = PhiAccrualDetector::new(8.0, 200);
        pump(&d, 10, 10);
        d.record_heartbeat();
        assert!(d.phi() < 1.0, "phi={}", d.phi());
        assert!(d.is_available());
    }

    /// A long silence after regular heartbeats raises the suspicion level.
    #[test]
    fn test_phi_rises_without_heartbeat() {
        let d = PhiAccrualDetector::new(8.0, 200);
        pump(&d, 10, 10);
        pause_ms(200);
        let phi = d.phi();
        assert!(phi > 1.0, "expected phi > 1.0 after long silence, got {phi}");
    }

    /// A fresh heartbeat lowers the suspicion level again.
    #[test]
    fn test_phi_resets_on_heartbeat() {
        let d = PhiAccrualDetector::new(8.0, 200);
        pump(&d, 10, 10);
        pause_ms(200);
        let phi_before = d.phi();
        assert!(phi_before > 1.0);
        d.record_heartbeat();
        let phi_after = d.phi();
        assert!(
            phi_after < phi_before,
            "phi should drop after heartbeat: before={phi_before} after={phi_after}"
        );
    }

    /// Below the threshold the peer counts as available.
    #[test]
    fn test_is_available_below_threshold() {
        let d = PhiAccrualDetector::new(8.0, 200);
        pump(&d, 10, 10);
        d.record_heartbeat();
        assert!(d.is_available());
    }

    /// With a low threshold, a silence much longer than usual is suspected.
    #[test]
    fn test_suspected_above_threshold() {
        let d = PhiAccrualDetector::new(1.0, 200);
        pump(&d, 20, 10);
        pause_ms(150);
        assert!(
            !d.is_available(),
            "process should be suspected; phi={}",
            d.phi()
        );
    }

    /// The sample window never grows past the configured size.
    #[test]
    fn test_window_size_capped() {
        let d = PhiAccrualDetector::new(8.0, 5);
        pump(&d, 20, 1);
        assert!(d.sample_count() <= 5);
    }

    /// The accessor returns the threshold passed to the constructor.
    #[test]
    fn test_threshold_accessor() {
        let d = PhiAccrualDetector::new(12.5, 100);
        assert_eq!(d.threshold(), 12.5);
    }
}