1use std::collections::HashMap;
23use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
24use std::sync::{OnceLock, RwLock};
25use std::time::{Duration, Instant};
26
/// Capacity given to the ring buffer of request durations built in
/// `Metrics::new`; older samples are overwritten once this many are stored.
const MAX_HISTOGRAM_SAMPLES: usize = 1000;

// Nothing in the visible part of this file references the constant below,
// hence the lint allowance.
#[allow(dead_code)]
/// Ascending list of millisecond thresholds.
/// NOTE(review): presumably histogram bucket upper bounds — confirm against the intended consumer.
const DEFAULT_BUCKETS_MS: &[u64] = &[5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000];
34
/// Registry of counters, gauges and recent request timings.
///
/// Counters and gauges are plain atomics that can be read directly; the
/// labelled breakdowns and the duration samples sit behind `RwLock`s and are
/// only touched through the methods on this type.
#[derive(Debug)]
pub struct Metrics {
    /// Incremented once per `record_request` call.
    pub requests_total: AtomicU64,
    /// Incremented once per `record_capture` call.
    pub captures_total: AtomicU64,
    /// Incremented once per `record_error` call.
    pub errors_total: AtomicU64,
    /// Incremented once per `record_extraction` call.
    pub extractions_total: AtomicU64,
    /// Incremented once per `record_navigation` call.
    pub navigations_total: AtomicU64,

    /// Gauge moved by `inc_active_connections` / `dec_active_connections`.
    pub active_connections: AtomicU32,
    /// Gauge moved by `inc_active_pages` / `dec_active_pages`.
    pub active_pages: AtomicU32,

    /// Most recent request durations, bounded by the ring buffer's capacity.
    request_durations: RwLock<RingBuffer<Duration>>,

    /// Request counts keyed by `(path, status_code)`.
    requests_by_path_status: RwLock<HashMap<(String, u16), u64>>,
    /// Error counts keyed by the `error_type` string passed to `record_error`.
    errors_by_type: RwLock<HashMap<String, u64>>,
    /// Capture counts keyed by the `format` string passed to `record_capture`.
    captures_by_format: RwLock<HashMap<String, u64>>,

    /// `None` until `init()` stores the current `Instant` here.
    start_time: RwLock<Option<Instant>>,
}
75
/// Fixed-capacity circular buffer that retains the most recent samples.
///
/// Until `capacity` samples have been pushed the buffer simply grows; after
/// that every push overwrites the oldest slot. `total_samples` keeps counting
/// every push, including samples that have since been overwritten.
#[derive(Debug, Clone)]
pub struct RingBuffer<T> {
    /// Retained samples in slot order (not chronological once wrapped).
    data: Vec<T>,
    /// Maximum number of samples retained.
    capacity: usize,
    /// Slot that the next push overwrites once the buffer is full.
    write_pos: usize,
    /// Number of samples pushed over the buffer's lifetime.
    total_samples: u64,
}

// Some helpers below are not called by the code visible in this file.
#[allow(dead_code)]
impl<T: Clone + Ord> RingBuffer<T> {
    /// Creates an empty buffer that retains at most `capacity` samples.
    ///
    /// A `capacity` of zero is accepted: such a buffer stores nothing but
    /// still counts pushes.
    fn new(capacity: usize) -> Self {
        Self {
            data: Vec::with_capacity(capacity),
            capacity,
            write_pos: 0,
            total_samples: 0,
        }
    }

    /// Records `value`, overwriting the oldest retained sample when full.
    fn push(&mut self, value: T) {
        self.total_samples += 1;

        // A zero-capacity buffer has no slot to write to; without this guard
        // the indexing and the modulo below would both panic.
        if self.capacity == 0 {
            return;
        }

        if self.data.len() < self.capacity {
            self.data.push(value);
        } else {
            self.data[self.write_pos] = value;
        }
        self.write_pos = (self.write_pos + 1) % self.capacity;
    }

    /// Number of samples currently retained (never more than `capacity`).
    fn len(&self) -> usize {
        self.data.len()
    }

    /// Number of samples ever pushed, including overwritten ones.
    fn total_samples(&self) -> u64 {
        self.total_samples
    }

    /// Returns a copy of the retained samples in ascending order.
    fn sorted_samples(&self) -> Vec<T> {
        let mut sorted = self.data.clone();
        // Stability buys nothing here: only the sorted values are consumed.
        sorted.sort_unstable();
        sorted
    }

    /// Returns the retained sample at quantile `p`.
    ///
    /// The index is `round((len - 1) * p)` into the sorted samples, so
    /// `p = 0.0` is the minimum and `p = 1.0` the maximum. Returns `None`
    /// when the buffer is empty or when `p` maps past the last index
    /// (i.e. `p` noticeably greater than `1.0`).
    fn percentile(&self, p: f64) -> Option<T> {
        if self.data.is_empty() {
            return None;
        }
        let sorted = self.sorted_samples();
        // Float-to-usize `as` saturates, so a negative or NaN `p` lands on index 0.
        let idx = ((sorted.len() as f64 - 1.0) * p).round() as usize;
        sorted.get(idx).cloned()
    }
}
133
134impl Default for Metrics {
135 fn default() -> Self {
136 Self::new()
137 }
138}
139
140impl Metrics {
141 pub fn new() -> Self {
145 Self {
146 requests_total: AtomicU64::new(0),
147 captures_total: AtomicU64::new(0),
148 errors_total: AtomicU64::new(0),
149 extractions_total: AtomicU64::new(0),
150 navigations_total: AtomicU64::new(0),
151 active_connections: AtomicU32::new(0),
152 active_pages: AtomicU32::new(0),
153 request_durations: RwLock::new(RingBuffer {
154 data: Vec::new(),
155 capacity: MAX_HISTOGRAM_SAMPLES,
156 write_pos: 0,
157 total_samples: 0,
158 }),
159 requests_by_path_status: RwLock::new(HashMap::new()),
160 errors_by_type: RwLock::new(HashMap::new()),
161 captures_by_format: RwLock::new(HashMap::new()),
162 start_time: RwLock::new(None),
163 }
164 }
165
166 pub fn record_request(&self, path: &str, status_code: u16, duration: Duration) {
168 self.requests_total.fetch_add(1, Ordering::Relaxed);
169
170 if let Ok(mut durations) = self.request_durations.write() {
172 durations.push(duration);
173 }
174
175 if let Ok(mut breakdown) = self.requests_by_path_status.write() {
177 *breakdown
178 .entry((path.to_string(), status_code))
179 .or_insert(0) += 1;
180 }
181 }
182
183 pub fn record_capture(&self, format: &str) {
185 self.captures_total.fetch_add(1, Ordering::Relaxed);
186
187 if let Ok(mut breakdown) = self.captures_by_format.write() {
188 *breakdown.entry(format.to_string()).or_insert(0) += 1;
189 }
190 }
191
192 pub fn record_error(&self, error_type: &str) {
194 self.errors_total.fetch_add(1, Ordering::Relaxed);
195
196 if let Ok(mut breakdown) = self.errors_by_type.write() {
197 *breakdown.entry(error_type.to_string()).or_insert(0) += 1;
198 }
199 }
200
201 pub fn record_extraction(&self) {
203 self.extractions_total.fetch_add(1, Ordering::Relaxed);
204 }
205
206 pub fn record_navigation(&self) {
208 self.navigations_total.fetch_add(1, Ordering::Relaxed);
209 }
210
211 pub fn inc_active_connections(&self) {
213 self.active_connections.fetch_add(1, Ordering::Relaxed);
214 }
215
216 pub fn dec_active_connections(&self) {
218 self.active_connections.fetch_sub(1, Ordering::Relaxed);
219 }
220
221 pub fn inc_active_pages(&self) {
223 self.active_pages.fetch_add(1, Ordering::Relaxed);
224 }
225
226 pub fn dec_active_pages(&self) {
228 self.active_pages.fetch_sub(1, Ordering::Relaxed);
229 }
230
231 pub fn get_request_durations(&self) -> Option<RingBuffer<Duration>> {
233 self.request_durations
234 .read()
235 .ok()
236 .map(|durations| RingBuffer {
237 data: durations.data.clone(),
238 capacity: durations.capacity,
239 write_pos: durations.write_pos,
240 total_samples: durations.total_samples,
241 })
242 }
243
244 pub fn to_prometheus_format(&self) -> String {
246 let mut output = String::new();
247
248 output.push_str(&format!(
250 "reasonkit_web_requests_total {}\n",
251 self.requests_total.load(Ordering::Relaxed)
252 ));
253 output.push_str(&format!(
254 "reasonkit_web_captures_total {}\n",
255 self.captures_total.load(Ordering::Relaxed)
256 ));
257 output.push_str(&format!(
258 "reasonkit_web_errors_total {}\n",
259 self.errors_total.load(Ordering::Relaxed)
260 ));
261 output.push_str(&format!(
262 "reasonkit_web_extractions_total {}\n",
263 self.extractions_total.load(Ordering::Relaxed)
264 ));
265 output.push_str(&format!(
266 "reasonkit_web_navigations_total {}\n",
267 self.navigations_total.load(Ordering::Relaxed)
268 ));
269
270 output.push_str(&format!(
272 "reasonkit_web_active_connections {}\n",
273 self.active_connections.load(Ordering::Relaxed)
274 ));
275 output.push_str(&format!(
276 "reasonkit_web_active_pages {}\n",
277 self.active_pages.load(Ordering::Relaxed)
278 ));
279
280 if let Ok(durations) = self.request_durations.read() {
282 if durations.len() > 0 {
283 if let Some(p50) = durations.percentile(0.5) {
284 output.push_str(&format!(
285 "reasonkit_web_request_duration_p50_ms {}\n",
286 p50.as_millis()
287 ));
288 }
289 if let Some(p95) = durations.percentile(0.95) {
290 output.push_str(&format!(
291 "reasonkit_web_request_duration_p95_ms {}\n",
292 p95.as_millis()
293 ));
294 }
295 if let Some(p99) = durations.percentile(0.99) {
296 output.push_str(&format!(
297 "reasonkit_web_request_duration_p99_ms {}\n",
298 p99.as_millis()
299 ));
300 }
301 }
302 }
303
304 output
305 }
306}
307
/// Process-wide metrics registry, created on first use by `global_metrics()` or `init()`.
pub static METRICS: OnceLock<Metrics> = OnceLock::new();
316
317pub fn global_metrics() -> &'static Metrics {
319 METRICS.get_or_init(Metrics::new)
320}
321
322pub fn init() {
324 let _ = METRICS.get_or_init(Metrics::new);
325
326 if let Ok(mut start_time) = global_metrics().start_time.write() {
328 *start_time = Some(Instant::now());
329 }
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    /// Each `record_*` call bumps its own total on a fresh registry.
    #[test]
    fn test_metrics_recording() {
        let metrics = Metrics::new();

        metrics.record_request("/test", 200, Duration::from_millis(100));
        assert_eq!(metrics.requests_total.load(Ordering::Relaxed), 1);

        metrics.record_capture("screenshot");
        assert_eq!(metrics.captures_total.load(Ordering::Relaxed), 1);

        metrics.record_error("timeout");
        assert_eq!(metrics.errors_total.load(Ordering::Relaxed), 1);
    }

    /// The rendered text carries the request total and the duration sample.
    #[test]
    fn test_prometheus_format() {
        let metrics = Metrics::new();
        metrics.record_request("/test", 200, Duration::from_millis(100));

        let text = metrics.to_prometheus_format();
        assert!(text.contains("reasonkit_web_requests_total 1\n"));
        assert!(text.contains("reasonkit_web_request_duration_p50_ms 100\n"));
    }

    /// Recording through the global registry increases its request total.
    #[test]
    fn test_global_metrics() {
        init();

        let metrics = global_metrics();
        // The registry is process-global and tests run in parallel, so assert
        // on the change rather than on an absolute value.
        let before = metrics.requests_total.load(Ordering::Relaxed);
        metrics.record_request("/test", 200, Duration::from_millis(150));
        let after = metrics.requests_total.load(Ordering::Relaxed);

        assert!(after > before);
    }
}