use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
/// Plain-data bundle of pool-wide proxy and task metrics.
///
/// Every field is public and this type computes nothing itself; the values
/// are whatever the producer stores. `Default` yields zero for every numeric
/// field and an empty `circuit_breakers` map.
#[derive(Clone, Debug, Default)]
pub struct PoolMetrics {
    /// Total number of proxies.
    pub total_proxies: usize,
    /// Number of proxies counted as healthy.
    pub healthy_proxies: usize,
    /// Number of proxies counted as being in cooldown.
    pub cooldown_proxies: usize,
    /// Number of proxies counted as dead.
    pub dead_proxies: usize,
    /// Number of pending tasks.
    pub pending_tasks: usize,
    /// Count of completed tasks.
    pub completed_tasks: u64,
    /// Count of failed tasks.
    pub failed_tasks: u64,
    /// Throughput figure for a 1-second window.
    /// NOTE(review): presumably events per second — confirm with the producer.
    pub throughput_1s: f64,
    /// Throughput figure for a 10-second window (same unit caveat as above).
    pub throughput_10s: f64,
    /// Throughput figure for a 60-second window (same unit caveat as above).
    pub throughput_60s: f64,
    /// Success rate over one minute, going by the name.
    /// NOTE(review): fraction (0..=1) vs. percentage is not fixed here — confirm.
    pub success_rate_1m: f64,
    /// Average latency; the `_ms` suffix suggests milliseconds.
    pub avg_latency_ms: f64,
    /// Number of in-flight items.
    pub inflight: usize,
    /// One boolean flag per string key.
    /// NOTE(review): key meaning (host name?) and whether `true` means the
    /// breaker is open are not established by this type — confirm with callers.
    pub circuit_breakers: HashMap<String, bool>,
}
/// Success/failure statistics for a single proxy host, (de)serializable via serde.
///
/// `Default` is derived: every counter is `0` and both floating-point fields
/// are `0.0`, which is exactly the all-zero value a hand-written impl would
/// have to spell out field by field.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProxyHostStats {
    /// Count of successful outcomes.
    pub success: u32,
    /// Count of failed outcomes.
    pub fail: u32,
    /// Success rate as stored by the producer.
    /// NOTE(review): fraction (0..=1) vs. percentage is not fixed here — confirm.
    pub success_rate: f64,
    /// Average latency; the `_ms` suffix suggests milliseconds.
    pub avg_latency_ms: f64,
    /// Count of consecutive failures.
    pub consecutive_fails: u32,
}
/// Sliding-window event recorder used to derive events-per-second rates.
///
/// Each recorded event is stored as an [`Instant`]. Whenever the tracker is
/// touched, entries at the front of the queue that are older than
/// `max_window` (relative to the `now` of that call) are discarded.
pub(crate) struct ThroughputTracker {
    /// Event times in insertion order; the mutex lets `&self` methods mutate.
    timestamps: Mutex<VecDeque<Instant>>,
    /// Retention horizon applied by `prune`.
    max_window: Duration,
}

impl ThroughputTracker {
    /// Creates a tracker that retains events for 60 seconds.
    pub fn new() -> Self {
        Self::with_max_window(Duration::from_secs(60))
    }

    /// Creates an empty tracker that retains events for `max_window`.
    pub fn with_max_window(max_window: Duration) -> Self {
        Self {
            timestamps: Mutex::new(VecDeque::new()),
            max_window,
        }
    }

    /// Records one event stamped with `Instant::now()`.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn record(&self) {
        self.record_at(Instant::now());
    }

    /// Appends `now` to the back of the queue, then prunes stale entries from
    /// the front using that same `now` as the reference point.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub(crate) fn record_at(&self, now: Instant) {
        let mut ts = self
            .timestamps
            .lock()
            .expect("throughput tracker mutex poisoned");
        ts.push_back(now);
        Self::prune(&mut ts, now, self.max_window);
    }

    /// Returns the event rate over the trailing `window`, measured from
    /// `Instant::now()`. See [`Self::throughput_at`] for the exact rules.
    pub fn throughput(&self, window: Duration) -> f64 {
        self.throughput_at(Instant::now(), window)
    }

    /// Returns `count / window_seconds`, where `count` is the number of
    /// retained events with a timestamp at or after `now - window`.
    ///
    /// * A zero `window` yields `0.0` (avoids dividing by zero).
    /// * The divisor is always the requested `window`, even when it is longer
    ///   than the retention horizon.
    /// * Only a lower bound is applied, so events stamped after `now` count too.
    /// * If `now - window` is not representable as an `Instant`, the window
    ///   reaches back further than any possible event, so every retained
    ///   event is counted.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub(crate) fn throughput_at(&self, now: Instant, window: Duration) -> f64 {
        if window.is_zero() {
            return 0.0;
        }
        let mut ts = self
            .timestamps
            .lock()
            .expect("throughput tracker mutex poisoned");
        Self::prune(&mut ts, now, self.max_window);
        let count = match now.checked_sub(window) {
            Some(cutoff) => ts.iter().filter(|&&t| t >= cutoff).count(),
            // Falling back to `now` as the cutoff here would wrongly exclude
            // every event recorded before `now`.
            None => ts.len(),
        };
        count as f64 / window.as_secs_f64()
    }

    /// Pops entries from the front of `ts` while they are older than
    /// `now - max_window`.
    ///
    /// Scanning stops at the first entry that is not stale, so older entries
    /// sitting behind a newer one are left in place.
    fn prune(ts: &mut VecDeque<Instant>, now: Instant, max_window: Duration) {
        let cutoff = match now.checked_sub(max_window) {
            Some(cutoff) => cutoff,
            // The horizon lies before the earliest representable instant, so
            // nothing can be stale. Using `now` as the cutoff instead would
            // throw away every earlier event.
            None => return,
        };
        while ts.front().map_or(false, |&oldest| oldest < cutoff) {
            ts.pop_front();
        }
    }

    /// Test-only: number of timestamps currently retained.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.timestamps
            .lock()
            .expect("throughput tracker mutex poisoned")
            .len()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Absolute-difference comparison shared by every floating-point check below.
    fn close(actual: f64, expected: f64, tolerance: f64) -> bool {
        (actual - expected).abs() < tolerance
    }

    // ---- PoolMetrics -------------------------------------------------------

    /// `PoolMetrics::default()` must be all zeros with no circuit breakers.
    #[test]
    fn pool_metrics_default_is_zeroed() {
        // Exhaustive destructuring: every field is inspected below.
        let PoolMetrics {
            total_proxies,
            healthy_proxies,
            cooldown_proxies,
            dead_proxies,
            pending_tasks,
            completed_tasks,
            failed_tasks,
            throughput_1s,
            throughput_10s,
            throughput_60s,
            success_rate_1m,
            avg_latency_ms,
            inflight,
            circuit_breakers,
        } = PoolMetrics::default();

        assert_eq!(
            [
                total_proxies,
                healthy_proxies,
                cooldown_proxies,
                dead_proxies,
                pending_tasks,
                inflight,
            ],
            [0usize; 6]
        );
        assert_eq!([completed_tasks, failed_tasks], [0u64; 2]);

        let float_fields = [
            throughput_1s,
            throughput_10s,
            throughput_60s,
            success_rate_1m,
            avg_latency_ms,
        ];
        for value in float_fields.iter() {
            assert!(close(*value, 0.0, f64::EPSILON));
        }

        assert!(circuit_breakers.is_empty());
    }

    /// A clone carries over both scalar fields and the breaker map.
    #[test]
    fn pool_metrics_is_clone() {
        let mut breakers = HashMap::new();
        breakers.insert(String::from("host.com"), true);
        let original = PoolMetrics {
            total_proxies: 42,
            circuit_breakers: breakers,
            ..PoolMetrics::default()
        };

        let copy = original.clone();

        assert_eq!(copy.total_proxies, 42);
        assert_eq!(copy.circuit_breakers.get("host.com").copied(), Some(true));
    }

    // ---- ProxyHostStats ----------------------------------------------------

    /// The default stats value is zero in every field.
    #[test]
    fn proxy_host_stats_default() {
        let stats = ProxyHostStats::default();

        assert_eq!((stats.success, stats.fail, stats.consecutive_fails), (0, 0, 0));
        assert!(close(stats.success_rate, 0.0, f64::EPSILON));
        assert!(close(stats.avg_latency_ms, 0.0, f64::EPSILON));
    }

    /// Serializing to JSON and back preserves every field.
    #[test]
    fn proxy_host_stats_serde_round_trip() {
        let original = ProxyHostStats {
            success: 10,
            fail: 2,
            success_rate: 0.833,
            avg_latency_ms: 120.5,
            consecutive_fails: 0,
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ProxyHostStats = serde_json::from_str(&encoded).unwrap();

        assert_eq!(
            (decoded.success, decoded.fail, decoded.consecutive_fails),
            (10, 2, 0)
        );
        assert!(close(decoded.success_rate, 0.833, 1e-6));
        assert!(close(decoded.avg_latency_ms, 120.5, 1e-6));
    }

    // ---- ThroughputTracker -------------------------------------------------

    /// A fresh tracker holds nothing and reports a zero rate.
    #[test]
    fn new_tracker_is_empty() {
        let tracker = ThroughputTracker::new();

        assert_eq!(tracker.len(), 0);
        let rate = tracker.throughput(Duration::from_secs(1));
        assert!(close(rate, 0.0, f64::EPSILON));
    }

    /// Each `record()` adds exactly one retained timestamp.
    #[test]
    fn record_increases_count() {
        let tracker = ThroughputTracker::new();

        for expected in 1..=2 {
            tracker.record();
            assert_eq!(tracker.len(), expected);
        }
    }

    /// A zero-length window short-circuits to 0.0 instead of dividing by zero.
    #[test]
    fn throughput_with_zero_window_is_zero() {
        let tracker = ThroughputTracker::new();
        tracker.record();

        let rate = tracker.throughput(Duration::ZERO);
        assert!(close(rate, 0.0, f64::EPSILON));
    }

    /// Five events spread over the last 400 ms give 5 events/s over 1 s.
    #[test]
    fn throughput_over_1s_window() {
        let tracker = ThroughputTracker::new();
        let now = Instant::now();

        (0..5u64)
            .map(|step| now - Duration::from_millis(100 * step))
            .for_each(|at| tracker.record_at(at));

        let rate = tracker.throughput_at(now, Duration::from_secs(1));
        assert!(close(rate, 5.0, 0.01));
    }

    /// Twenty events spread over the last 9.5 s give 2 events/s over 10 s.
    #[test]
    fn throughput_over_10s_window() {
        let tracker = ThroughputTracker::new();
        let now = Instant::now();

        (0..20u64)
            .map(|step| now - Duration::from_millis(500 * step))
            .for_each(|at| tracker.record_at(at));

        let rate = tracker.throughput_at(now, Duration::from_secs(10));
        assert!(close(rate, 2.0, 0.01));
    }

    /// An event older than the retention horizon is dropped from the queue.
    #[test]
    fn old_entries_are_pruned() {
        let tracker = ThroughputTracker::with_max_window(Duration::from_secs(5));
        let now = Instant::now();

        tracker.record_at(now - Duration::from_secs(10));
        tracker.record_at(now - Duration::from_secs(1));

        let rate = tracker.throughput_at(now, Duration::from_secs(5));
        assert!(close(rate, 0.2, 0.01));
        assert_eq!(tracker.len(), 1);
    }

    /// Asking for a window longer than the retention horizon still divides by
    /// the requested window.
    #[test]
    fn window_larger_than_max_window_still_works() {
        let tracker = ThroughputTracker::with_max_window(Duration::from_secs(5));
        let now = Instant::now();

        tracker.record_at(now - Duration::from_secs(3));
        tracker.record_at(now - Duration::from_secs(1));

        let rate = tracker.throughput_at(now, Duration::from_secs(60));
        assert!(close(rate, 2.0 / 60.0, 0.01));
    }

    /// Eight threads hammering the tracker must neither panic nor lose events.
    #[test]
    fn concurrent_access_does_not_panic() {
        use std::sync::Arc;
        use std::thread;

        let shared = Arc::new(ThroughputTracker::new());

        // `collect` is eager, so every worker is spawned before any is joined.
        let workers: Vec<_> = (0..8)
            .map(|_| {
                let tracker = Arc::clone(&shared);
                thread::spawn(move || {
                    for _ in 0..100 {
                        tracker.record();
                        let _ = tracker.throughput(Duration::from_secs(1));
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(shared.len(), 800);
    }

    /// Only events inside the requested window contribute to its rate.
    #[test]
    fn throughput_only_counts_events_within_requested_window() {
        let tracker = ThroughputTracker::new();
        let now = Instant::now();

        // (how many events, how long before `now`) — recent batch first.
        let batches = [
            (3, Duration::from_millis(500)),
            (2, Duration::from_secs(5)),
        ];
        for &(copies, age) in batches.iter() {
            for _ in 0..copies {
                tracker.record_at(now - age);
            }
        }

        let rate_1s = tracker.throughput_at(now, Duration::from_secs(1));
        assert!(close(rate_1s, 3.0, 0.01));

        let rate_10s = tracker.throughput_at(now, Duration::from_secs(10));
        assert!(close(rate_10s, 0.5, 0.01));
    }

    /// `with_max_window` stores the horizon it was given.
    #[test]
    fn custom_max_window() {
        let tracker = ThroughputTracker::with_max_window(Duration::from_secs(2));
        assert_eq!(tracker.max_window, Duration::from_secs(2));
    }

    /// `new` uses a 60-second horizon.
    #[test]
    fn default_max_window_is_60s() {
        let tracker = ThroughputTracker::new();
        assert_eq!(tracker.max_window, Duration::from_secs(60));
    }
}