cloudscraper_rs/modules/metrics/
mod.rs

1//! Metrics collection utilities.
2//!
3//! Provides aggregated global and per-domain statistics with latency
4//! percentiles for observability.
5
6use chrono::{DateTime, Utc};
7use std::collections::{HashMap, VecDeque};
8use std::sync::{Arc, Mutex};
9use std::time::Duration;
10
11/// Aggregated metrics across all domains.
12#[derive(Debug, Clone)]
13pub struct GlobalStats {
14    pub started_at: DateTime<Utc>,
15    pub total_requests: u64,
16    pub successes: u64,
17    pub failures: u64,
18    pub average_latency: Option<Duration>,
19    pub p95_latency: Option<Duration>,
20}
21
22impl Default for GlobalStats {
23    fn default() -> Self {
24        Self {
25            started_at: Utc::now(),
26            total_requests: 0,
27            successes: 0,
28            failures: 0,
29            average_latency: None,
30            p95_latency: None,
31        }
32    }
33}
34
35/// Domain-scoped metrics snapshot.
36#[derive(Debug, Clone)]
37pub struct DomainStats {
38    pub domain: String,
39    pub total_requests: u64,
40    pub successes: u64,
41    pub failures: u64,
42    pub average_latency: Option<Duration>,
43    pub p95_latency: Option<Duration>,
44    pub consecutive_failures: u32,
45    pub last_status: Option<u16>,
46}
47
48impl DomainStats {
49    fn from_accumulator(domain: &str, acc: &DomainAccumulator) -> Self {
50        let (avg, p95) = acc.latency_stats();
51        Self {
52            domain: domain.to_string(),
53            total_requests: acc.total_requests,
54            successes: acc.successes,
55            failures: acc.failures,
56            average_latency: avg,
57            p95_latency: p95,
58            consecutive_failures: acc.consecutive_failures,
59            last_status: acc.last_status,
60        }
61    }
62}
63
64#[derive(Debug, Clone)]
65pub struct MetricsSnapshot {
66    pub global: GlobalStats,
67    pub domains: Vec<DomainStats>,
68}
69
/// Running counters plus a sliding window of the most recent latencies for
/// one domain.
#[derive(Debug)]
struct DomainAccumulator {
    /// Number of outcomes recorded so far.
    total_requests: u64,
    /// Number of outcomes with a status below 500.
    successes: u64,
    /// Number of outcomes with a status of 500 or above.
    failures: u64,
    /// Most recent latency samples, oldest at the front.
    latencies: VecDeque<Duration>,
    /// Maximum number of samples kept in `latencies`.
    max_window: usize,
    /// Length of the current uninterrupted run of failures.
    consecutive_failures: u32,
    /// Status passed to the most recent `record` call, if any.
    last_status: Option<u16>,
}

impl DomainAccumulator {
    /// Upper bound on the number of sample slots reserved up front.
    ///
    /// The window still grows on demand up to `max_window`; capping only the
    /// initial reservation keeps very large windows from allocating (or
    /// overflowing the capacity computation) before a single sample exists.
    const PREALLOC_LIMIT: usize = 1024;

    /// Creates an empty accumulator retaining at most `max_window` samples.
    fn new(max_window: usize) -> Self {
        Self {
            total_requests: 0,
            successes: 0,
            failures: 0,
            latencies: VecDeque::with_capacity(max_window.min(Self::PREALLOC_LIMIT)),
            max_window,
            consecutive_failures: 0,
            last_status: None,
        }
    }

    /// Records one response.
    ///
    /// A `status` below 500 counts as a success and resets the failure
    /// streak; anything else counts as a failure and extends it. `latency`
    /// is appended to the window, evicting the oldest sample when full.
    fn record(&mut self, status: u16, latency: Duration) {
        self.total_requests += 1;
        self.last_status = Some(status);

        if status < 500 {
            self.successes += 1;
            self.consecutive_failures = 0;
        } else {
            self.failures += 1;
            self.consecutive_failures = self.consecutive_failures.saturating_add(1);
        }

        // A zero-sized window retains nothing; without this guard the
        // eviction below would never fire and the deque would grow forever.
        if self.max_window == 0 {
            return;
        }
        if self.latencies.len() >= self.max_window {
            self.latencies.pop_front();
        }
        self.latencies.push_back(latency);
    }

    /// Returns `(average, p95)` over the retained samples, or `(None, None)`
    /// when the window is empty.
    ///
    /// The average is computed in integer nanoseconds and rounded to the
    /// nearest nanosecond, so it is exact and cannot panic for large samples.
    /// The p95 is the sample at rank `ceil(0.95 * n)` in ascending order.
    fn latency_stats(&self) -> (Option<Duration>, Option<Duration>) {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        let count = self.latencies.len();
        if count == 0 {
            return (None, None);
        }

        // u128 holds the sum of billions of maximal `Duration`s, far more
        // samples than can be held in memory.
        let total_nanos: u128 = self.latencies.iter().map(Duration::as_nanos).sum();
        let n = count as u128;
        let mean_nanos = (total_nanos + n / 2) / n;
        // The rounded mean never exceeds the largest sample, so the seconds
        // part fits in u64; the remainder is below 1e9 and fits in u32.
        let average = Duration::new(
            (mean_nanos / NANOS_PER_SEC) as u64,
            (mean_nanos % NANOS_PER_SEC) as u32,
        );

        // ceil(0.95 * n) == n - floor(n / 20); always in 1..=n.
        let rank = count - count / 20;
        let mut samples: Vec<Duration> = self.latencies.iter().copied().collect();
        // Selection places the rank-th smallest sample at its sorted position
        // without fully sorting the window.
        let (_, p95, _) = samples.select_nth_unstable(rank - 1);

        (Some(average), Some(*p95))
    }
}
124
125#[derive(Debug)]
126struct MetricsState {
127    global: GlobalStats,
128    max_window: usize,
129    domains: HashMap<String, DomainAccumulator>,
130}
131
132impl MetricsState {
133    fn new(max_window: usize) -> Self {
134        Self {
135            global: GlobalStats::default(),
136            max_window,
137            domains: HashMap::new(),
138        }
139    }
140
141    fn accumulator_mut(&mut self, domain: &str) -> &mut DomainAccumulator {
142        self.domains
143            .entry(domain.to_string())
144            .or_insert_with(|| DomainAccumulator::new(self.max_window))
145    }
146}
147
148/// Thread-safe metrics collector used by the orchestration layer.
149#[derive(Clone, Debug)]
150pub struct MetricsCollector {
151    inner: Arc<Mutex<MetricsState>>,
152}
153
154impl MetricsCollector {
155    pub fn new() -> Self {
156        Self {
157            inner: Arc::new(Mutex::new(MetricsState::new(128))),
158        }
159    }
160
161    pub fn with_window(window: usize) -> Self {
162        Self {
163            inner: Arc::new(Mutex::new(MetricsState::new(window.max(16)))),
164        }
165    }
166
167    pub fn record_response(&self, domain: &str, status: u16, latency: Duration) {
168        let mut guard = self.inner.lock().expect("metrics lock poisoned");
169        guard.global.total_requests += 1;
170        if status < 500 {
171            guard.global.successes += 1;
172        } else {
173            guard.global.failures += 1;
174        }
175
176        if let Some(avg) = guard.global.average_latency {
177            let blended = (avg.as_secs_f64() * 0.9) + (latency.as_secs_f64() * 0.1);
178            guard.global.average_latency = Some(Duration::from_secs_f64(blended));
179        } else {
180            guard.global.average_latency = Some(latency);
181        }
182
183        let acc = guard.accumulator_mut(domain);
184        acc.record(status, latency);
185
186        // Update global p95 from all samples (approximation using domain 95th blending).
187        let mut percentile_samples: Vec<_> = guard
188            .domains
189            .values()
190            .flat_map(|domain| domain.latencies.iter())
191            .cloned()
192            .collect();
193        percentile_samples.sort_unstable();
194        if !percentile_samples.is_empty() {
195            let idx = ((percentile_samples.len() as f64 * 0.95).ceil() as usize).saturating_sub(1);
196            guard.global.p95_latency = Some(percentile_samples[idx]);
197        }
198    }
199
200    pub fn record_error(&self, domain: &str) {
201        let mut guard = self.inner.lock().expect("metrics lock poisoned");
202        guard.global.total_requests += 1;
203        guard.global.failures += 1;
204        let acc = guard.accumulator_mut(domain);
205        acc.total_requests += 1;
206        acc.failures += 1;
207        acc.consecutive_failures = acc.consecutive_failures.saturating_add(1);
208        acc.last_status = Some(0);
209    }
210
211    pub fn snapshot(&self) -> MetricsSnapshot {
212        let guard = self.inner.lock().expect("metrics lock poisoned");
213        let domains = guard
214            .domains
215            .iter()
216            .map(|(domain, acc)| DomainStats::from_accumulator(domain, acc))
217            .collect();
218        MetricsSnapshot {
219            global: guard.global.clone(),
220            domains,
221        }
222    }
223}
224
225impl Default for MetricsCollector {
226    fn default() -> Self {
227        Self::new()
228    }
229}
230
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// One sub-500 response, one 5xx response and one transport error must
    /// show up as three requests: one success and two failures.
    #[test]
    fn records_success_and_failure() {
        let collector = MetricsCollector::new();
        let host = "example.com";

        collector.record_response(host, 200, Duration::from_millis(150));
        collector.record_response(host, 503, Duration::from_millis(800));
        collector.record_error(host);

        let snap = collector.snapshot();
        let stats = snap
            .domains
            .iter()
            .find(|entry| entry.domain == "example.com")
            .unwrap();

        assert_eq!(stats.total_requests, 3);
        assert_eq!(stats.successes, 1);
        assert_eq!(stats.failures, 2);
    }
}
253}