use std::sync::Arc;
use std::time::Duration;
use atomr_core::actor::Address;
use atomr_remote::failure_detector::FailureDetector;
use atomr_remote::{DeadlineFailureDetector, FailureDetectorRegistry, PhiAccrualFailureDetector};
/// Builds a [`PhiAccrualFailureDetector`] with millisecond-scale timings so the
/// tests in this file can observe state changes after very short sleeps.
///
/// `threshold` is forwarded as the first constructor argument and
/// `acceptable_pause_ms` is converted to a [`Duration`] and forwarded as the
/// fourth.
fn fast_phi(threshold: f64, acceptable_pause_ms: u64) -> PhiAccrualFailureDetector {
    // NOTE(review): the constructor signature is not visible here. The literal
    // 200 and the two 1 ms durations presumably correspond to a sample-window
    // size, a minimum standard deviation and a first-heartbeat estimate —
    // confirm against `PhiAccrualFailureDetector::new`.
    let one_ms = Duration::from_millis(1);
    let acceptable_pause = Duration::from_millis(acceptable_pause_ms);
    PhiAccrualFailureDetector::new(threshold, 200, one_ms, acceptable_pause, one_ms)
}
/// A phi detector that has never received a heartbeat must report itself as
/// available, not monitoring, without a last-heartbeat reading, and with a
/// phi value of exactly zero.
#[test]
fn phi_new_detector_with_no_heartbeats_is_available() {
    let detector = fast_phi(8.0, 5);

    // No heartbeat has been recorded yet, so every query reflects the
    // pristine state.
    assert!(detector.is_available());
    assert!(!detector.is_monitoring());
    assert!(detector.since_last_heartbeat().is_none());
    assert_eq!(detector.phi(), 0.0);
}
/// Two heartbeats spaced about 2 ms apart, with a 50 ms acceptable pause,
/// must leave the detector monitoring and available throughout.
#[test]
fn phi_remains_available_within_expected_interval() {
    let gap_between_beats = Duration::from_millis(2);
    let detector = fast_phi(8.0, 50);

    // The first heartbeat is what switches the detector into monitoring mode.
    detector.heartbeat();
    assert!(detector.is_monitoring());
    assert!(detector.is_available());

    // A second heartbeat after a short gap keeps it available.
    std::thread::sleep(gap_between_beats);
    detector.heartbeat();
    assert!(detector.is_available());
    assert!(detector.since_last_heartbeat().is_some());
}
/// After a single heartbeat followed by a 25 ms silence (with only a 1 ms
/// acceptable pause), the detector must flip to unavailable and its phi
/// value must have reached the configured threshold.
#[test]
fn phi_unavailable_after_long_pause() {
    let threshold = 8.0;
    let silence = Duration::from_millis(25);
    let detector = fast_phi(threshold, 1);

    detector.heartbeat();
    assert!(detector.is_available());

    // Stay silent far longer than the acceptable pause.
    std::thread::sleep(silence);

    assert!(
        !detector.is_available(),
        "phi={} should exceed threshold",
        detector.phi()
    );
    assert!(detector.phi() >= threshold);
}
/// `reset()` on a detector that has already gone unavailable must bring it
/// back to the same observable state as a freshly built one: available, not
/// monitoring, and with no last-heartbeat reading.
#[test]
fn phi_reset_returns_to_initial_state() {
    let silence = Duration::from_millis(15);
    let detector = fast_phi(8.0, 1);

    // Drive the detector into the unavailable state first.
    detector.heartbeat();
    std::thread::sleep(silence);
    assert!(!detector.is_available());

    detector.reset();

    assert!(detector.is_available());
    assert!(!detector.is_monitoring());
    assert!(detector.since_last_heartbeat().is_none());
}
/// A deadline detector that has not yet seen a heartbeat must be available
/// and must not be monitoring.
#[test]
fn deadline_new_detector_is_available() {
    let deadline = Duration::from_millis(20);
    let detector = DeadlineFailureDetector::new(deadline);

    assert!(detector.is_available());
    assert!(!detector.is_monitoring());
}
/// With a 20 ms deadline, the detector must stay available right after a
/// heartbeat and after a further 5 ms, and must be unavailable once another
/// 30 ms of silence has passed.
#[test]
fn deadline_available_until_deadline_then_unavailable() {
    let deadline = Duration::from_millis(20);
    let short_wait = Duration::from_millis(5);
    let long_wait = Duration::from_millis(30);
    let detector = DeadlineFailureDetector::new(deadline);

    detector.heartbeat();
    assert!(detector.is_monitoring());
    assert!(detector.is_available(), "available immediately after heartbeat");

    // 5 ms elapsed in total: still inside the deadline.
    std::thread::sleep(short_wait);
    assert!(detector.is_available(), "still available within deadline");

    // At least 35 ms elapsed in total: the deadline has passed.
    std::thread::sleep(long_wait);
    assert!(!detector.is_available(), "unavailable once deadline has elapsed");
}
/// A deadline detector that has gone unavailable after missing its 15 ms
/// deadline must become available again as soon as a new heartbeat arrives.
#[test]
fn deadline_heartbeat_restores_availability() {
    let deadline = Duration::from_millis(15);
    let silence = Duration::from_millis(25);
    let detector = DeadlineFailureDetector::new(deadline);

    // Miss the deadline on purpose.
    detector.heartbeat();
    std::thread::sleep(silence);
    assert!(!detector.is_available());

    // A fresh heartbeat brings the detector back.
    detector.heartbeat();
    assert!(detector.is_available());
}
/// Querying a registry about an address it has never heard from must report
/// the address as available and must not add it to the tracked addresses.
#[test]
fn registry_unknown_address_is_available() {
    let registry = FailureDetectorRegistry::default_phi();
    let unknown = Address::remote("akka.tcp", "S", "10.0.0.1", 1111);

    assert!(registry.is_available(&unknown));
    assert!(registry.addresses().is_empty());
}
/// Each address in the registry gets its own detector: after both addresses
/// heartbeat once and only the second heartbeats again following a 25 ms
/// silence, the second stays available while the first is marked down, and
/// both remain tracked.
#[test]
fn registry_isolates_detectors_per_address() {
    // Every new address receives its own fast phi detector from this factory.
    let make_detector: Arc<dyn Fn() -> Arc<dyn FailureDetector> + Send + Sync> =
        Arc::new(|| Arc::new(fast_phi(8.0, 1)));
    let registry = FailureDetectorRegistry::new(make_detector);

    let first = Address::remote("akka.tcp", "S", "10.0.0.1", 1111);
    let second = Address::remote("akka.tcp", "S", "10.0.0.2", 2222);

    registry.heartbeat(&first);
    registry.heartbeat(&second);
    assert!(registry.is_available(&first));
    assert!(registry.is_available(&second));

    // Only `second` heartbeats again after the silence.
    std::thread::sleep(Duration::from_millis(25));
    registry.heartbeat(&second);

    assert!(registry.is_available(&second), "b's fresh heartbeat keeps it alive");
    assert!(!registry.is_available(&first), "a's missed heartbeats marked it down");

    assert_eq!(registry.addresses().len(), 2);
}
/// Removing an address drops it from the tracked addresses, makes it behave
/// like an unknown (available) address, and still allows a later heartbeat
/// for that same address.
#[test]
fn registry_remove_clears_detector() {
    let registry = FailureDetectorRegistry::default_phi();
    let peer = Address::remote("akka.tcp", "S", "10.0.0.5", 5555);

    // One heartbeat is enough for the address to be tracked.
    registry.heartbeat(&peer);
    assert_eq!(registry.addresses().len(), 1);

    registry.remove(&peer);
    assert!(registry.addresses().is_empty());
    assert!(registry.is_available(&peer), "removed address is treated as unknown");

    // Heartbeating after removal must work and leave the address available.
    registry.heartbeat(&peer);
    assert!(registry.is_available(&peer));
}