Skip to main content

scatter_proxy/
metrics.rs

1use std::collections::HashMap;
2use std::collections::VecDeque;
3use std::sync::Mutex;
4use std::time::{Duration, Instant};
5
6use serde::{Deserialize, Serialize};
7
/// Point-in-time view of both the proxy pool and the task pool.
///
/// `ScatterProxy::metrics()` hands one of these back so callers can drive
/// dashboards, alerting, or their own adaptive back-pressure from the
/// scheduler's current state.
#[derive(Clone, Debug, Default)]
pub struct PoolMetrics {
    // Proxy-pool counters, split by proxy state.
    pub total_proxies: usize,
    pub healthy_proxies: usize,
    pub cooldown_proxies: usize,
    pub dead_proxies: usize,

    // Task-pool counters.
    pub pending_tasks: usize,
    pub completed_tasks: u64,
    pub failed_tasks: u64,

    // Sliding-window throughput over 1 s / 10 s / 60 s
    // (presumably events per second, as produced by the throughput tracker in
    // this module; confirm in `ScatterProxy::metrics()`).
    pub throughput_1s: f64,
    pub throughput_10s: f64,
    pub throughput_60s: f64,

    // Quality indicators.
    pub success_rate_1m: f64,
    pub avg_latency_ms: f64,

    // Resource usage.
    pub inflight: usize,
    /// Keyed by host; a value of `true` means that host's circuit breaker is
    /// open at the time the snapshot was taken.
    pub circuit_breakers: HashMap<String, bool>,
}
39
40/// Per-(proxy, host) statistics snapshot (user-facing / serialisable).
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct ProxyHostStats {
43    pub success: u32,
44    pub fail: u32,
45    pub success_rate: f64,
46    pub avg_latency_ms: f64,
47    pub consecutive_fails: u32,
48}
49
50impl Default for ProxyHostStats {
51    fn default() -> Self {
52        Self {
53            success: 0,
54            fail: 0,
55            success_rate: 0.0,
56            avg_latency_ms: 0.0,
57            consecutive_fails: 0,
58        }
59    }
60}
61
62// ─── Internal throughput tracker ─────────────────────────────────────────────
63
/// Sliding-window throughput tracker.
///
/// Stores the [`Instant`] of every recorded event and computes throughput
/// (events / second) over arbitrary windows. Events older than `max_window`
/// are discarded. A requested window longer than `max_window` therefore only
/// sees the retained events, but is still divided by the full requested
/// window.
///
/// Timestamps are kept sorted in ascending order. Pruning can then stop at
/// the first retained entry, and counting can use a binary search instead of
/// a linear scan.
///
/// Thread-safe: the inner [`VecDeque`] is wrapped in a [`Mutex`].
pub(crate) struct ThroughputTracker {
    /// Recorded event instants, sorted ascending.
    timestamps: Mutex<VecDeque<Instant>>,
    /// Retention horizon: entries older than `now - max_window` are dropped.
    max_window: Duration,
}

impl ThroughputTracker {
    /// Create a new tracker that retains timestamps for 60 seconds.
    pub fn new() -> Self {
        Self::with_max_window(Duration::from_secs(60))
    }

    /// Create a tracker with a custom retention window.
    ///
    /// A window that reaches back before the earliest representable
    /// [`Instant`] (e.g. `Duration::MAX`) disables pruning entirely.
    pub fn with_max_window(max_window: Duration) -> Self {
        Self {
            timestamps: Mutex::new(VecDeque::new()),
            max_window,
        }
    }

    /// Record a completed event at the current instant.
    pub fn record(&self) {
        self.record_at(Instant::now());
    }

    /// Record a completed event at a specific instant (useful for testing).
    ///
    /// `now` need not be monotonic across calls. The event is inserted at its
    /// sorted position, which is an append at the back in the common in-order
    /// case.
    pub(crate) fn record_at(&self, now: Instant) {
        let mut ts = self.timestamps.lock().unwrap();
        // `record()` samples `Instant::now()` before taking the lock, so
        // concurrent callers can arrive slightly out of order. Insert after any
        // equal entries to keep the deque sorted, which `prune` relies on.
        let pos = ts.partition_point(|&t| t <= now);
        ts.insert(pos, now);
        Self::prune(&mut ts, now, self.max_window);
    }

    /// Compute throughput (events per second) over the given `window`.
    ///
    /// If `window` is zero the result is `0.0`.
    pub fn throughput(&self, window: Duration) -> f64 {
        self.throughput_at(Instant::now(), window)
    }

    /// Compute throughput relative to a specific instant (useful for testing).
    ///
    /// Counts retained events at or after `now - window` and divides by
    /// `window` in seconds. If `now - window` is not representable as an
    /// [`Instant`], every retained event is counted. No event can predate
    /// that bound.
    pub(crate) fn throughput_at(&self, now: Instant, window: Duration) -> f64 {
        if window.is_zero() {
            return 0.0;
        }
        let mut ts = self.timestamps.lock().unwrap();
        Self::prune(&mut ts, now, self.max_window);

        let count = match now.checked_sub(window) {
            // Sorted ascending: everything from the first `t >= cutoff` onward
            // lies inside the window.
            Some(cutoff) => ts.len() - ts.partition_point(|&t| t < cutoff),
            // The window starts before the earliest representable instant.
            None => ts.len(),
        };
        count as f64 / window.as_secs_f64()
    }

    /// Remove timestamps older than `max_window` before `now`.
    ///
    /// Relies on `ts` being sorted ascending and stops at the first entry
    /// that is still inside the retention window. If `now - max_window` is
    /// not representable, nothing can be older than it, so nothing is removed.
    fn prune(ts: &mut VecDeque<Instant>, now: Instant, max_window: Duration) {
        if let Some(cutoff) = now.checked_sub(max_window) {
            while matches!(ts.front(), Some(&t) if t < cutoff) {
                ts.pop_front();
            }
        }
    }

    /// Return the number of timestamps currently stored (after pruning).
    #[cfg(test)]
    fn len(&self) -> usize {
        let ts = self.timestamps.lock().unwrap();
        ts.len()
    }
}
140
141// ─── Tests ───────────────────────────────────────────────────────────────────
142
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Asserts that `actual` is strictly closer than `tol` to `expected`.
    #[track_caller]
    fn assert_close(actual: f64, expected: f64, tol: f64) {
        assert!((actual - expected).abs() < tol);
    }

    /* ---------- PoolMetrics ---------- */

    #[test]
    fn pool_metrics_default_is_zeroed() {
        let m = PoolMetrics::default();
        for count in [
            m.total_proxies,
            m.healthy_proxies,
            m.cooldown_proxies,
            m.dead_proxies,
            m.pending_tasks,
        ] {
            assert_eq!(count, 0);
        }
        for count in [m.completed_tasks, m.failed_tasks] {
            assert_eq!(count, 0);
        }
        for value in [
            m.throughput_1s,
            m.throughput_10s,
            m.throughput_60s,
            m.success_rate_1m,
            m.avg_latency_ms,
        ] {
            assert_close(value, 0.0, f64::EPSILON);
        }
        assert_eq!(m.inflight, 0);
        assert!(m.circuit_breakers.is_empty());
    }

    #[test]
    fn pool_metrics_is_clone() {
        let original = PoolMetrics {
            total_proxies: 42,
            circuit_breakers: std::iter::once(("host.com".to_string(), true)).collect(),
            ..Default::default()
        };
        let copy = original.clone();
        assert_eq!(copy.total_proxies, 42);
        assert_eq!(copy.circuit_breakers.get("host.com"), Some(&true));
    }

    /* ---------- ProxyHostStats ---------- */

    #[test]
    fn proxy_host_stats_default() {
        let stats = ProxyHostStats::default();
        assert_eq!(
            (stats.success, stats.fail, stats.consecutive_fails),
            (0, 0, 0)
        );
        assert_close(stats.success_rate, 0.0, f64::EPSILON);
        assert_close(stats.avg_latency_ms, 0.0, f64::EPSILON);
    }

    #[test]
    fn proxy_host_stats_serde_round_trip() {
        let original = ProxyHostStats {
            success: 10,
            fail: 2,
            success_rate: 0.833,
            avg_latency_ms: 120.5,
            consecutive_fails: 0,
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ProxyHostStats = serde_json::from_str(&encoded).unwrap();
        assert_eq!(
            (decoded.success, decoded.fail, decoded.consecutive_fails),
            (10, 2, 0)
        );
        assert_close(decoded.success_rate, 0.833, 1e-6);
        assert_close(decoded.avg_latency_ms, 120.5, 1e-6);
    }

    /* ---------- ThroughputTracker ---------- */

    #[test]
    fn new_tracker_is_empty() {
        let tracker = ThroughputTracker::new();
        assert_eq!(tracker.len(), 0);
        assert_close(tracker.throughput(Duration::from_secs(1)), 0.0, f64::EPSILON);
    }

    #[test]
    fn record_increases_count() {
        let tracker = ThroughputTracker::new();
        for expected in 1..=2 {
            tracker.record();
            assert_eq!(tracker.len(), expected);
        }
    }

    #[test]
    fn throughput_with_zero_window_is_zero() {
        let tracker = ThroughputTracker::new();
        tracker.record();
        assert_close(tracker.throughput(Duration::ZERO), 0.0, f64::EPSILON);
    }

    #[test]
    fn throughput_over_1s_window() {
        let tracker = ThroughputTracker::new();
        let now = Instant::now();

        // Five events spaced 100 ms apart, all inside the last second.
        (0..5u64)
            .map(|step| now - Duration::from_millis(100 * step))
            .for_each(|at| tracker.record_at(at));

        // 5 events / 1 s = 5.0 per second.
        assert_close(tracker.throughput_at(now, Duration::from_secs(1)), 5.0, 0.01);
    }

    #[test]
    fn throughput_over_10s_window() {
        let tracker = ThroughputTracker::new();
        let now = Instant::now();

        // Twenty events spaced 500 ms apart across the last ten seconds.
        (0..20u64)
            .map(|step| now - Duration::from_millis(500 * step))
            .for_each(|at| tracker.record_at(at));

        // 20 events / 10 s = 2.0 per second.
        assert_close(tracker.throughput_at(now, Duration::from_secs(10)), 2.0, 0.01);
    }

    #[test]
    fn old_entries_are_pruned() {
        let tracker = ThroughputTracker::with_max_window(Duration::from_secs(5));
        let now = Instant::now();

        let stale = now - Duration::from_secs(10); // outside the 5 s retention
        let fresh = now - Duration::from_secs(1); // inside it
        tracker.record_at(stale);
        tracker.record_at(fresh);

        // Computing throughput prunes first; only `fresh` survives: 1 / 5 s.
        let rate = tracker.throughput_at(now, Duration::from_secs(5));
        assert_close(rate, 0.2, 0.01);
        assert_eq!(tracker.len(), 1);
    }

    #[test]
    fn window_larger_than_max_window_still_works() {
        let tracker = ThroughputTracker::with_max_window(Duration::from_secs(5));
        let now = Instant::now();

        for secs_ago in [3, 1] {
            tracker.record_at(now - Duration::from_secs(secs_ago));
        }

        // A 60 s window over a tracker that only retains 5 s: 2 / 60 s.
        let rate = tracker.throughput_at(now, Duration::from_secs(60));
        assert_close(rate, 2.0 / 60.0, 0.01);
    }

    #[test]
    fn concurrent_access_does_not_panic() {
        let tracker = Arc::new(ThroughputTracker::new());

        let workers: Vec<_> = (0..8)
            .map(|_| {
                let shared = Arc::clone(&tracker);
                thread::spawn(move || {
                    for _ in 0..100 {
                        shared.record();
                        let _ = shared.throughput(Duration::from_secs(1));
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        // 8 threads x 100 events, all well inside the default 60 s retention.
        assert_eq!(tracker.len(), 800);
    }

    #[test]
    fn throughput_only_counts_events_within_requested_window() {
        let tracker = ThroughputTracker::new();
        let now = Instant::now();

        let recent = now - Duration::from_millis(500);
        let older = now - Duration::from_secs(5);
        let arrivals = std::iter::repeat(recent)
            .take(3)
            .chain(std::iter::repeat(older).take(2));
        for at in arrivals {
            tracker.record_at(at);
        }

        // The 1 s window sees only the three recent events.
        assert_close(tracker.throughput_at(now, Duration::from_secs(1)), 3.0, 0.01);

        // The 10 s window sees all five: 5 / 10 s.
        assert_close(tracker.throughput_at(now, Duration::from_secs(10)), 0.5, 0.01);
    }

    #[test]
    fn custom_max_window() {
        let tracker = ThroughputTracker::with_max_window(Duration::from_secs(2));
        assert_eq!(tracker.max_window, Duration::from_secs(2));
    }

    #[test]
    fn default_max_window_is_60s() {
        let tracker = ThroughputTracker::new();
        assert_eq!(tracker.max_window, Duration::from_secs(60));
    }
}