Skip to main content

spider_util/
metrics.rs

1//! Metrics helpers shared by runtime reporting code.
2
3use parking_lot::RwLock;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7/// Formatter traits and default implementations for metrics output.
8pub use crate::formatters::{
9    ByteFormatter, DefaultByteFormatter, DefaultDurationFormatter, DefaultRateCalculator,
10    DurationFormatter, RateCalculator,
11};
12
/// Thread-safe exponential moving average used to track recent event rates.
///
/// Events are accumulated by [`ExpMovingAverage::update`] and blended into the
/// smoothed rate once at least one second has elapsed since the previous blend.
/// Every piece of mutable state sits behind its own `RwLock`, so all methods
/// take `&self`.
#[derive(Debug)]
pub struct ExpMovingAverage {
    /// Smoothing factor: the weight given to the newest sample when blending.
    alpha: f64,
    /// Current smoothed rate in events per second; starts at `0.0`.
    rate: Arc<RwLock<f64>>,
    /// Instant of the most recent rate recomputation (initially the creation time).
    last_update: Arc<RwLock<Instant>>,
    /// Events recorded since `last_update` that are not yet blended into `rate`.
    event_count: Arc<RwLock<usize>>,
}
21
22impl ExpMovingAverage {
23    /// Creates a new moving average with smoothing factor `alpha`.
24    ///
25    /// Lower values react more slowly to changes; higher values react faster.
26    pub fn new(alpha: f64) -> Self {
27        ExpMovingAverage {
28            alpha,
29            rate: Arc::new(RwLock::new(0.0)),
30            last_update: Arc::new(RwLock::new(Instant::now())),
31            event_count: Arc::new(RwLock::new(0)),
32        }
33    }
34
35    /// Records `count` new events and updates the smoothed rate periodically.
36    pub fn update(&self, count: usize) {
37        let now = Instant::now();
38        let mut last_update = self.last_update.write();
39        let mut event_count = self.event_count.write();
40
41        *event_count += count;
42        let time_delta = now.duration_since(*last_update).as_secs_f64();
43
44        if time_delta >= 1.0 {
45            let current_rate = *event_count as f64 / time_delta;
46            let mut rate = self.rate.write();
47            *rate = self.alpha * current_rate + (1.0 - self.alpha) * (*rate);
48
49            *event_count = 0;
50            *last_update = now;
51        }
52    }
53
54    /// Returns the current smoothed events-per-second rate.
55    pub fn get_rate(&self) -> f64 {
56        *self.rate.read()
57    }
58}
59
/// Point-in-time snapshot of crawler metrics for reporting and export.
///
/// All fields are plain owned values, so a snapshot can be cloned and
/// serialized (via `serde::Serialize`) independently of whatever produced it.
#[derive(Debug, Clone, serde::Serialize)]
pub struct MetricsSnapshot {
    // Request counters. The plain-text report derives its "pending" figure as
    // `requests_enqueued - (requests_succeeded + requests_failed + requests_dropped)`.
    pub requests_enqueued: usize,
    pub requests_sent: usize,
    pub requests_succeeded: usize,
    pub requests_failed: usize,
    pub requests_retried: usize,
    pub requests_scheduled_for_retry: usize,
    pub requests_dropped: usize,
    /// Retry delay currently in flight; the plain-text report prints it with an `ms` unit.
    pub retry_delay_in_flight_ms: u64,

    // Response counters. `responses_from_cache / responses_received` is reported
    // as the cache hit ratio.
    pub responses_received: usize,
    pub responses_from_cache: usize,
    /// Byte total used for [`Self::formatted_bytes`] and [`Self::bytes_per_second`].
    pub total_bytes_downloaded: usize,

    // Item counters.
    pub items_scraped: usize,
    pub items_processed: usize,
    pub items_dropped_by_pipeline: usize,

    /// Number of responses seen per `u16` status code.
    // NOTE(review): presumably HTTP status codes — confirm against the collector.
    pub response_status_counts: std::collections::HashMap<u16, usize>,
    /// Elapsed time used as the denominator for every overall per-second rate.
    pub elapsed_duration: Duration,

    // Request timing statistics.
    // NOTE(review): the `Option`s are presumably `None` until a timing has been
    // recorded — confirm against the code that builds the snapshot.
    pub average_request_time: Option<Duration>,
    pub fastest_request_time: Option<Duration>,
    pub slowest_request_time: Option<Duration>,
    pub request_time_count: usize,

    // Parsing timing statistics (same shape as the request timings above).
    pub average_parsing_time: Option<Duration>,
    pub fastest_parsing_time: Option<Duration>,
    pub slowest_parsing_time: Option<Duration>,
    pub parsing_time_count: usize,

    // Recent per-second rates, stored as supplied by the snapshot's producer.
    // NOTE(review): presumably sourced from `ExpMovingAverage` — TODO confirm.
    pub recent_requests_per_second: f64,
    pub recent_responses_per_second: f64,
    pub recent_items_per_second: f64,

    /// Text shown on the `current` row of the plain-text report.
    pub current_item_preview: String,
}
92
93impl MetricsSnapshot {
94    /// Formats [`Self::elapsed_duration`] into a human-readable string.
95    pub fn formatted_duration(&self) -> String {
96        DefaultDurationFormatter.formatted_duration(self.elapsed_duration)
97    }
98
99    /// Formats an optional request duration for display.
100    pub fn formatted_request_time(&self, duration: Option<Duration>) -> String {
101        DefaultDurationFormatter.formatted_request_time(duration)
102    }
103
104    /// Returns average sent requests per second over total elapsed duration.
105    pub fn requests_per_second(&self) -> f64 {
106        DefaultRateCalculator.calculate_rate(self.requests_sent, self.elapsed_duration)
107    }
108
109    /// Returns average received responses per second over total elapsed duration.
110    pub fn responses_per_second(&self) -> f64 {
111        DefaultRateCalculator.calculate_rate(self.responses_received, self.elapsed_duration)
112    }
113
114    /// Returns average scraped items per second over total elapsed duration.
115    pub fn items_per_second(&self) -> f64 {
116        DefaultRateCalculator.calculate_rate(self.items_scraped, self.elapsed_duration)
117    }
118
119    /// Returns average downloaded bytes per second over total elapsed duration.
120    pub fn bytes_per_second(&self) -> f64 {
121        DefaultRateCalculator.calculate_rate(self.total_bytes_downloaded, self.elapsed_duration)
122    }
123
124    /// Formats [`Self::total_bytes_downloaded`] into a human-readable size string.
125    pub fn formatted_bytes(&self) -> String {
126        DefaultByteFormatter.formatted_bytes(self.total_bytes_downloaded)
127    }
128
129    /// Formats [`Self::bytes_per_second`] into a human-readable rate string.
130    pub fn formatted_bytes_per_second(&self) -> String {
131        format!(
132            "{}/s",
133            DefaultByteFormatter.formatted_bytes(self.bytes_per_second() as usize)
134        )
135    }
136}
137
/// Trait for metrics collectors that can produce a snapshot value.
///
/// The snapshot type is chosen by the implementor through the associated
/// [`SnapshotProvider::Snapshot`] type.
pub trait SnapshotProvider {
    /// Snapshot type produced by this provider.
    ///
    /// Must be `Clone` so callers can duplicate a snapshot once it is built.
    type Snapshot: Clone;

    /// Builds a snapshot of the current metrics state.
    ///
    /// Takes `&self`, so producing a snapshot does not require exclusive
    /// access to the provider.
    fn create_snapshot(&self) -> Self::Snapshot;
}
146
/// Trait for exporting metrics into multiple output formats.
///
/// The JSON exporters are fallible and report failures as
/// `crate::error::SpiderError`; the Markdown and plain-text exporters always
/// return a string.
// NOTE(review): the type parameter `T` is not referenced by any method
// signature in this trait; presumably it identifies the metrics type being
// exported — confirm against the implementors.
pub trait MetricsExporter<T> {
    /// Exports metrics as compact JSON.
    ///
    /// # Errors
    ///
    /// Returns an error when serialization fails.
    fn to_json_string(&self) -> Result<String, crate::error::SpiderError>;

    /// Exports metrics as pretty-printed JSON.
    ///
    /// # Errors
    ///
    /// Returns an error when serialization fails.
    fn to_json_string_pretty(&self) -> Result<String, crate::error::SpiderError>;

    /// Exports metrics as a Markdown report.
    fn to_markdown_string(&self) -> String;

    /// Exports metrics as a plain-text display report.
    fn to_display_string(&self) -> String;
}
169
/// Default formatter for human-readable metrics display output.
///
/// This is a unit struct with no state; its `format_metrics` method wraps the
/// output of [`format_plain_text_metrics`] in a leading and a trailing newline.
pub struct MetricsDisplayFormatter;
172
173impl MetricsDisplayFormatter {
174    /// Formats a snapshot provider into a multi-line summary string.
175    pub fn format_metrics<T: MetricsSnapshotProvider>(&self, snapshot: &T) -> String {
176        format!("\n{}\n", format_plain_text_metrics(snapshot))
177    }
178}
179
180/// Formats a metrics snapshot provider into the shared plain-text terminal layout.
181pub fn format_plain_text_metrics<T: MetricsSnapshotProvider>(snapshot: &T) -> String {
182    let overall_req_per_sec = calculate_rate(
183        snapshot.get_requests_sent(),
184        snapshot.get_elapsed_duration(),
185    );
186    let overall_resp_per_sec = calculate_rate(
187        snapshot.get_responses_received(),
188        snapshot.get_elapsed_duration(),
189    );
190    let overall_item_per_sec = calculate_rate(
191        snapshot.get_items_scraped(),
192        snapshot.get_elapsed_duration(),
193    );
194    let pending_requests = snapshot.get_requests_enqueued().saturating_sub(
195        snapshot.get_requests_succeeded()
196            + snapshot.get_requests_failed()
197            + snapshot.get_requests_dropped(),
198    );
199    let success_ratio = format_ratio(
200        snapshot.get_requests_succeeded(),
201        snapshot.get_requests_sent(),
202    );
203    let failure_ratio = format_ratio(snapshot.get_requests_failed(), snapshot.get_requests_sent());
204    let cache_hit_ratio = format_ratio(
205        snapshot.get_responses_from_cache(),
206        snapshot.get_responses_received(),
207    );
208    let bytes_per_second = format_byte_rate(
209        snapshot.get_total_bytes_downloaded(),
210        snapshot.get_elapsed_duration(),
211    );
212
213    format!(
214        "Crawl Statistics\n\
215         ----------------\n\
216         duration : {}\n\
217         speed    : req/s {:.2}, resp/s {:.2}, item/s {:.2}\n\
218         requests : enqueued {}, sent {}, pending {}, ok {}, fail {}\n\
219         retry    : retry {}, scheduled {}, drop {}\n\
220         ratios   : success {}, failure {}, cache hit {}\n\
221         response : received {}, cache {}, downloaded {}, bytes/s {}\n\
222         delay    : retry in flight {} ms\n\
223         items    : scraped {}, processed {}, dropped {}\n\
224         current  : {}\n\
225         req time : avg {}, fastest {}, slowest {}, total {}\n\
226         parsing  : avg {}, fastest {}, slowest {}, total {}\n\
227         status   : {}",
228        snapshot.formatted_duration(),
229        overall_req_per_sec,
230        overall_resp_per_sec,
231        overall_item_per_sec,
232        snapshot.get_requests_enqueued(),
233        snapshot.get_requests_sent(),
234        pending_requests,
235        snapshot.get_requests_succeeded(),
236        snapshot.get_requests_failed(),
237        snapshot.get_requests_retried(),
238        snapshot.get_requests_scheduled_for_retry(),
239        snapshot.get_requests_dropped(),
240        success_ratio,
241        failure_ratio,
242        cache_hit_ratio,
243        snapshot.get_responses_received(),
244        snapshot.get_responses_from_cache(),
245        snapshot.formatted_bytes(),
246        bytes_per_second,
247        snapshot.get_retry_delay_in_flight_ms(),
248        snapshot.get_items_scraped(),
249        snapshot.get_items_processed(),
250        snapshot.get_items_dropped_by_pipeline(),
251        snapshot.get_current_item_preview(),
252        snapshot.formatted_request_time(snapshot.get_average_request_time()),
253        snapshot.formatted_request_time(snapshot.get_fastest_request_time()),
254        snapshot.formatted_request_time(snapshot.get_slowest_request_time()),
255        snapshot.get_request_time_count(),
256        snapshot.formatted_request_time(snapshot.get_average_parsing_time()),
257        snapshot.formatted_request_time(snapshot.get_fastest_parsing_time()),
258        snapshot.formatted_request_time(snapshot.get_slowest_parsing_time()),
259        snapshot.get_parsing_time_count(),
260        format_status_counts(snapshot.get_response_status_counts())
261    )
262}
263
/// Renders per-status response counts as `code: count` pairs joined by `", "`.
///
/// Pairs are listed in ascending status-code order so the output is stable
/// regardless of hash-map iteration order. An empty map renders as `none`.
fn format_status_counts(status_counts: &std::collections::HashMap<u16, usize>) -> String {
    let mut ordered: Vec<(u16, usize)> = status_counts
        .iter()
        .map(|(&code, &count)| (code, count))
        .collect();

    if ordered.is_empty() {
        return String::from("none");
    }

    // Map keys are unique, so ordering whole tuples is ordering by status code.
    ordered.sort_unstable();

    let mut rendered = String::new();
    for (position, (code, count)) in ordered.into_iter().enumerate() {
        if position > 0 {
            rendered.push_str(", ");
        }
        rendered.push_str(&format!("{code}: {count}"));
    }
    rendered
}
281
282fn calculate_rate(count: usize, elapsed_duration: Duration) -> f64 {
283    DefaultRateCalculator.calculate_rate(count, elapsed_duration)
284}
285
/// Renders `numerator / denominator` as a percentage with two decimal places.
///
/// A zero denominator yields `0.00%` rather than dividing by zero.
fn format_ratio(numerator: usize, denominator: usize) -> String {
    match denominator {
        0 => "0.00%".to_string(),
        total => {
            let fraction = numerator as f64 / total as f64;
            format!("{:.2}%", fraction * 100.0)
        }
    }
}
293
294fn format_byte_rate(total_bytes: usize, elapsed_duration: Duration) -> String {
295    let bytes_per_second = calculate_rate(total_bytes, elapsed_duration);
296    format!(
297        "{}/s",
298        DefaultByteFormatter.formatted_bytes(bytes_per_second as usize)
299    )
300}
301
/// Read-only accessor interface consumed by metrics display/export formatters.
///
/// Every method takes `&self`. [`format_plain_text_metrics`] is written against
/// this trait rather than a concrete type, so any implementor can be rendered
/// with the shared plain-text layout.
pub trait MetricsSnapshotProvider {
    // Request counters (shown on the `requests` and `retry` rows of the plain-text report).
    fn get_requests_enqueued(&self) -> usize;
    fn get_requests_sent(&self) -> usize;
    fn get_requests_succeeded(&self) -> usize;
    fn get_requests_failed(&self) -> usize;
    fn get_requests_retried(&self) -> usize;
    fn get_requests_scheduled_for_retry(&self) -> usize;
    fn get_requests_dropped(&self) -> usize;
    /// Retry delay currently in flight; the plain-text report prints it with an `ms` unit.
    fn get_retry_delay_in_flight_ms(&self) -> u64;

    // Response counters (shown on the `response` row).
    fn get_responses_received(&self) -> usize;
    fn get_responses_from_cache(&self) -> usize;
    fn get_total_bytes_downloaded(&self) -> usize;

    // Item counters (shown on the `items` row).
    fn get_items_scraped(&self) -> usize;
    fn get_items_processed(&self) -> usize;
    fn get_items_dropped_by_pipeline(&self) -> usize;

    /// Response counts keyed by `u16` status code; borrowed from the provider.
    fn get_response_status_counts(&self) -> &std::collections::HashMap<u16, usize>;
    /// Elapsed time used as the denominator for the overall per-second rates.
    fn get_elapsed_duration(&self) -> Duration;

    // Request timing statistics (shown on the `req time` row).
    fn get_average_request_time(&self) -> Option<Duration>;
    fn get_fastest_request_time(&self) -> Option<Duration>;
    fn get_slowest_request_time(&self) -> Option<Duration>;
    fn get_request_time_count(&self) -> usize;

    // Parsing timing statistics (shown on the `parsing` row).
    fn get_average_parsing_time(&self) -> Option<Duration>;
    fn get_fastest_parsing_time(&self) -> Option<Duration>;
    fn get_slowest_parsing_time(&self) -> Option<Duration>;
    fn get_parsing_time_count(&self) -> usize;

    // Recent per-second rates. These are part of the interface but are not
    // printed by `format_plain_text_metrics`.
    fn get_recent_requests_per_second(&self) -> f64;
    fn get_recent_responses_per_second(&self) -> f64;
    fn get_recent_items_per_second(&self) -> f64;

    /// Text shown on the `current` row; borrowed from the provider.
    fn get_current_item_preview(&self) -> &str;

    /// Human-readable rendering of the elapsed duration.
    fn formatted_duration(&self) -> String;
    /// Human-readable rendering of an optional duration supplied by the caller.
    ///
    /// The plain-text report uses this for both request and parsing timings.
    fn formatted_request_time(&self, duration: Option<Duration>) -> String;
    /// Human-readable rendering of the total bytes downloaded.
    fn formatted_bytes(&self) -> String;
}
336
337impl MetricsSnapshotProvider for MetricsSnapshot {
338    fn get_requests_enqueued(&self) -> usize {
339        self.requests_enqueued
340    }
341
342    fn get_requests_sent(&self) -> usize {
343        self.requests_sent
344    }
345
346    fn get_requests_succeeded(&self) -> usize {
347        self.requests_succeeded
348    }
349
350    fn get_requests_failed(&self) -> usize {
351        self.requests_failed
352    }
353
354    fn get_requests_retried(&self) -> usize {
355        self.requests_retried
356    }
357
358    fn get_requests_scheduled_for_retry(&self) -> usize {
359        self.requests_scheduled_for_retry
360    }
361
362    fn get_requests_dropped(&self) -> usize {
363        self.requests_dropped
364    }
365
366    fn get_retry_delay_in_flight_ms(&self) -> u64 {
367        self.retry_delay_in_flight_ms
368    }
369
370    fn get_responses_received(&self) -> usize {
371        self.responses_received
372    }
373
374    fn get_responses_from_cache(&self) -> usize {
375        self.responses_from_cache
376    }
377
378    fn get_total_bytes_downloaded(&self) -> usize {
379        self.total_bytes_downloaded
380    }
381
382    fn get_items_scraped(&self) -> usize {
383        self.items_scraped
384    }
385
386    fn get_items_processed(&self) -> usize {
387        self.items_processed
388    }
389
390    fn get_items_dropped_by_pipeline(&self) -> usize {
391        self.items_dropped_by_pipeline
392    }
393
394    fn get_response_status_counts(&self) -> &std::collections::HashMap<u16, usize> {
395        &self.response_status_counts
396    }
397
398    fn get_elapsed_duration(&self) -> Duration {
399        self.elapsed_duration
400    }
401
402    fn get_average_request_time(&self) -> Option<Duration> {
403        self.average_request_time
404    }
405
406    fn get_fastest_request_time(&self) -> Option<Duration> {
407        self.fastest_request_time
408    }
409
410    fn get_slowest_request_time(&self) -> Option<Duration> {
411        self.slowest_request_time
412    }
413
414    fn get_request_time_count(&self) -> usize {
415        self.request_time_count
416    }
417
418    fn get_average_parsing_time(&self) -> Option<Duration> {
419        self.average_parsing_time
420    }
421
422    fn get_fastest_parsing_time(&self) -> Option<Duration> {
423        self.fastest_parsing_time
424    }
425
426    fn get_slowest_parsing_time(&self) -> Option<Duration> {
427        self.slowest_parsing_time
428    }
429
430    fn get_parsing_time_count(&self) -> usize {
431        self.parsing_time_count
432    }
433
434    fn get_recent_requests_per_second(&self) -> f64 {
435        self.recent_requests_per_second
436    }
437
438    fn get_recent_responses_per_second(&self) -> f64 {
439        self.recent_responses_per_second
440    }
441
442    fn get_recent_items_per_second(&self) -> f64 {
443        self.recent_items_per_second
444    }
445
446    fn get_current_item_preview(&self) -> &str {
447        &self.current_item_preview
448    }
449
450    fn formatted_duration(&self) -> String {
451        self.formatted_duration()
452    }
453
454    fn formatted_request_time(&self, duration: Option<Duration>) -> String {
455        self.formatted_request_time(duration)
456    }
457
458    fn formatted_bytes(&self) -> String {
459        self.formatted_bytes()
460    }
461}