// spider_core/stats.rs — crawler statistics collection.
1//! # Statistics Module
2//!
3//! Collects and stores various metrics and statistics about the crawler's operation.
4//!
5//! ## Overview
6//!
7//! The [`StatCollector`] tracks important metrics throughout the crawling process,
8//! including request counts, response statistics, item processing metrics, and
9//! performance indicators. This data is essential for monitoring crawl progress,
10//! diagnosing issues, and optimizing performance.
11//!
12//! ## Key Metrics Tracked
13//!
14//! - **Request Metrics**: Enqueued, sent, succeeded, failed, retried, and dropped requests
15//! - **Response Metrics**: Received, cached, and status code distributions
16//! - **Item Metrics**: Scraped, processed, and dropped items
17//! - **Performance Metrics**: Throughput, response times, and bandwidth usage
18//! - **Timing Metrics**: Elapsed time and processing rates
19//!
20//! ## Features
21//!
22//! - **Thread-Safe**: Uses atomic operations for concurrent metric updates
23//! - **Real-Time Monitoring**: Provides live statistics during crawling
24//! - **Export Formats**: Supports JSON and Markdown export formats
25//! - **Snapshot Capability**: Captures consistent state for reporting
26//!
27//! ## Example
28//!
29//! ```rust,ignore
30//! use spider_core::StatCollector;
31//!
32//! let stats = StatCollector::new();
33//!
34//! // During crawling, metrics are automatically updated
35//! stats.increment_requests_sent();
36//! stats.increment_items_scraped();
37//!
38//! // Export statistics in various formats
39//! println!("{}", stats.to_json_string_pretty().unwrap());
40//! println!("{}", stats.to_markdown_string());
41//! ```
42
43use spider_util::error::SpiderError;
44use spider_util::metrics::ExpMovingAverage;
45use std::{
46    collections::HashMap,
47    sync::{
48        Arc,
49        atomic::{AtomicUsize, Ordering},
50    },
51    time::{Duration, Instant},
52};
53
// A snapshot of the current statistics, used for reporting.
// Captured once per export so every presentation method (`Display`, the
// Markdown exporter) works from one mutually-consistent set of values.
// This avoids code duplication in the various export/display methods.
struct StatsSnapshot {
    // Request lifecycle counters at capture time.
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_dropped: usize,
    // Response counters and total payload size in bytes.
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    // Item pipeline counters.
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    // Responses seen per HTTP status code (e.g. 200, 404, 500).
    response_status_counts: HashMap<u16, usize>,
    // Wall-clock time since the collector was created.
    elapsed_duration: Duration,
    // Request timing aggregates; `None` when no samples were recorded.
    average_request_time: Option<Duration>,
    fastest_request_time: Option<Duration>,
    slowest_request_time: Option<Duration>,
    request_time_count: usize,
    // Parsing timing aggregates; `None` when no samples were recorded.
    average_parsing_time: Option<Duration>,
    fastest_parsing_time: Option<Duration>,
    slowest_parsing_time: Option<Duration>,
    parsing_time_count: usize,

    // Recent rates taken from the exponential-moving-average trackers
    // (`*_ema.get_rate()`) at snapshot time.
    recent_requests_per_second: f64,
    recent_responses_per_second: f64,
    recent_items_per_second: f64,
}
85
86impl StatsSnapshot {
87    fn formatted_duration(&self) -> String {
88        format!("{:?}", self.elapsed_duration)
89    }
90
91    fn formatted_request_time(&self, duration: Option<Duration>) -> String {
92        match duration {
93            Some(d) => {
94                if d.as_millis() < 1000 {
95                    format!("{} ms", d.as_millis())
96                } else {
97                    format!("{:.2} s", d.as_secs_f64())
98                }
99            }
100            None => "N/A".to_string(),
101        }
102    }
103
104    fn requests_per_second(&self) -> f64 {
105        let elapsed = self.elapsed_duration.as_secs_f64();
106        if elapsed > 0.0 {
107            self.requests_sent as f64 / elapsed
108        } else {
109            0.0
110        }
111    }
112
113    fn responses_per_second(&self) -> f64 {
114        let elapsed = self.elapsed_duration.as_secs_f64();
115        if elapsed > 0.0 {
116            self.responses_received as f64 / elapsed
117        } else {
118            0.0
119        }
120    }
121
122    fn items_per_second(&self) -> f64 {
123        let elapsed = self.elapsed_duration.as_secs_f64();
124        if elapsed > 0.0 {
125            self.items_scraped as f64 / elapsed
126        } else {
127            0.0
128        }
129    }
130
131    fn formatted_bytes(&self) -> String {
132        const KB: usize = 1024;
133        const MB: usize = 1024 * KB;
134        const GB: usize = 1024 * MB;
135
136        if self.total_bytes_downloaded >= GB {
137            format!("{:.2} GB", self.total_bytes_downloaded as f64 / GB as f64)
138        } else if self.total_bytes_downloaded >= MB {
139            format!("{:.2} MB", self.total_bytes_downloaded as f64 / MB as f64)
140        } else if self.total_bytes_downloaded >= KB {
141            format!("{:.2} KB", self.total_bytes_downloaded as f64 / KB as f64)
142        } else {
143            format!("{} B", self.total_bytes_downloaded)
144        }
145    }
146}
147
/// Collects and stores various statistics about the crawler's operation.
///
/// All counters are atomics and the maps are concurrent (`DashMap`), so the
/// collector can be updated from many tasks without external locking.
/// The `Serialize` derive serializes the live fields directly; fields with
/// no meaningful serialized form are `#[serde(skip)]`ped.
#[derive(Debug, serde::Serialize)]
pub struct StatCollector {
    // Crawl-related metrics
    /// Moment the collector was created; the basis for all elapsed-time
    /// rate calculations. Skipped by serde (`Instant` is not serializable).
    #[serde(skip)]
    pub start_time: Instant,

    // Request-related metrics
    pub requests_enqueued: AtomicUsize,
    pub requests_sent: AtomicUsize,
    pub requests_succeeded: AtomicUsize,
    pub requests_failed: AtomicUsize,
    pub requests_retried: AtomicUsize,
    pub requests_dropped: AtomicUsize,

    // Response-related metrics
    pub responses_received: AtomicUsize,
    pub responses_from_cache: AtomicUsize,
    pub response_status_counts: Arc<dashmap::DashMap<u16, usize>>, // e.g., 200, 404, 500
    pub total_bytes_downloaded: AtomicUsize,

    // Add more advanced response time metrics if needed (e.g., histograms)

    // Item-related metrics
    pub items_scraped: AtomicUsize,
    pub items_processed: AtomicUsize,
    pub items_dropped_by_pipeline: AtomicUsize,

    // Timing metrics
    /// Per-URL request durations, keyed by URL string.
    pub request_times: Arc<dashmap::DashMap<String, Duration>>,
    /// Parsing durations keyed by a generated `parse_<nanos>` id.
    pub parsing_times: Arc<dashmap::DashMap<String, Duration>>,

    // Exponential moving average metrics for accurate speed calculations.
    // These back the "recent rate" figures in snapshots and reports.
    #[serde(skip)]
    requests_sent_ema: ExpMovingAverage,
    #[serde(skip)]
    responses_received_ema: ExpMovingAverage,
    #[serde(skip)]
    items_scraped_ema: ExpMovingAverage,
}
188
189impl StatCollector {
190    /// Creates a new `StatCollector` with all counters initialized to zero.
191    pub(crate) fn new() -> Self {
192        StatCollector {
193            start_time: Instant::now(),
194            requests_enqueued: AtomicUsize::new(0),
195            requests_sent: AtomicUsize::new(0),
196            requests_succeeded: AtomicUsize::new(0),
197            requests_failed: AtomicUsize::new(0),
198            requests_retried: AtomicUsize::new(0),
199            requests_dropped: AtomicUsize::new(0),
200            responses_received: AtomicUsize::new(0),
201            responses_from_cache: AtomicUsize::new(0),
202            response_status_counts: Arc::new(dashmap::DashMap::new()),
203            total_bytes_downloaded: AtomicUsize::new(0),
204            items_scraped: AtomicUsize::new(0),
205            items_processed: AtomicUsize::new(0),
206            items_dropped_by_pipeline: AtomicUsize::new(0),
207            request_times: Arc::new(dashmap::DashMap::new()),
208            parsing_times: Arc::new(dashmap::DashMap::new()),
209            // Initialize exponential moving averages for recent speed calculations (alpha = 0.2 for good balance)
210            requests_sent_ema: ExpMovingAverage::new(0.2),
211            responses_received_ema: ExpMovingAverage::new(0.2),
212            items_scraped_ema: ExpMovingAverage::new(0.2),
213        }
214    }
215
216    /// Creates a snapshot of the current statistics.
217    /// This is the single source of truth for all presentation logic.
218    fn snapshot(&self) -> StatsSnapshot {
219        let mut status_counts: HashMap<u16, usize> = HashMap::new();
220        for entry in self.response_status_counts.iter() {
221            let (key, value) = entry.pair();
222            status_counts.insert(*key, *value);
223        }
224
225        // Get recent rates from exponential moving averages
226        let recent_requests_per_second = self.requests_sent_ema.get_rate();
227        let recent_responses_per_second = self.responses_received_ema.get_rate();
228        let recent_items_per_second = self.items_scraped_ema.get_rate();
229
230        StatsSnapshot {
231            requests_enqueued: self.requests_enqueued.load(Ordering::SeqCst),
232            requests_sent: self.requests_sent.load(Ordering::SeqCst),
233            requests_succeeded: self.requests_succeeded.load(Ordering::SeqCst),
234            requests_failed: self.requests_failed.load(Ordering::SeqCst),
235            requests_retried: self.requests_retried.load(Ordering::SeqCst),
236            requests_dropped: self.requests_dropped.load(Ordering::SeqCst),
237            responses_received: self.responses_received.load(Ordering::SeqCst),
238            responses_from_cache: self.responses_from_cache.load(Ordering::SeqCst),
239            total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::SeqCst),
240            items_scraped: self.items_scraped.load(Ordering::SeqCst),
241            items_processed: self.items_processed.load(Ordering::SeqCst),
242            items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::SeqCst),
243            response_status_counts: status_counts,
244            elapsed_duration: self.start_time.elapsed(),
245            average_request_time: self.average_request_time(),
246            fastest_request_time: self.fastest_request_time(),
247            slowest_request_time: self.slowest_request_time(),
248            request_time_count: self.request_time_count(),
249            average_parsing_time: self.average_parsing_time(),
250            fastest_parsing_time: self.fastest_parsing_time(),
251            slowest_parsing_time: self.slowest_parsing_time(),
252            parsing_time_count: self.parsing_time_count(),
253
254            // Recent rates from sliding windows
255            recent_requests_per_second,
256            recent_responses_per_second,
257            recent_items_per_second,
258        }
259    }
260
    /// Increments the count of enqueued requests.
    pub(crate) fn increment_requests_enqueued(&self) {
        self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
    }

    /// Increments the count of sent requests.
    ///
    /// Also feeds the sent-requests EMA so the recent-rate estimate
    /// reflects this event.
    pub(crate) fn increment_requests_sent(&self) {
        self.requests_sent.fetch_add(1, Ordering::SeqCst);
        // Update the EMA with a count of 1 for this event
        self.requests_sent_ema.update(1);
    }

    /// Increments the count of successful requests.
    pub(crate) fn increment_requests_succeeded(&self) {
        self.requests_succeeded.fetch_add(1, Ordering::SeqCst);
    }

    /// Increments the count of failed requests.
    pub(crate) fn increment_requests_failed(&self) {
        self.requests_failed.fetch_add(1, Ordering::SeqCst);
    }

    /// Increments the count of retried requests.
    pub(crate) fn increment_requests_retried(&self) {
        self.requests_retried.fetch_add(1, Ordering::SeqCst);
    }

    /// Increments the count of dropped requests.
    pub(crate) fn increment_requests_dropped(&self) {
        self.requests_dropped.fetch_add(1, Ordering::SeqCst);
    }

    /// Increments the count of received responses.
    ///
    /// Also feeds the received-responses EMA so the recent-rate estimate
    /// reflects this event.
    pub(crate) fn increment_responses_received(&self) {
        self.responses_received.fetch_add(1, Ordering::SeqCst);
        // Update the EMA with a count of 1 for this event
        self.responses_received_ema.update(1);
    }

    /// Increments the count of responses served from cache.
    pub(crate) fn increment_responses_from_cache(&self) {
        self.responses_from_cache.fetch_add(1, Ordering::SeqCst);
    }

    /// Records a response status code, bumping its tally in the
    /// concurrent map (the entry guard holds a shard lock briefly).
    pub(crate) fn record_response_status(&self, status_code: u16) {
        *self.response_status_counts.entry(status_code).or_insert(0) += 1;
    }

    /// Adds `bytes` to the total bytes downloaded.
    pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
        self.total_bytes_downloaded
            .fetch_add(bytes, Ordering::SeqCst);
    }

    /// Increments the count of scraped items.
    ///
    /// Also feeds the scraped-items EMA so the recent-rate estimate
    /// reflects this event.
    pub(crate) fn increment_items_scraped(&self) {
        self.items_scraped.fetch_add(1, Ordering::SeqCst);
        // Update the EMA with a count of 1 for this event
        self.items_scraped_ema.update(1);
    }

    /// Increments the count of processed items.
    pub(crate) fn increment_items_processed(&self) {
        self.items_processed.fetch_add(1, Ordering::SeqCst);
    }

    /// Increments the count of items dropped by pipelines.
    pub(crate) fn increment_items_dropped_by_pipeline(&self) {
        self.items_dropped_by_pipeline
            .fetch_add(1, Ordering::SeqCst);
    }
333
334    /// Records the time taken for a request.
335    pub fn record_request_time(&self, url: &str, duration: Duration) {
336        self.request_times.insert(url.to_string(), duration);
337    }
338
339    /// Calculates the average request time across all recorded requests.
340    pub fn average_request_time(&self) -> Option<Duration> {
341        let times: Vec<Duration> = self
342            .request_times
343            .iter()
344            .map(|entry| *entry.value())
345            .collect();
346        if times.is_empty() {
347            None
348        } else {
349            let total_nanos: u128 = times.iter().map(|d| d.as_nanos()).sum();
350            let avg_nanos = total_nanos / times.len() as u128;
351            Some(Duration::from_nanos(avg_nanos as u64))
352        }
353    }
354
355    /// Gets the fastest request time among all recorded requests.
356    pub fn fastest_request_time(&self) -> Option<Duration> {
357        self.request_times.iter().map(|entry| *entry.value()).min()
358    }
359
360    /// Gets the slowest request time among all recorded requests.
361    pub fn slowest_request_time(&self) -> Option<Duration> {
362        self.request_times.iter().map(|entry| *entry.value()).max()
363    }
364
365    /// Gets the total number of recorded request times.
366    pub fn request_time_count(&self) -> usize {
367        self.request_times.len()
368    }
369
370    /// Gets the request time for a specific URL.
371    pub fn get_request_time(&self, url: &str) -> Option<Duration> {
372        self.request_times
373            .get(url)
374            .map(|duration| *duration.value())
375    }
376
377    /// Gets all recorded request times as a vector of (URL, Duration) pairs.
378    pub fn get_all_request_times(&self) -> Vec<(String, Duration)> {
379        self.request_times
380            .iter()
381            .map(|entry| (entry.key().clone(), *entry.value()))
382            .collect()
383    }
384
385    /// Records the time taken for parsing a response.
386    pub fn record_parsing_time(&self, duration: Duration) {
387        let id = format!(
388            "parse_{}",
389            std::time::SystemTime::now()
390                .duration_since(std::time::UNIX_EPOCH)
391                .unwrap()
392                .as_nanos()
393        );
394        self.parsing_times.insert(id, duration);
395    }
396
397    /// Calculates the average parsing time across all recorded parses.
398    pub fn average_parsing_time(&self) -> Option<Duration> {
399        let times: Vec<Duration> = self
400            .parsing_times
401            .iter()
402            .map(|entry| *entry.value())
403            .collect();
404        if times.is_empty() {
405            None
406        } else {
407            let total_nanos: u128 = times.iter().map(|d| d.as_nanos()).sum();
408            let avg_nanos = total_nanos / times.len() as u128;
409            Some(Duration::from_nanos(avg_nanos as u64))
410        }
411    }
412
413    /// Gets the fastest parsing time among all recorded parses.
414    pub fn fastest_parsing_time(&self) -> Option<Duration> {
415        self.parsing_times.iter().map(|entry| *entry.value()).min()
416    }
417
418    /// Gets the slowest parsing time among all recorded parses.
419    pub fn slowest_parsing_time(&self) -> Option<Duration> {
420        self.parsing_times.iter().map(|entry| *entry.value()).max()
421    }
422
423    /// Gets the total number of recorded parsing times.
424    pub fn parsing_time_count(&self) -> usize {
425        self.parsing_times.len()
426    }
427
428    /// Clears all recorded request times.
429    pub fn clear_request_times(&self) {
430        self.request_times.clear();
431    }
432
433    /// Converts the snapshot into a JSON string.
434    pub fn to_json_string(&self) -> Result<String, SpiderError> {
435        Ok(serde_json::to_string(self)?)
436    }
437
438    /// Converts the snapshot into a pretty-printed JSON string.
439    pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
440        Ok(serde_json::to_string_pretty(self)?)
441    }
442
443    /// Exports the current statistics to a Markdown formatted string.
444    pub fn to_markdown_string(&self) -> String {
445        let snapshot = self.snapshot();
446
447        let status_codes_list: String = snapshot
448            .response_status_counts
449            .iter()
450            .map(|(code, count)| format!("- **{}**: {}", code, count))
451            .collect::<Vec<String>>()
452            .join("\n");
453        let status_codes_output = if status_codes_list.is_empty() {
454            "N/A".to_string()
455        } else {
456            status_codes_list
457        };
458
459        format!(
460            r#"# Crawl Statistics Report
461
462- **Duration**: {}
463- **Current Rate** (last 10s): {:.2} req/s, {:.2} resp/s, {:.2} item/s
464- **Overall Rate** (total): {:.2} req/s, {:.2} resp/s, {:.2} item/s
465
466## Requests
467| Metric     | Count |
468|------------|-------|
469| Enqueued   | {}     |
470| Sent       | {}     |
471| Succeeded  | {}     |
472| Failed     | {}     |
473| Retried    | {}     |
474| Dropped    | {}     |
475
476## Responses
477| Metric     | Count |
478|------------|-------|
479| Received   | {}     |
480 From Cache | {}     |
481| Downloaded | {}     |
482
483## Items
484| Metric     | Count |
485|------------|--------|
486| Scraped    | {}     |
487| Processed  | {}     |
488| Dropped    | {}     |
489
490## Request Times
491| Metric           | Value      |
492|------------------|------------|
493| Average Time     | {}         |
494| Fastest Request  | {}         |
495| Slowest Request  | {}         |
496| Total Recorded   | {}         |
497
498## Parsing Times
499| Metric           | Value      |
500|------------------|------------|
501| Average Time     | {}         |
502| Fastest Parse    | {}         |
503| Slowest Parse    | {}         |
504| Total Recorded   | {}         |
505
506## Status Codes
507{}
508"#,
509            snapshot.formatted_duration(),
510            snapshot.requests_per_second(),
511            snapshot.responses_per_second(),
512            snapshot.items_per_second(),
513            // Calculate cumulative speeds for comparison
514            {
515                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
516                if total_seconds > 0.0 {
517                    snapshot.requests_sent as f64 / total_seconds
518                } else {
519                    0.0
520                }
521            },
522            {
523                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
524                if total_seconds > 0.0 {
525                    snapshot.responses_received as f64 / total_seconds
526                } else {
527                    0.0
528                }
529            },
530            {
531                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
532                if total_seconds > 0.0 {
533                    snapshot.items_scraped as f64 / total_seconds
534                } else {
535                    0.0
536                }
537            },
538            snapshot.requests_enqueued,
539            snapshot.requests_sent,
540            snapshot.requests_succeeded,
541            snapshot.requests_failed,
542            snapshot.requests_retried,
543            snapshot.requests_dropped,
544            snapshot.responses_received,
545            snapshot.responses_from_cache,
546            snapshot.formatted_bytes(),
547            snapshot.items_scraped,
548            snapshot.items_processed,
549            snapshot.items_dropped_by_pipeline,
550            snapshot.formatted_request_time(snapshot.average_request_time),
551            snapshot.formatted_request_time(snapshot.fastest_request_time),
552            snapshot.formatted_request_time(snapshot.slowest_request_time),
553            snapshot.request_time_count,
554            snapshot.formatted_request_time(snapshot.average_parsing_time),
555            snapshot.formatted_request_time(snapshot.fastest_parsing_time),
556            snapshot.formatted_request_time(snapshot.slowest_parsing_time),
557            snapshot.parsing_time_count,
558            status_codes_output
559        )
560    }
561}
562
563impl Default for StatCollector {
564    fn default() -> Self {
565        Self::new()
566    }
567}
568
impl std::fmt::Display for StatCollector {
    // Renders a multi-line, human-readable summary of the crawl.
    // All values come from a single `snapshot()` so the printed lines are
    // mutually consistent even while the crawler keeps updating counters.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let snapshot = self.snapshot();

        writeln!(f, "\nCrawl Statistics")?;
        writeln!(f, "----------------")?;
        writeln!(f, "duration : {}", snapshot.formatted_duration())?;
        // "speed" shows the recent EMA-based rates, not cumulative averages.
        writeln!(
            f,
            "speed    : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}",
            snapshot.recent_requests_per_second,
            snapshot.recent_responses_per_second,
            snapshot.recent_items_per_second
        )?;
        writeln!(
            f,
            "requests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}",
            snapshot.requests_enqueued,
            snapshot.requests_sent,
            snapshot.requests_succeeded,
            snapshot.requests_failed,
            snapshot.requests_retried,
            snapshot.requests_dropped
        )?;
        writeln!(
            f,
            "response : received: {}, from_cache: {}, downloaded: {}",
            snapshot.responses_received,
            snapshot.responses_from_cache,
            snapshot.formatted_bytes()
        )?;
        writeln!(
            f,
            "items    : scraped: {}, processed: {}, dropped: {}",
            snapshot.items_scraped, snapshot.items_processed, snapshot.items_dropped_by_pipeline
        )?;
        writeln!(
            f,
            "req time : avg: {}, fastest: {}, slowest: {}, total: {}",
            snapshot.formatted_request_time(snapshot.average_request_time),
            snapshot.formatted_request_time(snapshot.fastest_request_time),
            snapshot.formatted_request_time(snapshot.slowest_request_time),
            snapshot.request_time_count
        )?;
        // `formatted_request_time` doubles as the parsing-duration formatter.
        writeln!(
            f,
            "parsing  : avg: {}, fastest: {}, slowest: {}, total: {}",
            snapshot.formatted_request_time(snapshot.average_parsing_time),
            snapshot.formatted_request_time(snapshot.fastest_parsing_time),
            snapshot.formatted_request_time(snapshot.slowest_parsing_time),
            snapshot.parsing_time_count
        )?;

        // NOTE(review): HashMap iteration order is unspecified, so the order
        // of codes on this line can differ between calls.
        let status_string = if snapshot.response_status_counts.is_empty() {
            "none".to_string()
        } else {
            snapshot
                .response_status_counts
                .iter()
                .map(|(code, count)| format!("{}: {}", code, count))
                .collect::<Vec<String>>()
                .join(", ")
        };

        writeln!(f, "status   : {}\n", status_string)
    }
}