1use std::collections::VecDeque;
2use std::sync::Mutex;
3use std::time::{Duration, Instant};
4
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Default)]
12pub struct PoolMetrics {
13 pub total_proxies: usize,
15 pub healthy_proxies: usize,
16 pub cooldown_proxies: usize,
17 pub dead_proxies: usize,
18
19 pub pending_tasks: usize,
21 pub delayed_tasks: usize,
22 pub completed_tasks: u64,
23 pub failed_tasks: u64,
24
25 pub throughput_1s: f64,
27 pub throughput_10s: f64,
28 pub throughput_60s: f64,
29
30 pub success_rate_1m: f64,
32 pub avg_latency_ms: f64,
33
34 pub inflight: usize,
36 pub requeued_tasks: u64,
37 pub zero_available_events: u64,
38 pub skipped_no_permit: u64,
39 pub skipped_rate_limit: u64,
40 pub skipped_cooldown: u64,
41 pub dispatch_count: u64,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ProxyHostStats {
47 pub success: u32,
48 pub fail: u32,
49 pub success_rate: f64,
50 pub avg_latency_ms: f64,
51 pub consecutive_fails: u32,
52}
53
54impl Default for ProxyHostStats {
55 fn default() -> Self {
56 Self {
57 success: 0,
58 fail: 0,
59 success_rate: 0.0,
60 avg_latency_ms: 0.0,
61 consecutive_fails: 0,
62 }
63 }
64}
65
66pub(crate) struct ThroughputTracker {
75 timestamps: Mutex<VecDeque<Instant>>,
76 max_window: Duration,
77}
78
79impl ThroughputTracker {
80 pub fn new() -> Self {
82 Self::with_max_window(Duration::from_secs(60))
83 }
84
85 pub fn with_max_window(max_window: Duration) -> Self {
87 Self {
88 timestamps: Mutex::new(VecDeque::new()),
89 max_window,
90 }
91 }
92
93 pub fn record(&self) {
95 self.record_at(Instant::now());
96 }
97
98 pub(crate) fn record_at(&self, now: Instant) {
100 let mut ts = self.timestamps.lock().unwrap();
101 ts.push_back(now);
102 Self::prune(&mut ts, now, self.max_window);
103 }
104
105 pub fn throughput(&self, window: Duration) -> f64 {
109 self.throughput_at(Instant::now(), window)
110 }
111
112 pub(crate) fn throughput_at(&self, now: Instant, window: Duration) -> f64 {
114 if window.is_zero() {
115 return 0.0;
116 }
117 let mut ts = self.timestamps.lock().unwrap();
118 Self::prune(&mut ts, now, self.max_window);
119
120 let cutoff = now.checked_sub(window).unwrap_or(now);
121 let count = ts.iter().filter(|&&t| t >= cutoff).count();
122 count as f64 / window.as_secs_f64()
123 }
124
125 fn prune(ts: &mut VecDeque<Instant>, now: Instant, max_window: Duration) {
127 let cutoff = now.checked_sub(max_window).unwrap_or(now);
128 while let Some(&front) = ts.front() {
129 if front < cutoff {
130 ts.pop_front();
131 } else {
132 break;
133 }
134 }
135 }
136
137 #[cfg(test)]
139 fn len(&self) -> usize {
140 let ts = self.timestamps.lock().unwrap();
141 ts.len()
142 }
143}
144
145#[cfg(test)]
148mod tests {
149 use super::*;
150 use std::time::Duration;
151
152 #[test]
155 fn pool_metrics_default_is_zeroed() {
156 let m = PoolMetrics::default();
157 assert_eq!(m.total_proxies, 0);
158 assert_eq!(m.healthy_proxies, 0);
159 assert_eq!(m.cooldown_proxies, 0);
160 assert_eq!(m.dead_proxies, 0);
161 assert_eq!(m.pending_tasks, 0);
162 assert_eq!(m.delayed_tasks, 0);
163 assert_eq!(m.completed_tasks, 0);
164 assert_eq!(m.failed_tasks, 0);
165 assert!((m.throughput_1s).abs() < f64::EPSILON);
166 assert!((m.throughput_10s).abs() < f64::EPSILON);
167 assert!((m.throughput_60s).abs() < f64::EPSILON);
168 assert!((m.success_rate_1m).abs() < f64::EPSILON);
169 assert!((m.avg_latency_ms).abs() < f64::EPSILON);
170 assert_eq!(m.inflight, 0);
171 assert_eq!(m.requeued_tasks, 0);
172 assert_eq!(m.zero_available_events, 0);
173 assert_eq!(m.skipped_no_permit, 0);
174 assert_eq!(m.skipped_rate_limit, 0);
175 assert_eq!(m.skipped_cooldown, 0);
176 assert_eq!(m.dispatch_count, 0);
177 }
178
179 #[test]
180 fn pool_metrics_is_clone() {
181 let m = PoolMetrics {
182 total_proxies: 42,
183 ..Default::default()
184 };
185 let m2 = m.clone();
186 assert_eq!(m2.total_proxies, 42);
187 }
188
189 #[test]
192 fn proxy_host_stats_default() {
193 let s = ProxyHostStats::default();
194 assert_eq!(s.success, 0);
195 assert_eq!(s.fail, 0);
196 assert!((s.success_rate).abs() < f64::EPSILON);
197 assert!((s.avg_latency_ms).abs() < f64::EPSILON);
198 assert_eq!(s.consecutive_fails, 0);
199 }
200
201 #[test]
202 fn proxy_host_stats_serde_round_trip() {
203 let stats = ProxyHostStats {
204 success: 10,
205 fail: 2,
206 success_rate: 0.833,
207 avg_latency_ms: 120.5,
208 consecutive_fails: 0,
209 };
210 let json = serde_json::to_string(&stats).unwrap();
211 let deser: ProxyHostStats = serde_json::from_str(&json).unwrap();
212 assert_eq!(deser.success, 10);
213 assert_eq!(deser.fail, 2);
214 assert!((deser.success_rate - 0.833).abs() < 1e-6);
215 assert!((deser.avg_latency_ms - 120.5).abs() < 1e-6);
216 assert_eq!(deser.consecutive_fails, 0);
217 }
218
219 #[test]
222 fn new_tracker_is_empty() {
223 let t = ThroughputTracker::new();
224 assert_eq!(t.len(), 0);
225 assert!((t.throughput(Duration::from_secs(1))).abs() < f64::EPSILON);
226 }
227
228 #[test]
229 fn record_increases_count() {
230 let t = ThroughputTracker::new();
231 t.record();
232 assert_eq!(t.len(), 1);
233 t.record();
234 assert_eq!(t.len(), 2);
235 }
236
237 #[test]
238 fn throughput_with_zero_window_is_zero() {
239 let t = ThroughputTracker::new();
240 t.record();
241 assert!((t.throughput(Duration::ZERO)).abs() < f64::EPSILON);
242 }
243
244 #[test]
245 fn throughput_over_1s_window() {
246 let t = ThroughputTracker::new();
247 let now = Instant::now();
248
249 for i in 0..5 {
251 t.record_at(now - Duration::from_millis(100 * i));
252 }
253
254 let tp = t.throughput_at(now, Duration::from_secs(1));
255 assert!((tp - 5.0).abs() < 0.01);
257 }
258
259 #[test]
260 fn throughput_over_10s_window() {
261 let t = ThroughputTracker::new();
262 let now = Instant::now();
263
264 for i in 0..20 {
266 t.record_at(now - Duration::from_millis(500 * i));
267 }
268
269 let tp = t.throughput_at(now, Duration::from_secs(10));
270 assert!((tp - 2.0).abs() < 0.01);
272 }
273
274 #[test]
275 fn old_entries_are_pruned() {
276 let t = ThroughputTracker::with_max_window(Duration::from_secs(5));
277 let now = Instant::now();
278
279 t.record_at(now - Duration::from_secs(10));
281 t.record_at(now - Duration::from_secs(1));
283
284 let tp = t.throughput_at(now, Duration::from_secs(5));
286 assert!((tp - 0.2).abs() < 0.01); assert_eq!(t.len(), 1);
289 }
290
291 #[test]
292 fn window_larger_than_max_window_still_works() {
293 let t = ThroughputTracker::with_max_window(Duration::from_secs(5));
294 let now = Instant::now();
295
296 t.record_at(now - Duration::from_secs(3));
297 t.record_at(now - Duration::from_secs(1));
298
299 let tp = t.throughput_at(now, Duration::from_secs(60));
301 assert!((tp - 2.0 / 60.0).abs() < 0.01);
303 }
304
305 #[test]
306 fn concurrent_access_does_not_panic() {
307 use std::sync::Arc;
308 use std::thread;
309
310 let tracker = Arc::new(ThroughputTracker::new());
311 let mut handles = Vec::new();
312
313 for _ in 0..8 {
314 let t = Arc::clone(&tracker);
315 handles.push(thread::spawn(move || {
316 for _ in 0..100 {
317 t.record();
318 let _ = t.throughput(Duration::from_secs(1));
319 }
320 }));
321 }
322
323 for h in handles {
324 h.join().unwrap();
325 }
326
327 assert_eq!(tracker.len(), 800);
329 }
330
331 #[test]
332 fn throughput_only_counts_events_within_requested_window() {
333 let t = ThroughputTracker::new();
334 let now = Instant::now();
335
336 for _ in 0..3 {
338 t.record_at(now - Duration::from_millis(500));
339 }
340 for _ in 0..2 {
342 t.record_at(now - Duration::from_secs(5));
343 }
344
345 let tp_1s = t.throughput_at(now, Duration::from_secs(1));
347 assert!((tp_1s - 3.0).abs() < 0.01);
348
349 let tp_10s = t.throughput_at(now, Duration::from_secs(10));
351 assert!((tp_10s - 0.5).abs() < 0.01);
352 }
353
354 #[test]
355 fn custom_max_window() {
356 let t = ThroughputTracker::with_max_window(Duration::from_secs(2));
357 assert_eq!(t.max_window, Duration::from_secs(2));
358 }
359
360 #[test]
361 fn default_max_window_is_60s() {
362 let t = ThroughputTracker::new();
363 assert_eq!(t.max_window, Duration::from_secs(60));
364 }
365}