Skip to main content

spider_util/
metrics.rs

1//! Metrics helpers shared by runtime reporting code.
2
3use parking_lot::RwLock;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7/// Formatter traits and default implementations for metrics output.
8pub use crate::formatters::{
9    ByteFormatter, DefaultByteFormatter, DefaultDurationFormatter, DefaultRateCalculator,
10    DurationFormatter, RateCalculator,
11};
12
13/// Thread-safe exponential moving average used to track recent event rates.
14#[derive(Debug)]
15pub struct ExpMovingAverage {
16    alpha: f64,
17    rate: Arc<RwLock<f64>>,
18    last_update: Arc<RwLock<Instant>>,
19    event_count: Arc<RwLock<usize>>,
20}
21
22impl ExpMovingAverage {
23    /// Creates a new moving average with smoothing factor `alpha`.
24    ///
25    /// Lower values react more slowly to changes; higher values react faster.
26    pub fn new(alpha: f64) -> Self {
27        ExpMovingAverage {
28            alpha,
29            rate: Arc::new(RwLock::new(0.0)),
30            last_update: Arc::new(RwLock::new(Instant::now())),
31            event_count: Arc::new(RwLock::new(0)),
32        }
33    }
34
35    /// Records `count` new events and updates the smoothed rate periodically.
36    pub fn update(&self, count: usize) {
37        let now = Instant::now();
38        let mut last_update = self.last_update.write();
39        let mut event_count = self.event_count.write();
40
41        *event_count += count;
42        let time_delta = now.duration_since(*last_update).as_secs_f64();
43
44        if time_delta >= 1.0 {
45            let current_rate = *event_count as f64 / time_delta;
46            let mut rate = self.rate.write();
47            *rate = self.alpha * current_rate + (1.0 - self.alpha) * (*rate);
48
49            *event_count = 0;
50            *last_update = now;
51        }
52    }
53
54    /// Returns the current smoothed events-per-second rate.
55    pub fn get_rate(&self) -> f64 {
56        *self.rate.read()
57    }
58}
59
/// Point-in-time snapshot of crawler metrics for reporting and export.
///
/// Derives `serde::Serialize`; with serde's derive, fields serialize in declaration order,
/// so reordering fields changes the exported key order.
#[derive(Debug, Clone, serde::Serialize)]
pub struct MetricsSnapshot {
    /// Requests enqueued. Plain-text reports show `pending` as this value minus the
    /// succeeded, failed, and dropped counts (saturating at zero).
    pub requests_enqueued: usize,
    /// Requests sent; numerator of [`Self::requests_per_second`] and denominator of the
    /// success/failure ratios in plain-text reports.
    pub requests_sent: usize,
    /// Requests counted as succeeded.
    pub requests_succeeded: usize,
    /// Requests counted as failed.
    pub requests_failed: usize,
    /// Requests counted as retried.
    pub requests_retried: usize,
    /// Requests counted as scheduled for retry.
    pub requests_scheduled_for_retry: usize,
    /// Requests counted as dropped.
    pub requests_dropped: usize,
    /// Retry delay in flight, in milliseconds.
    pub retry_delay_in_flight_ms: u64,
    /// Responses received; numerator of [`Self::responses_per_second`] and denominator of
    /// the cache-hit ratio in plain-text reports.
    pub responses_received: usize,
    /// Responses served from cache.
    pub responses_from_cache: usize,
    /// Total bytes downloaded; rendered by [`Self::formatted_bytes`] and averaged by
    /// [`Self::bytes_per_second`].
    pub total_bytes_downloaded: usize,
    /// Items scraped; numerator of [`Self::items_per_second`].
    pub items_scraped: usize,
    /// Items processed.
    pub items_processed: usize,
    /// Items dropped by the item pipeline.
    pub items_dropped_by_pipeline: usize,
    /// Request queue depth.
    pub queue_depth: usize,
    /// Parser backlog size.
    pub parser_backlog: usize,
    /// Pipeline backlog size.
    pub pipeline_backlog: usize,
    /// Retry backlog size.
    pub retry_backlog: usize,
    /// Response counts keyed by status code.
    /// NOTE(review): keys are presumably HTTP status codes — confirm with the producer.
    pub response_status_counts: std::collections::HashMap<u16, usize>,
    /// Elapsed duration covered by the snapshot; denominator of the overall
    /// `*_per_second` rates.
    pub elapsed_duration: Duration,
    /// Average request time. NOTE(review): `None` presumably means no samples yet — confirm.
    pub average_request_time: Option<Duration>,
    /// Fastest observed request time, if any.
    pub fastest_request_time: Option<Duration>,
    /// Slowest observed request time, if any.
    pub slowest_request_time: Option<Duration>,
    /// Number of request-time measurements (shown as `total` in plain-text reports).
    pub request_time_count: usize,
    /// Average parsing time, if any.
    pub average_parsing_time: Option<Duration>,
    /// Fastest observed parsing time, if any.
    pub fastest_parsing_time: Option<Duration>,
    /// Slowest observed parsing time, if any.
    pub slowest_parsing_time: Option<Duration>,
    /// Number of parsing-time measurements (shown as `total` in plain-text reports).
    pub parsing_time_count: usize,
    /// Recent requests-per-second rate. NOTE(review): presumably a smoothed rate
    /// (e.g. from `ExpMovingAverage`), distinct from [`Self::requests_per_second`] — confirm.
    pub recent_requests_per_second: f64,
    /// Recent responses-per-second rate (see the note on `recent_requests_per_second`).
    pub recent_responses_per_second: f64,
    /// Recent items-per-second rate (see the note on `recent_requests_per_second`).
    pub recent_items_per_second: f64,
    /// Preview text of the current item, shown on the `current` line of plain-text reports.
    pub current_item_preview: String,
}
96
97impl MetricsSnapshot {
98    /// Formats [`Self::elapsed_duration`] into a human-readable string.
99    pub fn formatted_duration(&self) -> String {
100        DefaultDurationFormatter.formatted_duration(self.elapsed_duration)
101    }
102
103    /// Formats an optional request duration for display.
104    pub fn formatted_request_time(&self, duration: Option<Duration>) -> String {
105        DefaultDurationFormatter.formatted_request_time(duration)
106    }
107
108    /// Returns average sent requests per second over total elapsed duration.
109    pub fn requests_per_second(&self) -> f64 {
110        DefaultRateCalculator.calculate_rate(self.requests_sent, self.elapsed_duration)
111    }
112
113    /// Returns average received responses per second over total elapsed duration.
114    pub fn responses_per_second(&self) -> f64 {
115        DefaultRateCalculator.calculate_rate(self.responses_received, self.elapsed_duration)
116    }
117
118    /// Returns average scraped items per second over total elapsed duration.
119    pub fn items_per_second(&self) -> f64 {
120        DefaultRateCalculator.calculate_rate(self.items_scraped, self.elapsed_duration)
121    }
122
123    /// Returns average downloaded bytes per second over total elapsed duration.
124    pub fn bytes_per_second(&self) -> f64 {
125        DefaultRateCalculator.calculate_rate(self.total_bytes_downloaded, self.elapsed_duration)
126    }
127
128    /// Formats [`Self::total_bytes_downloaded`] into a human-readable size string.
129    pub fn formatted_bytes(&self) -> String {
130        DefaultByteFormatter.formatted_bytes(self.total_bytes_downloaded)
131    }
132
133    /// Formats [`Self::bytes_per_second`] into a human-readable rate string.
134    pub fn formatted_bytes_per_second(&self) -> String {
135        format!(
136            "{}/s",
137            DefaultByteFormatter.formatted_bytes(self.bytes_per_second() as usize)
138        )
139    }
140}
141
/// Trait for metrics collectors that can produce a snapshot value.
///
/// This only builds snapshots; the read-only accessor interface that formatters consume
/// is [`MetricsSnapshotProvider`].
pub trait SnapshotProvider {
    /// Snapshot type produced by this provider; must be `Clone`.
    type Snapshot: Clone;

    /// Builds a snapshot of the current metrics state.
    fn create_snapshot(&self) -> Self::Snapshot;
}
150
/// Trait for exporting metrics into multiple output formats.
///
/// NOTE(review): the type parameter `T` does not appear in any method signature below;
/// presumably it ties an exporter to a particular snapshot type — confirm against
/// implementors.
pub trait MetricsExporter<T> {
    /// Exports metrics as compact JSON.
    ///
    /// # Errors
    ///
    /// Returns an error when serialization fails.
    fn to_json_string(&self) -> Result<String, crate::error::SpiderError>;

    /// Exports metrics as pretty-printed JSON.
    ///
    /// # Errors
    ///
    /// Returns an error when serialization fails.
    fn to_json_string_pretty(&self) -> Result<String, crate::error::SpiderError>;

    /// Exports metrics as a Markdown report.
    ///
    /// Infallible by signature: the report is returned directly as a `String`.
    fn to_markdown_string(&self) -> String;

    /// Exports metrics as a plain-text display report.
    ///
    /// Infallible by signature: the report is returned directly as a `String`.
    fn to_display_string(&self) -> String;
}
173
174/// Default formatter for human-readable metrics display output.
175pub struct MetricsDisplayFormatter;
176
177impl MetricsDisplayFormatter {
178    /// Formats a snapshot provider into a multi-line summary string.
179    pub fn format_metrics<T: MetricsSnapshotProvider>(&self, snapshot: &T) -> String {
180        format!("\n{}\n", format_plain_text_metrics(snapshot))
181    }
182}
183
184/// Formats a metrics snapshot provider into the shared plain-text terminal layout.
185pub fn format_plain_text_metrics<T: MetricsSnapshotProvider>(snapshot: &T) -> String {
186    let overall_req_per_sec = calculate_rate(
187        snapshot.get_requests_sent(),
188        snapshot.get_elapsed_duration(),
189    );
190    let overall_resp_per_sec = calculate_rate(
191        snapshot.get_responses_received(),
192        snapshot.get_elapsed_duration(),
193    );
194    let overall_item_per_sec = calculate_rate(
195        snapshot.get_items_scraped(),
196        snapshot.get_elapsed_duration(),
197    );
198    let pending_requests = snapshot.get_requests_enqueued().saturating_sub(
199        snapshot.get_requests_succeeded()
200            + snapshot.get_requests_failed()
201            + snapshot.get_requests_dropped(),
202    );
203    let success_ratio = format_ratio(
204        snapshot.get_requests_succeeded(),
205        snapshot.get_requests_sent(),
206    );
207    let failure_ratio = format_ratio(snapshot.get_requests_failed(), snapshot.get_requests_sent());
208    let cache_hit_ratio = format_ratio(
209        snapshot.get_responses_from_cache(),
210        snapshot.get_responses_received(),
211    );
212    let bytes_per_second = format_byte_rate(
213        snapshot.get_total_bytes_downloaded(),
214        snapshot.get_elapsed_duration(),
215    );
216
217    format!(
218        "Crawl Statistics\n\
219         ----------------\n\
220         duration : {}\n\
221         speed    : req/s {:.2}, resp/s {:.2}, item/s {:.2}\n\
222         requests : enqueued {}, sent {}, pending {}, ok {}, fail {}\n\
223         retry    : retry {}, scheduled {}, drop {}\n\
224         ratios   : success {}, failure {}, cache hit {}\n\
225         response : received {}, cache {}, downloaded {}, bytes/s {}\n\
226         delay    : retry in flight {} ms\n\
227         backlog  : queue {}, parser {}, pipeline {}, retry {}\n\
228         items    : scraped {}, processed {}, dropped {}\n\
229         current  : {}\n\
230         req time : avg {}, fastest {}, slowest {}, total {}\n\
231         parsing  : avg {}, fastest {}, slowest {}, total {}\n\
232         status   : {}",
233        snapshot.formatted_duration(),
234        overall_req_per_sec,
235        overall_resp_per_sec,
236        overall_item_per_sec,
237        snapshot.get_requests_enqueued(),
238        snapshot.get_requests_sent(),
239        pending_requests,
240        snapshot.get_requests_succeeded(),
241        snapshot.get_requests_failed(),
242        snapshot.get_requests_retried(),
243        snapshot.get_requests_scheduled_for_retry(),
244        snapshot.get_requests_dropped(),
245        success_ratio,
246        failure_ratio,
247        cache_hit_ratio,
248        snapshot.get_responses_received(),
249        snapshot.get_responses_from_cache(),
250        snapshot.formatted_bytes(),
251        bytes_per_second,
252        snapshot.get_retry_delay_in_flight_ms(),
253        snapshot.get_queue_depth(),
254        snapshot.get_parser_backlog(),
255        snapshot.get_pipeline_backlog(),
256        snapshot.get_retry_backlog(),
257        snapshot.get_items_scraped(),
258        snapshot.get_items_processed(),
259        snapshot.get_items_dropped_by_pipeline(),
260        snapshot.get_current_item_preview(),
261        snapshot.formatted_request_time(snapshot.get_average_request_time()),
262        snapshot.formatted_request_time(snapshot.get_fastest_request_time()),
263        snapshot.formatted_request_time(snapshot.get_slowest_request_time()),
264        snapshot.get_request_time_count(),
265        snapshot.formatted_request_time(snapshot.get_average_parsing_time()),
266        snapshot.formatted_request_time(snapshot.get_fastest_parsing_time()),
267        snapshot.formatted_request_time(snapshot.get_slowest_parsing_time()),
268        snapshot.get_parsing_time_count(),
269        format_status_counts(snapshot.get_response_status_counts())
270    )
271}
272
/// Renders status-code counts as `code: count` pairs in ascending code order, joined by
/// `", "`; returns `"none"` when the map is empty.
fn format_status_counts(status_counts: &std::collections::HashMap<u16, usize>) -> String {
    if status_counts.is_empty() {
        return String::from("none");
    }

    // A BTreeMap iterates in ascending key order, giving a stable rendering.
    let by_code: std::collections::BTreeMap<u16, usize> = status_counts
        .iter()
        .map(|(&code, &count)| (code, count))
        .collect();

    by_code
        .into_iter()
        .map(|(code, count)| format!("{code}: {count}"))
        .collect::<Vec<String>>()
        .join(", ")
}
290
291fn calculate_rate(count: usize, elapsed_duration: Duration) -> f64 {
292    DefaultRateCalculator.calculate_rate(count, elapsed_duration)
293}
294
/// Formats `numerator / denominator` as a percentage with two decimals (e.g. `"12.50%"`).
///
/// A zero denominator yields `"0.00%"` instead of dividing by zero.
fn format_ratio(numerator: usize, denominator: usize) -> String {
    match denominator {
        0 => "0.00%".to_string(),
        total => {
            let fraction = numerator as f64 / total as f64;
            format!("{:.2}%", fraction * 100.0)
        }
    }
}
302
303fn format_byte_rate(total_bytes: usize, elapsed_duration: Duration) -> String {
304    let bytes_per_second = calculate_rate(total_bytes, elapsed_duration);
305    format!(
306        "{}/s",
307        DefaultByteFormatter.formatted_bytes(bytes_per_second as usize)
308    )
309}
310
/// Read-only accessor interface consumed by metrics display/export formatters.
///
/// [`format_plain_text_metrics`] reads every accessor below except the three
/// `get_recent_*_per_second` rates.
pub trait MetricsSnapshotProvider {
    /// Requests enqueued.
    fn get_requests_enqueued(&self) -> usize;
    /// Requests sent.
    fn get_requests_sent(&self) -> usize;
    /// Requests counted as succeeded.
    fn get_requests_succeeded(&self) -> usize;
    /// Requests counted as failed.
    fn get_requests_failed(&self) -> usize;
    /// Requests counted as retried.
    fn get_requests_retried(&self) -> usize;
    /// Requests counted as scheduled for retry.
    fn get_requests_scheduled_for_retry(&self) -> usize;
    /// Requests counted as dropped.
    fn get_requests_dropped(&self) -> usize;
    /// Retry delay in flight, in milliseconds.
    fn get_retry_delay_in_flight_ms(&self) -> u64;
    /// Responses received.
    fn get_responses_received(&self) -> usize;
    /// Responses served from cache.
    fn get_responses_from_cache(&self) -> usize;
    /// Total bytes downloaded.
    fn get_total_bytes_downloaded(&self) -> usize;
    /// Items scraped.
    fn get_items_scraped(&self) -> usize;
    /// Items processed.
    fn get_items_processed(&self) -> usize;
    /// Items dropped by the item pipeline.
    fn get_items_dropped_by_pipeline(&self) -> usize;
    /// Request queue depth.
    fn get_queue_depth(&self) -> usize;
    /// Parser backlog size.
    fn get_parser_backlog(&self) -> usize;
    /// Pipeline backlog size.
    fn get_pipeline_backlog(&self) -> usize;
    /// Retry backlog size.
    fn get_retry_backlog(&self) -> usize;
    /// Response counts keyed by status code.
    fn get_response_status_counts(&self) -> &std::collections::HashMap<u16, usize>;
    /// Elapsed duration; formatters use it as the denominator of overall rates.
    fn get_elapsed_duration(&self) -> Duration;
    /// Average request time, if available.
    fn get_average_request_time(&self) -> Option<Duration>;
    /// Fastest request time, if available.
    fn get_fastest_request_time(&self) -> Option<Duration>;
    /// Slowest request time, if available.
    fn get_slowest_request_time(&self) -> Option<Duration>;
    /// Number of request-time measurements.
    fn get_request_time_count(&self) -> usize;
    /// Average parsing time, if available.
    fn get_average_parsing_time(&self) -> Option<Duration>;
    /// Fastest parsing time, if available.
    fn get_fastest_parsing_time(&self) -> Option<Duration>;
    /// Slowest parsing time, if available.
    fn get_slowest_parsing_time(&self) -> Option<Duration>;
    /// Number of parsing-time measurements.
    fn get_parsing_time_count(&self) -> usize;
    /// Recent requests-per-second rate.
    fn get_recent_requests_per_second(&self) -> f64;
    /// Recent responses-per-second rate.
    fn get_recent_responses_per_second(&self) -> f64;
    /// Recent items-per-second rate.
    fn get_recent_items_per_second(&self) -> f64;
    /// Preview text of the current item.
    fn get_current_item_preview(&self) -> &str;
    /// Elapsed duration rendered for display.
    fn formatted_duration(&self) -> String;
    /// Renders an optional duration for display; formatters use it for both request and
    /// parsing times.
    fn formatted_request_time(&self, duration: Option<Duration>) -> String;
    /// Total downloaded bytes rendered for display.
    fn formatted_bytes(&self) -> String;
}
349
350impl MetricsSnapshotProvider for MetricsSnapshot {
351    fn get_requests_enqueued(&self) -> usize {
352        self.requests_enqueued
353    }
354
355    fn get_requests_sent(&self) -> usize {
356        self.requests_sent
357    }
358
359    fn get_requests_succeeded(&self) -> usize {
360        self.requests_succeeded
361    }
362
363    fn get_requests_failed(&self) -> usize {
364        self.requests_failed
365    }
366
367    fn get_requests_retried(&self) -> usize {
368        self.requests_retried
369    }
370
371    fn get_requests_scheduled_for_retry(&self) -> usize {
372        self.requests_scheduled_for_retry
373    }
374
375    fn get_requests_dropped(&self) -> usize {
376        self.requests_dropped
377    }
378
379    fn get_retry_delay_in_flight_ms(&self) -> u64 {
380        self.retry_delay_in_flight_ms
381    }
382
383    fn get_responses_received(&self) -> usize {
384        self.responses_received
385    }
386
387    fn get_responses_from_cache(&self) -> usize {
388        self.responses_from_cache
389    }
390
391    fn get_total_bytes_downloaded(&self) -> usize {
392        self.total_bytes_downloaded
393    }
394
395    fn get_items_scraped(&self) -> usize {
396        self.items_scraped
397    }
398
399    fn get_items_processed(&self) -> usize {
400        self.items_processed
401    }
402
403    fn get_items_dropped_by_pipeline(&self) -> usize {
404        self.items_dropped_by_pipeline
405    }
406
407    fn get_queue_depth(&self) -> usize {
408        self.queue_depth
409    }
410
411    fn get_parser_backlog(&self) -> usize {
412        self.parser_backlog
413    }
414
415    fn get_pipeline_backlog(&self) -> usize {
416        self.pipeline_backlog
417    }
418
419    fn get_retry_backlog(&self) -> usize {
420        self.retry_backlog
421    }
422
423    fn get_response_status_counts(&self) -> &std::collections::HashMap<u16, usize> {
424        &self.response_status_counts
425    }
426
427    fn get_elapsed_duration(&self) -> Duration {
428        self.elapsed_duration
429    }
430
431    fn get_average_request_time(&self) -> Option<Duration> {
432        self.average_request_time
433    }
434
435    fn get_fastest_request_time(&self) -> Option<Duration> {
436        self.fastest_request_time
437    }
438
439    fn get_slowest_request_time(&self) -> Option<Duration> {
440        self.slowest_request_time
441    }
442
443    fn get_request_time_count(&self) -> usize {
444        self.request_time_count
445    }
446
447    fn get_average_parsing_time(&self) -> Option<Duration> {
448        self.average_parsing_time
449    }
450
451    fn get_fastest_parsing_time(&self) -> Option<Duration> {
452        self.fastest_parsing_time
453    }
454
455    fn get_slowest_parsing_time(&self) -> Option<Duration> {
456        self.slowest_parsing_time
457    }
458
459    fn get_parsing_time_count(&self) -> usize {
460        self.parsing_time_count
461    }
462
463    fn get_recent_requests_per_second(&self) -> f64 {
464        self.recent_requests_per_second
465    }
466
467    fn get_recent_responses_per_second(&self) -> f64 {
468        self.recent_responses_per_second
469    }
470
471    fn get_recent_items_per_second(&self) -> f64 {
472        self.recent_items_per_second
473    }
474
475    fn get_current_item_preview(&self) -> &str {
476        &self.current_item_preview
477    }
478
479    fn formatted_duration(&self) -> String {
480        self.formatted_duration()
481    }
482
483    fn formatted_request_time(&self, duration: Option<Duration>) -> String {
484        self.formatted_request_time(duration)
485    }
486
487    fn formatted_bytes(&self) -> String {
488        self.formatted_bytes()
489    }
490}