halldyll_core/observe/
metrics.rs1use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::RwLock;
6use std::time::{Duration, Instant};
7
/// Request and transfer counters backed by atomics.
///
/// Every field is an [`AtomicU64`], so the counters can be updated through a
/// shared reference. `Default` yields a value with all counters at zero.
#[derive(Default, Debug)]
pub struct Metrics {
    /// Count of requests issued.
    pub requests_total: AtomicU64,
    /// Count of requests that completed successfully.
    pub requests_success: AtomicU64,
    /// Count of requests that failed.
    pub requests_failed: AtomicU64,
    /// Count of requests that were rate limited.
    pub requests_rate_limited: AtomicU64,
    /// Running total of downloaded bytes.
    pub bytes_downloaded: AtomicU64,
    /// Count of retries performed.
    pub retries_total: AtomicU64,
    /// Running total of request durations, in milliseconds.
    pub request_time_total_ms: AtomicU64,
    /// Count of documents extracted.
    pub documents_extracted: AtomicU64,
}
28
29impl Metrics {
30 pub fn new() -> Self {
32 Self::default()
33 }
34
35 pub fn inc_requests(&self) {
37 self.requests_total.fetch_add(1, Ordering::Relaxed);
38 }
39
40 pub fn inc_success(&self) {
42 self.requests_success.fetch_add(1, Ordering::Relaxed);
43 }
44
45 pub fn inc_failed(&self) {
47 self.requests_failed.fetch_add(1, Ordering::Relaxed);
48 }
49
50 pub fn inc_rate_limited(&self) {
52 self.requests_rate_limited.fetch_add(1, Ordering::Relaxed);
53 }
54
55 pub fn add_bytes(&self, bytes: u64) {
57 self.bytes_downloaded.fetch_add(bytes, Ordering::Relaxed);
58 }
59
60 pub fn inc_retries(&self) {
62 self.retries_total.fetch_add(1, Ordering::Relaxed);
63 }
64
65 pub fn add_request_time(&self, duration_ms: u64) {
67 self.request_time_total_ms.fetch_add(duration_ms, Ordering::Relaxed);
68 }
69
70 pub fn inc_documents(&self) {
72 self.documents_extracted.fetch_add(1, Ordering::Relaxed);
73 }
74
75 pub fn success_rate(&self) -> f64 {
77 let total = self.requests_total.load(Ordering::Relaxed);
78 if total == 0 {
79 return 0.0;
80 }
81 let success = self.requests_success.load(Ordering::Relaxed);
82 success as f64 / total as f64
83 }
84
85 pub fn avg_latency_ms(&self) -> f64 {
87 let total = self.requests_total.load(Ordering::Relaxed);
88 if total == 0 {
89 return 0.0;
90 }
91 let time = self.request_time_total_ms.load(Ordering::Relaxed);
92 time as f64 / total as f64
93 }
94
95 pub fn snapshot(&self) -> MetricsSnapshot {
97 MetricsSnapshot {
98 requests_total: self.requests_total.load(Ordering::Relaxed),
99 requests_success: self.requests_success.load(Ordering::Relaxed),
100 requests_failed: self.requests_failed.load(Ordering::Relaxed),
101 requests_rate_limited: self.requests_rate_limited.load(Ordering::Relaxed),
102 bytes_downloaded: self.bytes_downloaded.load(Ordering::Relaxed),
103 retries_total: self.retries_total.load(Ordering::Relaxed),
104 documents_extracted: self.documents_extracted.load(Ordering::Relaxed),
105 success_rate: self.success_rate(),
106 avg_latency_ms: self.avg_latency_ms(),
107 }
108 }
109}
110
/// Plain-value copy of a set of metrics counters plus two derived ratios.
///
/// Unlike the atomic counters it is built from, a snapshot holds ordinary
/// integers and floats and can be cloned freely.
#[derive(Clone, Debug)]
pub struct MetricsSnapshot {
    /// Requests issued.
    pub requests_total: u64,
    /// Requests that completed successfully.
    pub requests_success: u64,
    /// Requests that failed.
    pub requests_failed: u64,
    /// Requests that were rate limited.
    pub requests_rate_limited: u64,
    /// Total bytes downloaded.
    pub bytes_downloaded: u64,
    /// Retries performed.
    pub retries_total: u64,
    /// Documents extracted.
    pub documents_extracted: u64,
    /// Successful requests divided by total requests.
    pub success_rate: f64,
    /// Accumulated request time in milliseconds divided by total requests.
    pub avg_latency_ms: f64,
}
133
134pub struct MetricsCollector {
136 global: Metrics,
138 by_domain: RwLock<HashMap<String, DomainMetrics>>,
140 started_at: Instant,
142}
143
144impl Default for MetricsCollector {
145 fn default() -> Self {
146 Self::new()
147 }
148}
149
150impl MetricsCollector {
151 pub fn new() -> Self {
153 Self {
154 global: Metrics::new(),
155 by_domain: RwLock::new(HashMap::new()),
156 started_at: Instant::now(),
157 }
158 }
159
160 pub fn global(&self) -> &Metrics {
162 &self.global
163 }
164
165 pub fn record_request(&self, domain: &str) {
167 self.global.inc_requests();
168 self.with_domain(domain, |m| m.requests += 1);
169 }
170
171 pub fn record_success(&self, domain: &str, bytes: u64, duration_ms: u64) {
173 self.global.inc_success();
174 self.global.add_bytes(bytes);
175 self.global.add_request_time(duration_ms);
176
177 self.with_domain(domain, |m| {
178 m.successes += 1;
179 m.bytes += bytes;
180 m.total_time_ms += duration_ms;
181 });
182 }
183
184 pub fn record_failure(&self, domain: &str, duration_ms: u64) {
186 self.global.inc_failed();
187 self.global.add_request_time(duration_ms);
188
189 self.with_domain(domain, |m| {
190 m.failures += 1;
191 m.total_time_ms += duration_ms;
192 });
193 }
194
195 pub fn record_rate_limit(&self, domain: &str) {
197 self.global.inc_rate_limited();
198 self.with_domain(domain, |m| m.rate_limits += 1);
199 }
200
201 pub fn record_retry(&self, domain: &str) {
203 self.global.inc_retries();
204 self.with_domain(domain, |m| m.retries += 1);
205 }
206
207 pub fn record_document(&self) {
209 self.global.inc_documents();
210 }
211
212 pub fn elapsed(&self) -> Duration {
214 self.started_at.elapsed()
215 }
216
217 pub fn requests_per_second(&self) -> f64 {
219 let elapsed = self.elapsed().as_secs_f64();
220 if elapsed < 0.001 {
221 return 0.0;
222 }
223 self.global.requests_total.load(Ordering::Relaxed) as f64 / elapsed
224 }
225
226 pub fn domain_stats(&self) -> HashMap<String, DomainMetrics> {
228 self.by_domain.read().unwrap().clone()
229 }
230
231 fn with_domain<F>(&self, domain: &str, f: F)
233 where
234 F: FnOnce(&mut DomainMetrics),
235 {
236 let mut by_domain = self.by_domain.write().unwrap();
237 let metrics = by_domain
238 .entry(domain.to_string())
239 .or_insert_with(DomainMetrics::default);
240 f(metrics);
241 }
242}
243
/// Counters kept for a single domain.
///
/// All fields are plain integers; `Default` yields a value with every counter
/// at zero.
#[derive(Clone, Debug, Default)]
pub struct DomainMetrics {
    /// Requests issued to the domain.
    pub requests: u64,
    /// Requests that completed successfully.
    pub successes: u64,
    /// Requests that failed.
    pub failures: u64,
    /// Requests that were rate limited.
    pub rate_limits: u64,
    /// Retries performed.
    pub retries: u64,
    /// Total bytes downloaded.
    pub bytes: u64,
    /// Accumulated request time in milliseconds.
    pub total_time_ms: u64,
}
262
263impl DomainMetrics {
264 pub fn success_rate(&self) -> f64 {
266 if self.requests == 0 {
267 return 0.0;
268 }
269 self.successes as f64 / self.requests as f64
270 }
271
272 pub fn avg_latency_ms(&self) -> f64 {
274 if self.requests == 0 {
275 return 0.0;
276 }
277 self.total_time_ms as f64 / self.requests as f64
278 }
279}