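//! `scatter_proxy/metrics.rs`: metrics types for the proxy scheduler — pool-wide
//! snapshots, per-(proxy, host) statistics, and a sliding-window throughput
//! tracker.
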
use std::collections::VecDeque;
use std::sync::Mutex;
use std::time::{Duration, Instant};

use serde::{Deserialize, Serialize};

/// Snapshot of the overall proxy-pool and task-pool state.
///
/// Returned by `ScatterProxy::metrics()` so callers can build dashboards,
/// alerting, or adaptive back-pressure on top of the scheduler.
///
/// Derives [`PartialEq`] so two snapshots can be compared directly (for
/// example to detect changes between polls, or in tests).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct PoolMetrics {
    // ── Proxy pool ───────────────────────────────────────────────────
    /// Number of proxies in the pool.
    pub total_proxies: usize,
    /// Proxies currently counted as healthy.
    pub healthy_proxies: usize,
    /// Proxies currently in cooldown.
    pub cooldown_proxies: usize,
    /// Proxies currently counted as dead.
    pub dead_proxies: usize,

    // ── Task pool ────────────────────────────────────────────────────
    /// Tasks currently pending.
    pub pending_tasks: usize,
    /// Tasks currently delayed.
    pub delayed_tasks: usize,
    /// Count of tasks that completed.
    pub completed_tasks: u64,
    /// Count of tasks that failed.
    pub failed_tasks: u64,

    // ── Throughput (sliding window) ──────────────────────────────────
    /// Events per second measured over a 1-second sliding window.
    pub throughput_1s: f64,
    /// Events per second measured over a 10-second sliding window.
    pub throughput_10s: f64,
    /// Events per second measured over a 60-second sliding window.
    pub throughput_60s: f64,

    // ── Quality ──────────────────────────────────────────────────────
    /// Success rate over the last minute.
    /// NOTE(review): confirm whether this is a 0–1 fraction or a percentage.
    pub success_rate_1m: f64,
    /// Average latency, in milliseconds.
    pub avg_latency_ms: f64,

    // ── Resources ────────────────────────────────────────────────────
    /// Work currently in flight.
    pub inflight: usize,
    /// Count of tasks that were requeued.
    pub requeued_tasks: u64,
    /// Count of events where zero proxies were available (presumably
    /// observed at dispatch time — confirm with the scheduler).
    pub zero_available_events: u64,
    /// Skips attributed to a missing permit (presumably a concurrency permit).
    pub skipped_no_permit: u64,
    /// Skips attributed to rate limiting.
    pub skipped_rate_limit: u64,
    /// Skips attributed to a cooldown.
    pub skipped_cooldown: u64,
    /// Number of dispatches performed.
    pub dispatch_count: u64,
}

44/// Per-(proxy, host) statistics snapshot (user-facing / serialisable).
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ProxyHostStats {
47    pub success: u32,
48    pub fail: u32,
49    pub success_rate: f64,
50    pub avg_latency_ms: f64,
51    pub consecutive_fails: u32,
52}
53
54impl Default for ProxyHostStats {
55    fn default() -> Self {
56        Self {
57            success: 0,
58            fail: 0,
59            success_rate: 0.0,
60            avg_latency_ms: 0.0,
61            consecutive_fails: 0,
62        }
63    }
64}
65
// ─── Internal throughput tracker ─────────────────────────────────────────────

/// Sliding-window throughput tracker.
///
/// Stores the [`Instant`] of every recorded event and computes throughput
/// (events / second) over arbitrary windows. Timestamps older than
/// `max_window` are pruned, so a requested window longer than `max_window`
/// only sees the last `max_window` worth of events. The count is still
/// divided by the full requested window.
///
/// Thread-safe: the inner [`VecDeque`] is wrapped in a [`Mutex`].
pub(crate) struct ThroughputTracker {
    /// Event timestamps in insertion order. Pruning pops from the front, so it
    /// assumes the oldest entries were recorded first.
    timestamps: Mutex<VecDeque<Instant>>,
    /// Retention horizon: entries older than `now - max_window` are pruned.
    max_window: Duration,
}

impl ThroughputTracker {
    /// Create a new tracker that retains timestamps for 60 seconds.
    pub fn new() -> Self {
        Self::with_max_window(Duration::from_secs(60))
    }

    /// Create a tracker with a custom retention window.
    ///
    /// If `now - max_window` cannot be represented as an [`Instant`] (e.g.
    /// `max_window` is [`Duration::MAX`]), nothing is ever pruned.
    pub fn with_max_window(max_window: Duration) -> Self {
        Self {
            timestamps: Mutex::new(VecDeque::new()),
            max_window,
        }
    }

    /// Record a completed event at the current instant.
    pub fn record(&self) {
        self.record_at(Instant::now());
    }

    /// Record a completed event at a specific instant (useful for testing).
    ///
    /// The event is appended, then entries older than `now - max_window` are
    /// pruned from the front.
    pub(crate) fn record_at(&self, now: Instant) {
        let mut ts = self.lock();
        ts.push_back(now);
        Self::prune(&mut ts, now, self.max_window);
    }

    /// Compute throughput (events per second) over the given `window`.
    ///
    /// If `window` is zero the result is `0.0`.
    pub fn throughput(&self, window: Duration) -> f64 {
        self.throughput_at(Instant::now(), window)
    }

    /// Compute throughput relative to a specific instant (useful for testing).
    ///
    /// Counts the retained events at or after `now - window` and divides by the
    /// window length in seconds. If `now - window` is not representable, the
    /// window reaches back past the clock's origin and every retained event is
    /// counted. If `window` is zero the result is `0.0`.
    pub(crate) fn throughput_at(&self, now: Instant, window: Duration) -> f64 {
        if window.is_zero() {
            return 0.0;
        }
        let mut ts = self.lock();
        Self::prune(&mut ts, now, self.max_window);

        let count = match now.checked_sub(window) {
            Some(cutoff) => ts.iter().filter(|&&t| t >= cutoff).count(),
            // A cutoff earlier than the clock's origin excludes nothing; using
            // `now` as a stand-in would drop every event before `now`.
            None => ts.len(),
        };
        count as f64 / window.as_secs_f64()
    }

    /// Remove timestamps older than `max_window` before `now`.
    ///
    /// Pops from the front only, stopping at the first entry inside the
    /// retention window. If `now - max_window` is not representable, every
    /// entry is inside the window and nothing is removed.
    fn prune(ts: &mut VecDeque<Instant>, now: Instant, max_window: Duration) {
        let cutoff = match now.checked_sub(max_window) {
            Some(cutoff) => cutoff,
            // Falling back to `now` here would discard every older entry.
            None => return,
        };
        while let Some(&front) = ts.front() {
            if front >= cutoff {
                break;
            }
            ts.pop_front();
        }
    }

    /// Lock the timestamp queue, recovering the guard if the lock is poisoned.
    ///
    /// The queue holds plain timestamps with no cross-entry invariant that a
    /// panicking lock holder could break. Recovering keeps one panic from
    /// turning every later metrics call into a panic as well.
    fn lock(&self) -> std::sync::MutexGuard<'_, VecDeque<Instant>> {
        self.timestamps
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Return the number of timestamps currently stored (as of the last prune).
    #[cfg(test)]
    fn len(&self) -> usize {
        self.lock().len()
    }
}

// ─── Tests ───────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, Instant};

    /// Tolerance used when comparing computed rates with expected values.
    const RATE_TOLERANCE: f64 = 0.01;

    /// Assert that `value` is zero to within machine epsilon.
    fn assert_zero(value: f64) {
        assert!(value.abs() < f64::EPSILON);
    }

    /// Assert that `actual` lies strictly within `tolerance` of `expected`.
    fn assert_close(actual: f64, expected: f64, tolerance: f64) {
        assert!((actual - expected).abs() < tolerance);
    }

    // ── PoolMetrics ──────────────────────────────────────────────────

    #[test]
    fn pool_metrics_default_is_zeroed() {
        let metrics = PoolMetrics::default();

        let size_fields = [
            metrics.total_proxies,
            metrics.healthy_proxies,
            metrics.cooldown_proxies,
            metrics.dead_proxies,
            metrics.pending_tasks,
            metrics.delayed_tasks,
            metrics.inflight,
        ];
        assert_eq!(size_fields, [0usize; 7]);

        let counter_fields = [
            metrics.completed_tasks,
            metrics.failed_tasks,
            metrics.requeued_tasks,
            metrics.zero_available_events,
            metrics.skipped_no_permit,
            metrics.skipped_rate_limit,
            metrics.skipped_cooldown,
            metrics.dispatch_count,
        ];
        assert_eq!(counter_fields, [0u64; 8]);

        for rate in [
            metrics.throughput_1s,
            metrics.throughput_10s,
            metrics.throughput_60s,
            metrics.success_rate_1m,
            metrics.avg_latency_ms,
        ] {
            assert_zero(rate);
        }
    }

    #[test]
    fn pool_metrics_is_clone() {
        let original = PoolMetrics {
            total_proxies: 42,
            ..Default::default()
        };
        let copy = original.clone();
        assert_eq!(copy.total_proxies, 42);
    }

    // ── ProxyHostStats ───────────────────────────────────────────────

    #[test]
    fn proxy_host_stats_default() {
        let stats = ProxyHostStats::default();
        assert_eq!(
            (stats.success, stats.fail, stats.consecutive_fails),
            (0, 0, 0)
        );
        assert_zero(stats.success_rate);
        assert_zero(stats.avg_latency_ms);
    }

    #[test]
    fn proxy_host_stats_serde_round_trip() {
        let original = ProxyHostStats {
            success: 10,
            fail: 2,
            success_rate: 0.833,
            avg_latency_ms: 120.5,
            consecutive_fails: 0,
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ProxyHostStats = serde_json::from_str(&encoded).unwrap();

        assert_eq!(
            (decoded.success, decoded.fail, decoded.consecutive_fails),
            (10, 2, 0)
        );
        assert_close(decoded.success_rate, 0.833, 1e-6);
        assert_close(decoded.avg_latency_ms, 120.5, 1e-6);
    }

    // ── ThroughputTracker ────────────────────────────────────────────

    #[test]
    fn new_tracker_is_empty() {
        let tracker = ThroughputTracker::new();
        assert_eq!(tracker.len(), 0);
        assert_zero(tracker.throughput(Duration::from_secs(1)));
    }

    #[test]
    fn record_increases_count() {
        let tracker = ThroughputTracker::new();
        for expected in 1..=2 {
            tracker.record();
            assert_eq!(tracker.len(), expected);
        }
    }

    #[test]
    fn throughput_with_zero_window_is_zero() {
        let tracker = ThroughputTracker::new();
        tracker.record();
        assert_zero(tracker.throughput(Duration::ZERO));
    }

    #[test]
    fn throughput_over_1s_window() {
        let tracker = ThroughputTracker::new();
        let now = Instant::now();

        // Five events, 100 ms apart, all inside the last second.
        (0..5u64)
            .map(|step| now - Duration::from_millis(100 * step))
            .for_each(|at| tracker.record_at(at));

        // 5 events / 1 s = 5.0 per second.
        let rate = tracker.throughput_at(now, Duration::from_secs(1));
        assert_close(rate, 5.0, RATE_TOLERANCE);
    }

    #[test]
    fn throughput_over_10s_window() {
        let tracker = ThroughputTracker::new();
        let now = Instant::now();

        // Twenty events, 500 ms apart, spanning the last 10 seconds.
        (0..20u64)
            .map(|step| now - Duration::from_millis(500 * step))
            .for_each(|at| tracker.record_at(at));

        // 20 events / 10 s = 2.0 per second.
        let rate = tracker.throughput_at(now, Duration::from_secs(10));
        assert_close(rate, 2.0, RATE_TOLERANCE);
    }

    #[test]
    fn old_entries_are_pruned() {
        let tracker = ThroughputTracker::with_max_window(Duration::from_secs(5));
        let now = Instant::now();

        let expired = now - Duration::from_secs(10); // outside the 5 s retention
        let fresh = now - Duration::from_secs(1); // inside the 5 s retention
        tracker.record_at(expired);
        tracker.record_at(fresh);

        // Computing throughput prunes; only the fresh event survives: 1 / 5 s.
        let rate = tracker.throughput_at(now, Duration::from_secs(5));
        assert_close(rate, 0.2, RATE_TOLERANCE);
        assert_eq!(tracker.len(), 1);
    }

    #[test]
    fn window_larger_than_max_window_still_works() {
        let tracker = ThroughputTracker::with_max_window(Duration::from_secs(5));
        let now = Instant::now();

        for secs_ago in [3, 1] {
            tracker.record_at(now - Duration::from_secs(secs_ago));
        }

        // A 60 s window over a 5 s retention: 2 events / 60 s.
        let rate = tracker.throughput_at(now, Duration::from_secs(60));
        assert_close(rate, 2.0 / 60.0, RATE_TOLERANCE);
    }

    #[test]
    fn concurrent_access_does_not_panic() {
        let tracker = Arc::new(ThroughputTracker::new());

        let workers: Vec<_> = (0..8)
            .map(|_| {
                let shared = Arc::clone(&tracker);
                thread::spawn(move || {
                    for _ in 0..100 {
                        shared.record();
                        let _ = shared.throughput(Duration::from_secs(1));
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        // 8 workers × 100 events, all well inside the 60 s retention.
        assert_eq!(tracker.len(), 800);
    }

    #[test]
    fn throughput_only_counts_events_within_requested_window() {
        let tracker = ThroughputTracker::new();
        let now = Instant::now();
        let recent = now - Duration::from_millis(500);
        let stale = now - Duration::from_secs(5);

        // Three recent events first, then two stale ones.
        for &(at, times) in &[(recent, 3), (stale, 2)] {
            for _ in 0..times {
                tracker.record_at(at);
            }
        }

        // The 1 s window sees only the recent three: 3 / 1 s.
        assert_close(
            tracker.throughput_at(now, Duration::from_secs(1)),
            3.0,
            RATE_TOLERANCE,
        );

        // The 10 s window sees all five: 5 / 10 s.
        assert_close(
            tracker.throughput_at(now, Duration::from_secs(10)),
            0.5,
            RATE_TOLERANCE,
        );
    }

    #[test]
    fn custom_max_window() {
        let tracker = ThroughputTracker::with_max_window(Duration::from_secs(2));
        assert_eq!(tracker.max_window, Duration::from_secs(2));
    }

    #[test]
    fn default_max_window_is_60s() {
        let tracker = ThroughputTracker::new();
        assert_eq!(tracker.max_window, Duration::from_secs(60));
    }
}