use std::{
collections::HashMap,
sync::{Mutex, OnceLock},
time::{Duration, Instant},
};
/// Number of back-to-back retryable faults after which a node is quarantined.
pub(crate) const FAILURE_THRESHOLD: u32 = 3;
/// How long a node stays quarantined once it trips (five minutes).
pub(crate) const QUARANTINE_DURATION: Duration = Duration::from_secs(5 * 60);
/// Failure bookkeeping for a single node, stored in the shared health registry.
#[derive(Clone, Debug, Default)]
pub(crate) struct NodeHealth {
    /// Consecutive retryable faults counted for this node (saturating).
    consecutive_failures: u32,
    /// Deadline of the current or most recent quarantine, if the node ever tripped.
    quarantined_until: Option<Instant>,
}

impl NodeHealth {
    /// Returns `true` only while `now` is strictly earlier than the quarantine
    /// deadline; a node with no deadline is never quarantined.
    fn is_quarantined(&self, now: Instant) -> bool {
        self.quarantined_until
            .is_some_and(|deadline| now < deadline)
    }
}
/// Lazily-initialised, process-wide table of node health keyed by node URL.
///
/// The table is created empty on first access and lives for the rest of the
/// process.
fn registry() -> &'static Mutex<HashMap<String, NodeHealth>> {
    static TABLE: OnceLock<Mutex<HashMap<String, NodeHealth>>> = OnceLock::new();
    TABLE.get_or_init(Default::default)
}
/// Health state change reported by a single call to [`record`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum HealthTransition {
    /// Nothing worth reporting changed.
    None,
    /// The node just reached the failure threshold and was quarantined.
    Quarantined {
        /// Consecutive fault count at the moment the node tripped.
        consecutive: u32,
    },
    /// A success arrived for a node that had a quarantine deadline recorded.
    Recovered,
}
/// Outcome of one request against a node, as fed into [`record`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum NodeOutcome {
    /// The request counts as healthy; it clears the node's failure streak.
    Success,
    /// A fault that counts toward the quarantine threshold.
    RetryableFault,
}
/// Records `outcome` for the node at `url`, stamped with the current time, and
/// returns the resulting health transition (if any).
pub(crate) fn record(url: &str, outcome: NodeOutcome) -> HealthTransition {
    let now = Instant::now();
    record_at(url, outcome, now)
}
fn record_at(url: &str, outcome: NodeOutcome, now: Instant) -> HealthTransition {
let mut map = match registry().lock() {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
let entry = map.entry(url.to_string()).or_default();
match outcome {
NodeOutcome::Success => {
let was_quarantined = entry.quarantined_until.is_some();
entry.consecutive_failures = 0;
entry.quarantined_until = None;
if was_quarantined {
HealthTransition::Recovered
} else {
HealthTransition::None
}
}
NodeOutcome::RetryableFault => {
if entry.is_quarantined(now) {
return HealthTransition::None;
}
entry.consecutive_failures = entry.consecutive_failures.saturating_add(1);
if entry.consecutive_failures >= FAILURE_THRESHOLD {
entry.quarantined_until = Some(now + QUARANTINE_DURATION);
HealthTransition::Quarantined {
consecutive: entry.consecutive_failures,
}
} else {
HealthTransition::None
}
}
}
}
/// Test helper: whether `url` is quarantined as of `now`. URLs the registry has
/// no entry for are reported as not quarantined.
#[cfg(test)]
fn is_quarantined_at(url: &str, now: Instant) -> bool {
    let guard = registry()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    guard
        .get(url)
        .is_some_and(|health| health.is_quarantined(now))
}
/// Indices into `urls` of nodes that are not quarantined right now, in input
/// order. If every node is quarantined, all indices are returned instead.
pub(crate) fn eligible_indices(urls: &[String]) -> Vec<usize> {
    let now = Instant::now();
    eligible_indices_at(urls, now)
}
/// Clock-injectable core of [`eligible_indices`].
///
/// Returns the indices (ascending) of `urls` that are not quarantined at `now`.
/// When that set is empty, it falls back to every index `0..urls.len()`, so
/// the caller always has something to try (an empty input still yields an
/// empty result). The registry lock is held for the whole scan, so all URLs
/// are judged against the same snapshot.
fn eligible_indices_at(urls: &[String], now: Instant) -> Vec<usize> {
    let health = registry()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let quarantined = |url: &str| health.get(url).is_some_and(|h| h.is_quarantined(now));

    let mut picks = Vec::new();
    for (idx, url) in urls.iter().enumerate() {
        if !quarantined(url.as_str()) {
            picks.push(idx);
        }
    }
    if picks.is_empty() {
        picks.extend(0..urls.len());
    }
    picks
}
#[cfg(test)]
mod tests {
    use super::*;

    // The registry is a process-wide static shared by every test, and tests may
    // run concurrently, so each test works on URLs nobody else uses.
    fn unique(tag: &str) -> String {
        use std::sync::atomic::{AtomicU64, Ordering};
        static N: AtomicU64 = AtomicU64::new(0);
        format!("https://node-{}-{}.test", tag, N.fetch_add(1, Ordering::Relaxed))
    }

    #[test]
    fn trips_after_threshold_consecutive_faults() {
        let url = unique("trip");
        let t0 = Instant::now();
        assert_eq!(record_at(&url, NodeOutcome::RetryableFault, t0), HealthTransition::None);
        assert_eq!(record_at(&url, NodeOutcome::RetryableFault, t0), HealthTransition::None);
        assert!(!is_quarantined_at(&url, t0));
        assert_eq!(
            record_at(&url, NodeOutcome::RetryableFault, t0),
            HealthTransition::Quarantined { consecutive: 3 }
        );
        assert!(is_quarantined_at(&url, t0));
    }

    #[test]
    fn success_resets_counter_before_threshold() {
        let url = unique("reset");
        let t0 = Instant::now();
        record_at(&url, NodeOutcome::RetryableFault, t0);
        record_at(&url, NodeOutcome::RetryableFault, t0);
        assert_eq!(record_at(&url, NodeOutcome::Success, t0), HealthTransition::None);
        record_at(&url, NodeOutcome::RetryableFault, t0);
        record_at(&url, NodeOutcome::RetryableFault, t0);
        assert!(!is_quarantined_at(&url, t0));
        assert_eq!(
            record_at(&url, NodeOutcome::RetryableFault, t0),
            HealthTransition::Quarantined { consecutive: 3 }
        );
    }

    #[test]
    fn quarantine_expires_after_duration() {
        let url = unique("expire");
        let t0 = Instant::now();
        for _ in 0..FAILURE_THRESHOLD {
            record_at(&url, NodeOutcome::RetryableFault, t0);
        }
        assert!(is_quarantined_at(&url, t0));
        let almost = t0 + QUARANTINE_DURATION - Duration::from_secs(1);
        assert!(is_quarantined_at(&url, almost));
        let after = t0 + QUARANTINE_DURATION + Duration::from_secs(1);
        assert!(!is_quarantined_at(&url, after));
    }

    // The deadline is exclusive: the node becomes eligible at exactly
    // `t0 + QUARANTINE_DURATION`.
    #[test]
    fn quarantine_ends_exactly_at_deadline() {
        let url = unique("deadline");
        let t0 = Instant::now();
        for _ in 0..FAILURE_THRESHOLD {
            record_at(&url, NodeOutcome::RetryableFault, t0);
        }
        assert!(!is_quarantined_at(&url, t0 + QUARANTINE_DURATION));
    }

    #[test]
    fn first_success_after_expiry_reports_recovered() {
        let url = unique("recover");
        let t0 = Instant::now();
        for _ in 0..FAILURE_THRESHOLD {
            record_at(&url, NodeOutcome::RetryableFault, t0);
        }
        let after = t0 + QUARANTINE_DURATION + Duration::from_secs(1);
        assert!(!is_quarantined_at(&url, after));
        assert_eq!(record_at(&url, NodeOutcome::Success, after), HealthTransition::Recovered);
        assert_eq!(record_at(&url, NodeOutcome::Success, after), HealthTransition::None);
    }

    // The failure streak survives an expired quarantine, so a single fault after
    // expiry (with no intervening success) re-trips the node straight away.
    #[test]
    fn fault_after_expiry_re_trips_immediately() {
        let url = unique("retrip");
        let t0 = Instant::now();
        for _ in 0..FAILURE_THRESHOLD {
            record_at(&url, NodeOutcome::RetryableFault, t0);
        }
        let after = t0 + QUARANTINE_DURATION + Duration::from_secs(1);
        assert!(!is_quarantined_at(&url, after));
        assert_eq!(
            record_at(&url, NodeOutcome::RetryableFault, after),
            HealthTransition::Quarantined { consecutive: FAILURE_THRESHOLD + 1 }
        );
        assert!(is_quarantined_at(&url, after));
    }

    #[test]
    fn fault_while_quarantined_does_not_re_escalate() {
        let url = unique("noescalate");
        let t0 = Instant::now();
        for _ in 0..FAILURE_THRESHOLD {
            record_at(&url, NodeOutcome::RetryableFault, t0);
        }
        assert_eq!(record_at(&url, NodeOutcome::RetryableFault, t0), HealthTransition::None);
        assert_eq!(record_at(&url, NodeOutcome::RetryableFault, t0), HealthTransition::None);
    }

    #[test]
    fn steam_tunneled_errors_count_as_healthy() {
        let url = unique("tunnel");
        let t0 = Instant::now();
        for _ in 0..(FAILURE_THRESHOLD * 3) {
            assert_eq!(record_at(&url, NodeOutcome::Success, t0), HealthTransition::None);
        }
        assert!(!is_quarantined_at(&url, t0));
    }

    #[test]
    fn success_on_untracked_node_is_noop() {
        let url = unique("fresh");
        let t0 = Instant::now();
        assert!(!is_quarantined_at(&url, t0));
        assert_eq!(record_at(&url, NodeOutcome::Success, t0), HealthTransition::None);
        assert!(!is_quarantined_at(&url, t0));
    }

    #[test]
    fn eligible_skips_quarantined_nodes() {
        let healthy = unique("elig-healthy");
        let sick = unique("elig-sick");
        let urls = vec![healthy.clone(), sick.clone()];
        let t0 = Instant::now();
        for _ in 0..FAILURE_THRESHOLD {
            record_at(&sick, NodeOutcome::RetryableFault, t0);
        }
        assert_eq!(eligible_indices_at(&urls, t0), vec![0]);
    }

    #[test]
    fn all_quarantined_falls_back_to_everything() {
        let a = unique("all-a");
        let b = unique("all-b");
        let urls = vec![a.clone(), b.clone()];
        let t0 = Instant::now();
        for _ in 0..FAILURE_THRESHOLD {
            record_at(&a, NodeOutcome::RetryableFault, t0);
            record_at(&b, NodeOutcome::RetryableFault, t0);
        }
        assert_eq!(eligible_indices_at(&urls, t0), vec![0, 1]);
    }

    #[test]
    fn eligible_on_empty_list_is_empty() {
        assert!(eligible_indices_at(&[], Instant::now()).is_empty());
    }
}