use dashmap::DashMap;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::time::Instant;
use crate::metrics::ProxyHostStats;
struct HealthEntry {
window: VecDeque<bool>,
window_size: usize,
total_success: u32,
total_fail: u32,
latency_sum: f64,
latency_count: u32,
consecutive_fails: u32,
last_access: Option<Instant>,
last_success: Option<Instant>,
}
impl HealthEntry {
fn new(window_size: usize) -> Self {
Self {
window: VecDeque::with_capacity(window_size),
window_size,
total_success: 0,
total_fail: 0,
latency_sum: 0.0,
latency_count: 0,
consecutive_fails: 0,
last_access: None,
last_success: None,
}
}
fn push_result(&mut self, success: bool) {
if self.window.len() >= self.window_size {
self.window.pop_front();
}
self.window.push_back(success);
self.last_access = Some(Instant::now());
}
fn window_success_rate(&self) -> Option<f64> {
if self.window.is_empty() {
return None;
}
let successes = self.window.iter().filter(|&&s| s).count() as f64;
Some(successes / self.window.len() as f64)
}
}
pub struct HealthTracker {
entries: DashMap<(String, String), HealthEntry>,
window_size: usize,
}
impl HealthTracker {
pub fn new(window_size: usize) -> Self {
Self {
entries: DashMap::new(),
window_size,
}
}
pub fn record_success(&self, proxy: &str, host: &str, latency_ms: f64) {
let key = (proxy.to_string(), host.to_string());
let mut entry = self
.entries
.entry(key)
.or_insert_with(|| HealthEntry::new(self.window_size));
entry.push_result(true);
entry.total_success += 1;
entry.latency_sum += latency_ms;
entry.latency_count += 1;
entry.consecutive_fails = 0;
entry.last_success = Some(Instant::now());
}
pub fn record_failure(&self, proxy: &str, host: &str) {
let key = (proxy.to_string(), host.to_string());
let mut entry = self
.entries
.entry(key)
.or_insert_with(|| HealthEntry::new(self.window_size));
entry.push_result(false);
entry.total_fail += 1;
entry.consecutive_fails += 1;
}
#[allow(dead_code)]
pub fn get_stats(&self, proxy: &str, host: &str) -> Option<ProxyHostStats> {
let key = (proxy.to_string(), host.to_string());
self.entries.get(&key).map(|entry| {
let success_rate = entry.window_success_rate().unwrap_or(0.0);
let avg_latency_ms = if entry.latency_count > 0 {
entry.latency_sum / entry.latency_count as f64
} else {
0.0
};
ProxyHostStats {
success: entry.total_success,
fail: entry.total_fail,
success_rate,
avg_latency_ms,
consecutive_fails: entry.consecutive_fails,
}
})
}
pub fn get_affinity(&self, proxy: &str, host: &str) -> f64 {
let key = (proxy.to_string(), host.to_string());
self.entries
.get(&key)
.and_then(|entry| entry.window_success_rate())
.unwrap_or(0.5)
}
pub fn get_global_health(&self, proxy: &str) -> f64 {
let mut total_successes: usize = 0;
let mut total_samples: usize = 0;
for entry in self.entries.iter() {
let (p, _) = entry.key();
if p == proxy {
let v = entry.value();
total_successes += v.window.iter().filter(|&&s| s).count();
total_samples += v.window.len();
}
}
if total_samples == 0 {
0.5
} else {
total_successes as f64 / total_samples as f64
}
}
pub fn seconds_since_last_access(&self, proxy: &str, host: &str) -> f64 {
let key = (proxy.to_string(), host.to_string());
self.entries
.get(&key)
.map(|e| match e.last_access {
Some(t) => t.elapsed().as_secs_f64(),
None => f64::MAX,
})
.unwrap_or(f64::MAX)
}
pub fn get_consecutive_fails(&self, proxy: &str, host: &str) -> u32 {
let key = (proxy.to_string(), host.to_string());
self.entries
.get(&key)
.map(|e| e.consecutive_fails)
.unwrap_or(0)
}
pub fn minutes_since_last_success(&self, proxy: &str, host: &str) -> f64 {
let key = (proxy.to_string(), host.to_string());
self.entries
.get(&key)
.map(|e| match e.last_success {
Some(t) => t.elapsed().as_secs_f64() / 60.0,
None => f64::MAX,
})
.unwrap_or(f64::MAX)
}
pub fn avg_success_rate_for_host(&self, host: &str) -> f64 {
let mut sum = 0.0;
let mut count = 0usize;
for entry in self.entries.iter() {
let (_, h) = entry.key();
if h == host {
if let Some(rate) = entry.value().window_success_rate() {
sum += rate;
count += 1;
}
}
}
if count == 0 {
0.5
} else {
sum / count as f64
}
}
pub fn get_all_stats(&self) -> HashMap<String, HashMap<String, ProxyHostStats>> {
let mut result: HashMap<String, HashMap<String, ProxyHostStats>> = HashMap::new();
for entry in self.entries.iter() {
let (proxy, host) = entry.key();
let v = entry.value();
let success_rate = v.window_success_rate().unwrap_or(0.0);
let avg_latency_ms = if v.latency_count > 0 {
v.latency_sum / v.latency_count as f64
} else {
0.0
};
let stats = ProxyHostStats {
success: v.total_success,
fail: v.total_fail,
success_rate,
avg_latency_ms,
consecutive_fails: v.consecutive_fails,
};
result
.entry(proxy.clone())
.or_default()
.insert(host.clone(), stats);
}
result
}
pub fn restore(&self, proxy: &str, host: &str, stats: &ProxyHostStats) {
let key = (proxy.to_string(), host.to_string());
let mut entry = HealthEntry::new(self.window_size);
entry.total_success = stats.success;
entry.total_fail = stats.fail;
entry.consecutive_fails = stats.consecutive_fails;
if stats.success > 0 {
entry.latency_sum = stats.avg_latency_ms * stats.success as f64;
entry.latency_count = stats.success;
}
let total = (stats.success + stats.fail) as usize;
let window_count = total.min(self.window_size);
let successes_in_window = (stats.success_rate * window_count as f64).round() as usize;
let failures_in_window = window_count.saturating_sub(successes_in_window);
for _ in 0..failures_in_window {
entry.window.push_back(false);
}
for _ in 0..successes_in_window {
entry.window.push_back(true);
}
self.entries.insert(key, entry);
}
pub fn total_success(&self) -> u64 {
self.entries
.iter()
.map(|e| e.value().total_success as u64)
.sum()
}
pub fn total_fail(&self) -> u64 {
self.entries
.iter()
.map(|e| e.value().total_fail as u64)
.sum()
}
pub fn avg_latency_ms(&self) -> f64 {
let mut total_sum = 0.0f64;
let mut total_count = 0u64;
for entry in self.entries.iter() {
total_sum += entry.value().latency_sum;
total_count += entry.value().latency_count as u64;
}
if total_count == 0 {
0.0
} else {
total_sum / total_count as f64
}
}
pub fn total_samples_for_proxy(&self, proxy: &str) -> u32 {
let mut total = 0u32;
for entry in self.entries.iter() {
let (p, _) = entry.key();
if p == proxy {
let v = entry.value();
total += v.total_success + v.total_fail;
}
}
total
}
pub fn global_success_rate_for_proxy(&self, proxy: &str) -> f64 {
let mut total_success = 0u64;
let mut total_samples = 0u64;
for entry in self.entries.iter() {
let (p, _) = entry.key();
if p == proxy {
let v = entry.value();
total_success += v.total_success as u64;
total_samples += (v.total_success + v.total_fail) as u64;
}
}
if total_samples == 0 {
0.0
} else {
total_success as f64 / total_samples as f64
}
}
}
impl Default for HealthTracker {
fn default() -> Self {
Self::new(30)
}
}
#[cfg(test)]
mod tests {
use super::*;
const PROXY_A: &str = "socks5://1.2.3.4:1080";
const PROXY_B: &str = "socks5://5.6.7.8:9050";
const HOST_X: &str = "yunhq.sse.com.cn";
const HOST_Y: &str = "www.szse.cn";
fn tracker() -> HealthTracker {
HealthTracker::new(5)
}
#[test]
fn record_success_updates_stats() {
let ht = tracker();
ht.record_success(PROXY_A, HOST_X, 120.0);
let stats = ht.get_stats(PROXY_A, HOST_X).unwrap();
assert_eq!(stats.success, 1);
assert_eq!(stats.fail, 0);
assert!((stats.success_rate - 1.0).abs() < f64::EPSILON);
assert!((stats.avg_latency_ms - 120.0).abs() < f64::EPSILON);
assert_eq!(stats.consecutive_fails, 0);
}
#[test]
fn record_failure_updates_stats() {
let ht = tracker();
ht.record_failure(PROXY_A, HOST_X);
let stats = ht.get_stats(PROXY_A, HOST_X).unwrap();
assert_eq!(stats.success, 0);
assert_eq!(stats.fail, 1);
assert!((stats.success_rate).abs() < f64::EPSILON);
assert_eq!(stats.consecutive_fails, 1);
}
#[test]
fn success_resets_consecutive_fails() {
let ht = tracker();
ht.record_failure(PROXY_A, HOST_X);
ht.record_failure(PROXY_A, HOST_X);
assert_eq!(ht.get_consecutive_fails(PROXY_A, HOST_X), 2);
ht.record_success(PROXY_A, HOST_X, 50.0);
assert_eq!(ht.get_consecutive_fails(PROXY_A, HOST_X), 0);
}
#[test]
fn consecutive_fails_increments() {
let ht = tracker();
for _ in 0..5 {
ht.record_failure(PROXY_A, HOST_X);
}
assert_eq!(ht.get_consecutive_fails(PROXY_A, HOST_X), 5);
}
#[test]
fn sliding_window_evicts_oldest() {
let ht = HealthTracker::new(3);
ht.record_failure(PROXY_A, HOST_X);
ht.record_failure(PROXY_A, HOST_X);
ht.record_failure(PROXY_A, HOST_X);
assert!((ht.get_affinity(PROXY_A, HOST_X)).abs() < f64::EPSILON);
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_success(PROXY_A, HOST_X, 10.0);
assert!((ht.get_affinity(PROXY_A, HOST_X) - 1.0).abs() < f64::EPSILON);
}
#[test]
fn window_size_respected() {
let ht = HealthTracker::new(4);
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_failure(PROXY_A, HOST_X);
ht.record_failure(PROXY_A, HOST_X);
assert!((ht.get_affinity(PROXY_A, HOST_X) - 0.5).abs() < f64::EPSILON);
ht.record_success(PROXY_A, HOST_X, 10.0);
assert!((ht.get_affinity(PROXY_A, HOST_X) - 0.5).abs() < f64::EPSILON);
}
#[test]
fn affinity_returns_half_when_unknown() {
let ht = tracker();
assert!((ht.get_affinity(PROXY_A, HOST_X) - 0.5).abs() < f64::EPSILON);
}
#[test]
fn affinity_reflects_window() {
let ht = HealthTracker::new(4);
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_failure(PROXY_A, HOST_X);
assert!((ht.get_affinity(PROXY_A, HOST_X) - 0.75).abs() < f64::EPSILON);
}
#[test]
fn global_health_returns_half_when_unknown() {
let ht = tracker();
assert!((ht.get_global_health(PROXY_A) - 0.5).abs() < f64::EPSILON);
}
#[test]
fn global_health_aggregates_across_hosts() {
let ht = tracker();
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_success(PROXY_A, HOST_Y, 10.0);
ht.record_failure(PROXY_A, HOST_Y);
assert!((ht.get_global_health(PROXY_A) - 0.75).abs() < f64::EPSILON);
}
#[test]
fn global_health_ignores_other_proxies() {
let ht = tracker();
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_failure(PROXY_B, HOST_X);
assert!((ht.get_global_health(PROXY_A) - 1.0).abs() < f64::EPSILON);
assert!(ht.get_global_health(PROXY_B).abs() < f64::EPSILON);
}
#[test]
fn minutes_since_last_success_returns_max_when_unknown() {
let ht = tracker();
assert_eq!(ht.minutes_since_last_success(PROXY_A, HOST_X), f64::MAX);
}
#[test]
fn minutes_since_last_success_returns_max_when_only_failures() {
let ht = tracker();
ht.record_failure(PROXY_A, HOST_X);
assert_eq!(ht.minutes_since_last_success(PROXY_A, HOST_X), f64::MAX);
}
#[test]
fn minutes_since_last_success_is_small_after_success() {
let ht = tracker();
ht.record_success(PROXY_A, HOST_X, 10.0);
assert!(ht.minutes_since_last_success(PROXY_A, HOST_X) < 0.1);
}
#[test]
fn avg_success_rate_for_host_returns_half_when_unknown() {
let ht = tracker();
assert!((ht.avg_success_rate_for_host(HOST_X) - 0.5).abs() < f64::EPSILON);
}
#[test]
fn avg_success_rate_for_host_averages_across_proxies() {
let ht = HealthTracker::new(10);
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_success(PROXY_B, HOST_X, 10.0);
ht.record_failure(PROXY_B, HOST_X);
assert!((ht.avg_success_rate_for_host(HOST_X) - 0.75).abs() < f64::EPSILON);
}
#[test]
fn get_stats_returns_none_for_unknown_pair() {
let ht = tracker();
assert!(ht.get_stats(PROXY_A, HOST_X).is_none());
}
#[test]
fn get_all_stats_returns_empty_when_no_data() {
let ht = tracker();
assert!(ht.get_all_stats().is_empty());
}
#[test]
fn get_all_stats_contains_all_pairs() {
let ht = tracker();
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_failure(PROXY_B, HOST_Y);
let all = ht.get_all_stats();
assert_eq!(all.len(), 2); assert!(all.contains_key(PROXY_A));
assert!(all.contains_key(PROXY_B));
assert!(all[PROXY_A].contains_key(HOST_X));
assert!(all[PROXY_B].contains_key(HOST_Y));
}
#[test]
fn restore_recreates_entry() {
let ht = tracker();
let stats = ProxyHostStats {
success: 20,
fail: 5,
success_rate: 0.8,
avg_latency_ms: 100.0,
consecutive_fails: 2,
};
ht.restore(PROXY_A, HOST_X, &stats);
let restored = ht.get_stats(PROXY_A, HOST_X).unwrap();
assert_eq!(restored.success, 20);
assert_eq!(restored.fail, 5);
assert_eq!(restored.consecutive_fails, 2);
assert!((restored.success_rate - 0.8).abs() < 0.1);
assert!((restored.avg_latency_ms - 100.0).abs() < f64::EPSILON);
}
#[test]
fn restore_with_zero_success_preserves_zero_latency() {
let ht = tracker();
let stats = ProxyHostStats {
success: 0,
fail: 3,
success_rate: 0.0,
avg_latency_ms: 0.0,
consecutive_fails: 3,
};
ht.restore(PROXY_A, HOST_X, &stats);
let restored = ht.get_stats(PROXY_A, HOST_X).unwrap();
assert_eq!(restored.success, 0);
assert_eq!(restored.fail, 3);
assert!((restored.avg_latency_ms).abs() < f64::EPSILON);
}
#[test]
fn restore_approximates_window() {
let ht = HealthTracker::new(10);
let stats = ProxyHostStats {
success: 100,
fail: 0,
success_rate: 1.0,
avg_latency_ms: 50.0,
consecutive_fails: 0,
};
ht.restore(PROXY_A, HOST_X, &stats);
assert!((ht.get_affinity(PROXY_A, HOST_X) - 1.0).abs() < f64::EPSILON);
}
#[test]
fn total_success_accumulates_across_all_pairs() {
let ht = tracker();
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_success(PROXY_B, HOST_Y, 10.0);
assert_eq!(ht.total_success(), 3);
}
#[test]
fn total_fail_accumulates_across_all_pairs() {
let ht = tracker();
ht.record_failure(PROXY_A, HOST_X);
ht.record_failure(PROXY_B, HOST_Y);
ht.record_failure(PROXY_B, HOST_Y);
assert_eq!(ht.total_fail(), 3);
}
#[test]
fn avg_latency_ms_zero_when_no_data() {
let ht = tracker();
assert!((ht.avg_latency_ms()).abs() < f64::EPSILON);
}
#[test]
fn avg_latency_ms_global_average() {
let ht = tracker();
ht.record_success(PROXY_A, HOST_X, 100.0);
ht.record_success(PROXY_B, HOST_Y, 200.0);
assert!((ht.avg_latency_ms() - 150.0).abs() < f64::EPSILON);
}
#[test]
fn avg_latency_ms_ignores_failures() {
let ht = tracker();
ht.record_success(PROXY_A, HOST_X, 100.0);
ht.record_failure(PROXY_A, HOST_X);
assert!((ht.avg_latency_ms() - 100.0).abs() < f64::EPSILON);
}
#[test]
fn total_samples_for_proxy_counts_across_hosts() {
let ht = tracker();
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_failure(PROXY_A, HOST_X);
ht.record_success(PROXY_A, HOST_Y, 10.0);
assert_eq!(ht.total_samples_for_proxy(PROXY_A), 3);
}
#[test]
fn total_samples_for_proxy_ignores_other_proxies() {
let ht = tracker();
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_failure(PROXY_B, HOST_X);
assert_eq!(ht.total_samples_for_proxy(PROXY_A), 1);
}
#[test]
fn total_samples_for_proxy_zero_when_unknown() {
let ht = tracker();
assert_eq!(ht.total_samples_for_proxy("unknown"), 0);
}
#[test]
fn global_success_rate_for_proxy_zero_when_unknown() {
let ht = tracker();
assert!(ht.global_success_rate_for_proxy("unknown").abs() < f64::EPSILON);
}
#[test]
fn global_success_rate_for_proxy_lifetime_rate() {
let ht = tracker();
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_failure(PROXY_A, HOST_Y);
let rate = ht.global_success_rate_for_proxy(PROXY_A);
assert!((rate - 2.0 / 3.0).abs() < f64::EPSILON);
}
#[test]
fn default_uses_window_size_30() {
let ht = HealthTracker::default();
assert_eq!(ht.window_size, 30);
}
#[test]
fn window_size_one() {
let ht = HealthTracker::new(1);
ht.record_success(PROXY_A, HOST_X, 10.0);
assert!((ht.get_affinity(PROXY_A, HOST_X) - 1.0).abs() < f64::EPSILON);
ht.record_failure(PROXY_A, HOST_X);
assert!(ht.get_affinity(PROXY_A, HOST_X).abs() < f64::EPSILON);
}
#[test]
fn many_records_beyond_window() {
let ht = HealthTracker::new(3);
for _ in 0..100 {
ht.record_failure(PROXY_A, HOST_X);
}
for _ in 0..3 {
ht.record_success(PROXY_A, HOST_X, 10.0);
}
assert!((ht.get_affinity(PROXY_A, HOST_X) - 1.0).abs() < f64::EPSILON);
assert_eq!(ht.get_stats(PROXY_A, HOST_X).unwrap().fail, 100);
}
#[test]
fn separate_pairs_are_independent() {
let ht = tracker();
ht.record_success(PROXY_A, HOST_X, 10.0);
ht.record_failure(PROXY_A, HOST_Y);
ht.record_failure(PROXY_B, HOST_X);
assert!((ht.get_affinity(PROXY_A, HOST_X) - 1.0).abs() < f64::EPSILON);
assert!(ht.get_affinity(PROXY_A, HOST_Y).abs() < f64::EPSILON);
assert!(ht.get_affinity(PROXY_B, HOST_X).abs() < f64::EPSILON);
}
#[test]
fn consecutive_fails_for_unknown_is_zero() {
let ht = tracker();
assert_eq!(ht.get_consecutive_fails("no", "where"), 0);
}
#[test]
fn latency_average_across_multiple_successes() {
let ht = tracker();
ht.record_success(PROXY_A, HOST_X, 100.0);
ht.record_success(PROXY_A, HOST_X, 200.0);
ht.record_success(PROXY_A, HOST_X, 300.0);
let stats = ht.get_stats(PROXY_A, HOST_X).unwrap();
assert!((stats.avg_latency_ms - 200.0).abs() < 0.01);
}
#[test]
fn seconds_since_last_access_returns_max_when_unknown() {
let ht = tracker();
assert_eq!(ht.seconds_since_last_access(PROXY_A, HOST_X), f64::MAX);
}
#[test]
fn seconds_since_last_access_is_small_after_record() {
let ht = tracker();
ht.record_failure(PROXY_A, HOST_X);
let secs = ht.seconds_since_last_access(PROXY_A, HOST_X);
assert!(secs < 1.0, "expected < 1s, got {secs}");
}
#[test]
fn seconds_since_last_access_updates_on_success_too() {
let ht = tracker();
ht.record_success(PROXY_A, HOST_X, 50.0);
let secs = ht.seconds_since_last_access(PROXY_A, HOST_X);
assert!(secs < 1.0, "expected < 1s, got {secs}");
}
}