1use std::collections::HashMap;
2use std::collections::VecDeque;
3use std::sync::Mutex;
4use std::time::{Duration, Instant};
5
6use serde::{Deserialize, Serialize};
7
/// Aggregate counters and gauges describing a proxy pool.
///
/// All fields are public plain data. The derived [`Default`] yields a snapshot in which
/// every number is zero and the circuit-breaker map is empty.
///
/// NOTE(review): the per-field descriptions below are inferred from the field names;
/// confirm them against the code that populates this struct.
#[derive(Clone, Debug, Default)]
pub struct PoolMetrics {
    // ---- proxy population ----
    /// Count of all proxies (presumably healthy + cooldown + dead).
    pub total_proxies: usize,
    /// Count of proxies considered healthy.
    pub healthy_proxies: usize,
    /// Count of proxies in cooldown.
    pub cooldown_proxies: usize,
    /// Count of proxies considered dead.
    pub dead_proxies: usize,

    // ---- task accounting ----
    /// Count of tasks still pending.
    pub pending_tasks: usize,
    /// Count of completed tasks.
    pub completed_tasks: u64,
    /// Count of failed tasks.
    pub failed_tasks: u64,

    // ---- throughput (presumably events per second over the named window) ----
    /// Throughput over a 1-second window.
    pub throughput_1s: f64,
    /// Throughput over a 10-second window.
    pub throughput_10s: f64,
    /// Throughput over a 60-second window.
    pub throughput_60s: f64,

    // ---- quality ----
    /// Success rate over the last minute (scale not visible here — TODO confirm 0..1 vs %).
    pub success_rate_1m: f64,
    /// Average latency in milliseconds.
    pub avg_latency_ms: f64,

    // ---- concurrency / protection ----
    /// Count of operations currently in flight.
    pub inflight: usize,
    /// Circuit-breaker flag per string key (presumably a host name; the meaning of
    /// `true` is not visible here — verify against the producer).
    pub circuit_breakers: HashMap<String, bool>,
}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct ProxyHostStats {
43 pub success: u32,
44 pub fail: u32,
45 pub success_rate: f64,
46 pub avg_latency_ms: f64,
47 pub consecutive_fails: u32,
48}
49
50impl Default for ProxyHostStats {
51 fn default() -> Self {
52 Self {
53 success: 0,
54 fail: 0,
55 success_rate: 0.0,
56 avg_latency_ms: 0.0,
57 consecutive_fails: 0,
58 }
59 }
60}
61
/// Mutex-guarded sliding-window event counter used to derive events-per-second rates.
///
/// Every recorded event is stored as an [`Instant`]. Entries older than `max_window`
/// (relative to the `now` of the current call) are discarded each time the tracker is
/// touched, so only events inside the retention horizon are kept.
pub(crate) struct ThroughputTracker {
    /// Event timestamps, kept in ascending order (oldest at the front).
    timestamps: Mutex<VecDeque<Instant>>,
    /// Retention horizon: events older than this are dropped while pruning.
    max_window: Duration,
}

impl ThroughputTracker {
    /// Creates a tracker that retains events for 60 seconds.
    pub fn new() -> Self {
        Self::with_max_window(Duration::from_secs(60))
    }

    /// Creates a tracker that retains events for `max_window`.
    pub fn with_max_window(max_window: Duration) -> Self {
        Self {
            timestamps: Mutex::new(VecDeque::new()),
            max_window,
        }
    }

    /// Records one event stamped with the current time.
    pub fn record(&self) {
        self.record_at(Instant::now());
    }

    /// Records one event stamped `now`, then prunes entries older than
    /// `now - max_window`.
    ///
    /// Timestamps may arrive slightly out of order (threads racing for the lock, or
    /// callers supplying explicit instants); the entry is inserted at its sorted position
    /// so that front-only pruning never leaves stale entries hidden behind newer ones.
    pub(crate) fn record_at(&self, now: Instant) {
        let mut ts = self.lock();
        let out_of_order = ts.back().map_or(false, |&newest| now < newest);
        if out_of_order {
            // Rare path: binary-search the slot that keeps the deque ascending.
            let slot = ts.partition_point(|&t| t <= now);
            ts.insert(slot, now);
        } else {
            // Common path: monotonic timestamps append in O(1).
            ts.push_back(now);
        }
        Self::prune(&mut ts, now, self.max_window);
    }

    /// Returns the event rate (events per second) over the trailing `window`, measured
    /// from the current time. See [`Self::throughput_at`] for the exact rules.
    pub fn throughput(&self, window: Duration) -> f64 {
        self.throughput_at(Instant::now(), window)
    }

    /// Returns the number of retained events with a timestamp `>= now - window`,
    /// divided by `window` in seconds.
    ///
    /// * A zero `window` yields `0.0` (avoids a division by zero).
    /// * Only events still inside `max_window` are retained, so a `window` longer than
    ///   `max_window` counts just the retained events but still divides by `window`.
    /// * If `now - window` cannot be represented as an [`Instant`], the window reaches
    ///   back past every representable timestamp, so every retained event is counted.
    pub(crate) fn throughput_at(&self, now: Instant, window: Duration) -> f64 {
        if window.is_zero() {
            return 0.0;
        }
        let mut ts = self.lock();
        Self::prune(&mut ts, now, self.max_window);

        let count = match now.checked_sub(window) {
            // The deque is sorted, so everything from the first `t >= cutoff` onward is in window.
            Some(cutoff) => ts.len() - ts.partition_point(|&t| t < cutoff),
            None => ts.len(),
        };
        count as f64 / window.as_secs_f64()
    }

    /// Drops entries strictly older than `now - max_window` from the front of `ts`.
    ///
    /// When `now - max_window` is not representable, no entry can be older than the
    /// horizon, so nothing is removed.
    fn prune(ts: &mut VecDeque<Instant>, now: Instant, max_window: Duration) {
        if let Some(cutoff) = now.checked_sub(max_window) {
            while ts.front().map_or(false, |&oldest| oldest < cutoff) {
                ts.pop_front();
            }
        }
    }

    /// Locks the timestamp deque, recovering the guard if the mutex was poisoned.
    ///
    /// The deque holds plain `Instant`s and stays structurally valid even if a holder
    /// panicked, so metrics collection keeps working instead of cascading the panic.
    fn lock(&self) -> std::sync::MutexGuard<'_, VecDeque<Instant>> {
        self.timestamps
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Number of retained timestamps (test-only introspection).
    #[cfg(test)]
    fn len(&self) -> usize {
        self.lock().len()
    }
}
140
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// True when `value` is within `f64::EPSILON` of zero.
    fn is_zero(value: f64) -> bool {
        value.abs() < f64::EPSILON
    }

    /// True when `actual` lies strictly within `tolerance` of `expected`.
    fn is_close(actual: f64, expected: f64, tolerance: f64) -> bool {
        (actual - expected).abs() < tolerance
    }

    /// Feeds every timestamp in `stamps` to `tracker`, in iteration order.
    fn record_all(tracker: &ThroughputTracker, stamps: impl IntoIterator<Item = Instant>) {
        for at in stamps {
            tracker.record_at(at);
        }
    }

    // ---- PoolMetrics ----

    #[test]
    fn pool_metrics_default_is_zeroed() {
        // Exhaustive destructuring: every field of the default snapshot is checked.
        let PoolMetrics {
            total_proxies,
            healthy_proxies,
            cooldown_proxies,
            dead_proxies,
            pending_tasks,
            completed_tasks,
            failed_tasks,
            throughput_1s,
            throughput_10s,
            throughput_60s,
            success_rate_1m,
            avg_latency_ms,
            inflight,
            circuit_breakers,
        } = PoolMetrics::default();

        assert_eq!(total_proxies, 0);
        assert_eq!(healthy_proxies, 0);
        assert_eq!(cooldown_proxies, 0);
        assert_eq!(dead_proxies, 0);
        assert_eq!(pending_tasks, 0);
        assert_eq!(completed_tasks, 0);
        assert_eq!(failed_tasks, 0);
        assert!(is_zero(throughput_1s));
        assert!(is_zero(throughput_10s));
        assert!(is_zero(throughput_60s));
        assert!(is_zero(success_rate_1m));
        assert!(is_zero(avg_latency_ms));
        assert_eq!(inflight, 0);
        assert!(circuit_breakers.is_empty());
    }

    #[test]
    fn pool_metrics_is_clone() {
        let mut breakers = HashMap::new();
        breakers.insert(String::from("host.com"), true);
        let original = PoolMetrics {
            total_proxies: 42,
            circuit_breakers: breakers,
            ..PoolMetrics::default()
        };

        let copy = original.clone();
        assert_eq!(copy.total_proxies, 42);
        assert_eq!(copy.circuit_breakers.get("host.com"), Some(&true));
    }

    // ---- ProxyHostStats ----

    #[test]
    fn proxy_host_stats_default() {
        let ProxyHostStats {
            success,
            fail,
            success_rate,
            avg_latency_ms,
            consecutive_fails,
        } = ProxyHostStats::default();

        assert_eq!(success, 0);
        assert_eq!(fail, 0);
        assert!(is_zero(success_rate));
        assert!(is_zero(avg_latency_ms));
        assert_eq!(consecutive_fails, 0);
    }

    #[test]
    fn proxy_host_stats_serde_round_trip() {
        let before = ProxyHostStats {
            success: 10,
            fail: 2,
            success_rate: 0.833,
            avg_latency_ms: 120.5,
            consecutive_fails: 0,
        };

        let encoded = serde_json::to_string(&before).unwrap();
        let after: ProxyHostStats = serde_json::from_str(&encoded).unwrap();

        assert_eq!(after.success, 10);
        assert_eq!(after.fail, 2);
        assert!(is_close(after.success_rate, 0.833, 1e-6));
        assert!(is_close(after.avg_latency_ms, 120.5, 1e-6));
        assert_eq!(after.consecutive_fails, 0);
    }

    // ---- ThroughputTracker ----

    #[test]
    fn new_tracker_is_empty() {
        let tracker = ThroughputTracker::new();
        assert_eq!(tracker.len(), 0);
        assert!(is_zero(tracker.throughput(Duration::from_secs(1))));
    }

    #[test]
    fn record_increases_count() {
        let tracker = ThroughputTracker::new();
        for expected_len in 1..=2 {
            tracker.record();
            assert_eq!(tracker.len(), expected_len);
        }
    }

    #[test]
    fn throughput_with_zero_window_is_zero() {
        let tracker = ThroughputTracker::new();
        tracker.record();
        assert!(is_zero(tracker.throughput(Duration::ZERO)));
    }

    #[test]
    fn throughput_over_1s_window() {
        let tracker = ThroughputTracker::new();
        let now = Instant::now();

        // Five events spaced 100 ms apart, all inside the last second.
        record_all(
            &tracker,
            (0..5).map(|step| now - Duration::from_millis(100 * step)),
        );

        let rate = tracker.throughput_at(now, Duration::from_secs(1));
        assert!(is_close(rate, 5.0, 0.01));
    }

    #[test]
    fn throughput_over_10s_window() {
        let tracker = ThroughputTracker::new();
        let now = Instant::now();

        // Twenty events spaced 500 ms apart, all inside the last ten seconds.
        record_all(
            &tracker,
            (0..20).map(|step| now - Duration::from_millis(500 * step)),
        );

        let rate = tracker.throughput_at(now, Duration::from_secs(10));
        assert!(is_close(rate, 2.0, 0.01));
    }

    #[test]
    fn old_entries_are_pruned() {
        let tracker = ThroughputTracker::with_max_window(Duration::from_secs(5));
        let now = Instant::now();

        // The first event is outside the 5 s horizon, the second is inside it.
        record_all(
            &tracker,
            [now - Duration::from_secs(10), now - Duration::from_secs(1)],
        );

        let rate = tracker.throughput_at(now, Duration::from_secs(5));
        assert!(is_close(rate, 0.2, 0.01));
        assert_eq!(tracker.len(), 1);
    }

    #[test]
    fn window_larger_than_max_window_still_works() {
        let tracker = ThroughputTracker::with_max_window(Duration::from_secs(5));
        let now = Instant::now();

        record_all(
            &tracker,
            [now - Duration::from_secs(3), now - Duration::from_secs(1)],
        );

        // Both events are retained; the rate is still divided by the requested 60 s.
        let rate = tracker.throughput_at(now, Duration::from_secs(60));
        assert!(is_close(rate, 2.0 / 60.0, 0.01));
    }

    #[test]
    fn concurrent_access_does_not_panic() {
        use std::sync::Arc;
        use std::thread;

        let shared = Arc::new(ThroughputTracker::new());

        // Eight workers, each recording 100 events while also reading the rate.
        let workers: Vec<_> = (0..8)
            .map(|_| {
                let tracker = Arc::clone(&shared);
                thread::spawn(move || {
                    for _ in 0..100 {
                        tracker.record();
                        let _ = tracker.throughput(Duration::from_secs(1));
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(shared.len(), 800);
    }

    #[test]
    fn throughput_only_counts_events_within_requested_window() {
        let tracker = ThroughputTracker::new();
        let now = Instant::now();

        // Three recent events (500 ms ago) followed by two older ones (5 s ago).
        let recent = std::iter::repeat(now - Duration::from_millis(500)).take(3);
        let older = std::iter::repeat(now - Duration::from_secs(5)).take(2);
        record_all(&tracker, recent.chain(older));

        let rate_1s = tracker.throughput_at(now, Duration::from_secs(1));
        assert!(is_close(rate_1s, 3.0, 0.01));

        let rate_10s = tracker.throughput_at(now, Duration::from_secs(10));
        assert!(is_close(rate_10s, 0.5, 0.01));
    }

    #[test]
    fn custom_max_window() {
        let horizon = Duration::from_secs(2);
        let tracker = ThroughputTracker::with_max_window(horizon);
        assert_eq!(tracker.max_window, Duration::from_secs(2));
    }

    #[test]
    fn default_max_window_is_60s() {
        let tracker = ThroughputTracker::new();
        assert_eq!(tracker.max_window, Duration::from_secs(60));
    }
}