Skip to main content

spider_core/
stats.rs

1//! # Statistics Module
2//!
3//! Collects and stores various metrics and statistics about the crawler's operation.
4//!
5//! ## Overview
6//!
7//! The `StatCollector` tracks important metrics throughout the crawling process,
8//! including request counts, response statistics, item processing metrics, and
9//! performance indicators. This data is essential for monitoring crawl progress,
10//! diagnosing issues, and optimizing performance.
11//!
12//! ## Key Metrics Tracked
13//!
14//! - **Request Metrics**: Enqueued, sent, succeeded, failed, retried, and dropped requests
15//! - **Response Metrics**: Received, cached, and status code distributions
16//! - **Item Metrics**: Scraped, processed, and dropped items
17//! - **Performance Metrics**: Throughput, response times, and bandwidth usage
18//! - **Timing Metrics**: Elapsed time and processing rates
19//!
20//! ## Features
21//!
22//! - **Thread-Safe**: Uses atomic operations for concurrent metric updates
23//! - **Real-Time Monitoring**: Provides live statistics during crawling
24//! - **Export Formats**: Supports JSON and Markdown export formats
25//! - **Snapshot Capability**: Captures consistent state for reporting
26//!
27//! ## Example
28//!
29//! ```rust,ignore
30//! use spider_core::StatCollector;
31//!
32//! let stats = StatCollector::new();
33//!
34//! // During crawling, metrics are automatically updated
35//! stats.increment_requests_sent();
36//! stats.increment_items_scraped();
37//!
38//! // Export statistics in various formats
39//! println!("{}", stats.to_json_string_pretty().unwrap());
40//! println!("{}", stats.to_markdown_string());
41//! ```
42
43use spider_util::error::SpiderError;
44use spider_util::metrics::ExpMovingAverage;
45use std::{
46    collections::HashMap,
47    sync::{
48        Arc,
49        atomic::{AtomicUsize, Ordering},
50    },
51    time::{Duration, Instant},
52};
53
/// A point-in-time snapshot of the collector's statistics, used for reporting.
///
/// All atomic counters are loaded once when the snapshot is built, so every
/// export/display method reports a mutually consistent set of numbers. This
/// avoids code duplication in the various export/display methods.
struct StatsSnapshot {
    // Request lifecycle counters (see StatCollector for how they are fed).
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_dropped: usize,
    // Response counters and bandwidth.
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    // Item pipeline counters.
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    // HTTP status code -> occurrence count (plain HashMap copy of the
    // concurrent map, frozen at snapshot time).
    response_status_counts: HashMap<u16, usize>,
    // Wall-clock time since the crawl started.
    elapsed_duration: Duration,
    // Request timing aggregates; None when no samples were recorded.
    average_request_time: Option<Duration>,
    fastest_request_time: Option<Duration>,
    slowest_request_time: Option<Duration>,
    request_time_count: usize,
    // Parsing timing aggregates; None when no samples were recorded.
    average_parsing_time: Option<Duration>,
    fastest_parsing_time: Option<Duration>,
    slowest_parsing_time: Option<Duration>,
    parsing_time_count: usize,

    // Recent rates from sliding windows (exponential moving averages).
    recent_requests_per_second: f64,
    recent_responses_per_second: f64,
    recent_items_per_second: f64,
}
85
86impl StatsSnapshot {
87    fn formatted_duration(&self) -> String {
88        format!("{:?}", self.elapsed_duration)
89    }
90
91    fn formatted_request_time(&self, duration: Option<Duration>) -> String {
92        match duration {
93            Some(d) => {
94                if d.as_millis() < 1000 {
95                    format!("{} ms", d.as_millis())
96                } else {
97                    format!("{:.2} s", d.as_secs_f64())
98                }
99            }
100            None => "N/A".to_string(),
101        }
102    }
103
104    fn requests_per_second(&self) -> f64 {
105        let elapsed = self.elapsed_duration.as_secs_f64();
106        if elapsed > 0.0 {
107            self.requests_sent as f64 / elapsed
108        } else {
109            0.0
110        }
111    }
112
113    fn responses_per_second(&self) -> f64 {
114        let elapsed = self.elapsed_duration.as_secs_f64();
115        if elapsed > 0.0 {
116            self.responses_received as f64 / elapsed
117        } else {
118            0.0
119        }
120    }
121
122    fn items_per_second(&self) -> f64 {
123        let elapsed = self.elapsed_duration.as_secs_f64();
124        if elapsed > 0.0 {
125            self.items_scraped as f64 / elapsed
126        } else {
127            0.0
128        }
129    }
130
131    fn formatted_bytes(&self) -> String {
132        const KB: usize = 1024;
133        const MB: usize = 1024 * KB;
134        const GB: usize = 1024 * MB;
135
136        if self.total_bytes_downloaded >= GB {
137            format!("{:.2} GB", self.total_bytes_downloaded as f64 / GB as f64)
138        } else if self.total_bytes_downloaded >= MB {
139            format!("{:.2} MB", self.total_bytes_downloaded as f64 / MB as f64)
140        } else if self.total_bytes_downloaded >= KB {
141            format!("{:.2} KB", self.total_bytes_downloaded as f64 / KB as f64)
142        } else {
143            format!("{} B", self.total_bytes_downloaded)
144        }
145    }
146}
147
/// Collects and stores various statistics about the crawler's operation.
///
/// Every counter is an atomic (or a concurrent `DashMap`), so a single
/// `StatCollector` can be shared across tasks and updated without locking.
#[derive(Debug, serde::Serialize)]
pub struct StatCollector {
    // Crawl-related metrics
    /// Wall-clock start of the crawl; basis for elapsed time and overall
    /// rates. Skipped by serde because `Instant` has no serializable form.
    #[serde(skip)]
    pub start_time: Instant,

    // Request-related metrics
    /// Requests pushed into the scheduler queue.
    pub requests_enqueued: AtomicUsize,
    /// Requests actually dispatched to the downloader.
    pub requests_sent: AtomicUsize,
    /// Requests that completed successfully.
    pub requests_succeeded: AtomicUsize,
    /// Requests that ultimately failed.
    pub requests_failed: AtomicUsize,
    /// Requests that were retried after a failure.
    pub requests_retried: AtomicUsize,
    /// Requests dropped before being sent (e.g. filtered).
    pub requests_dropped: AtomicUsize,

    // Response-related metrics
    /// Responses received from the network or cache.
    pub responses_received: AtomicUsize,
    /// Responses served from the local cache rather than the network.
    pub responses_from_cache: AtomicUsize,
    pub response_status_counts: Arc<dashmap::DashMap<u16, usize>>, // e.g., 200, 404, 500
    /// Cumulative size of all downloaded response bodies, in bytes.
    pub total_bytes_downloaded: AtomicUsize,

    // Add more advanced response time metrics if needed (e.g., histograms)

    // Item-related metrics
    /// Items extracted by spiders.
    pub items_scraped: AtomicUsize,
    /// Items that made it through the pipeline.
    pub items_processed: AtomicUsize,
    /// Items discarded by a pipeline stage.
    pub items_dropped_by_pipeline: AtomicUsize,

    // Timing metrics
    /// Per-URL request durations (URL -> duration; re-requests overwrite).
    pub request_times: Arc<dashmap::DashMap<String, Duration>>,
    /// Parsing durations keyed by a synthetic id (one entry per parse).
    pub parsing_times: Arc<dashmap::DashMap<String, Duration>>,

    // Exponential moving average metrics for accurate speed calculations.
    // Skipped by serde: internal smoothing state, not a reportable metric.
    #[serde(skip)]
    requests_sent_ema: ExpMovingAverage,
    #[serde(skip)]
    responses_received_ema: ExpMovingAverage,
    #[serde(skip)]
    items_scraped_ema: ExpMovingAverage,
}
188
189impl StatCollector {
190    /// Creates a new `StatCollector` with all counters initialized to zero.
191    pub(crate) fn new() -> Self {
192        StatCollector {
193            start_time: Instant::now(),
194            requests_enqueued: AtomicUsize::new(0),
195            requests_sent: AtomicUsize::new(0),
196            requests_succeeded: AtomicUsize::new(0),
197            requests_failed: AtomicUsize::new(0),
198            requests_retried: AtomicUsize::new(0),
199            requests_dropped: AtomicUsize::new(0),
200            responses_received: AtomicUsize::new(0),
201            responses_from_cache: AtomicUsize::new(0),
202            response_status_counts: Arc::new(dashmap::DashMap::new()),
203            total_bytes_downloaded: AtomicUsize::new(0),
204            items_scraped: AtomicUsize::new(0),
205            items_processed: AtomicUsize::new(0),
206            items_dropped_by_pipeline: AtomicUsize::new(0),
207            request_times: Arc::new(dashmap::DashMap::new()),
208            parsing_times: Arc::new(dashmap::DashMap::new()),
209            // Initialize exponential moving averages for recent speed calculations (alpha = 0.2 for good balance)
210            requests_sent_ema: ExpMovingAverage::new(0.2),
211            responses_received_ema: ExpMovingAverage::new(0.2),
212            items_scraped_ema: ExpMovingAverage::new(0.2),
213        }
214    }
215
216    /// Creates a snapshot of the current statistics.
217    /// This is the single source of truth for all presentation logic.
218    fn snapshot(&self) -> StatsSnapshot {
219        let mut status_counts: HashMap<u16, usize> = HashMap::new();
220        for entry in self.response_status_counts.iter() {
221            let (key, value) = entry.pair();
222            status_counts.insert(*key, *value);
223        }
224
225        // Get recent rates from exponential moving averages
226        let recent_requests_per_second = self.requests_sent_ema.get_rate();
227        let recent_responses_per_second = self.responses_received_ema.get_rate();
228        let recent_items_per_second = self.items_scraped_ema.get_rate();
229
230        StatsSnapshot {
231            requests_enqueued: self.requests_enqueued.load(Ordering::SeqCst),
232            requests_sent: self.requests_sent.load(Ordering::SeqCst),
233            requests_succeeded: self.requests_succeeded.load(Ordering::SeqCst),
234            requests_failed: self.requests_failed.load(Ordering::SeqCst),
235            requests_retried: self.requests_retried.load(Ordering::SeqCst),
236            requests_dropped: self.requests_dropped.load(Ordering::SeqCst),
237            responses_received: self.responses_received.load(Ordering::SeqCst),
238            responses_from_cache: self.responses_from_cache.load(Ordering::SeqCst),
239            total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::SeqCst),
240            items_scraped: self.items_scraped.load(Ordering::SeqCst),
241            items_processed: self.items_processed.load(Ordering::SeqCst),
242            items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::SeqCst),
243            response_status_counts: status_counts,
244            elapsed_duration: self.start_time.elapsed(),
245            average_request_time: self.average_request_time(),
246            fastest_request_time: self.fastest_request_time(),
247            slowest_request_time: self.slowest_request_time(),
248            request_time_count: self.request_time_count(),
249            average_parsing_time: self.average_parsing_time(),
250            fastest_parsing_time: self.fastest_parsing_time(),
251            slowest_parsing_time: self.slowest_parsing_time(),
252            parsing_time_count: self.parsing_time_count(),
253
254            // Recent rates from sliding windows
255            recent_requests_per_second,
256            recent_responses_per_second,
257            recent_items_per_second,
258        }
259    }
260
261    /// Increments the count of enqueued requests.
262    pub(crate) fn increment_requests_enqueued(&self) {
263        self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
264    }
265
266    /// Increments the count of sent requests.
267    pub(crate) fn increment_requests_sent(&self) {
268        self.requests_sent.fetch_add(1, Ordering::SeqCst);
269        // Update the EMA with a count of 1 for this event
270        self.requests_sent_ema.update(1);
271    }
272
273    /// Increments the count of successful requests.
274    pub(crate) fn increment_requests_succeeded(&self) {
275        self.requests_succeeded.fetch_add(1, Ordering::SeqCst);
276    }
277
278    /// Increments the count of failed requests.
279    pub(crate) fn increment_requests_failed(&self) {
280        self.requests_failed.fetch_add(1, Ordering::SeqCst);
281    }
282
283    /// Increments the count of retried requests.
284    pub(crate) fn increment_requests_retried(&self) {
285        self.requests_retried.fetch_add(1, Ordering::SeqCst);
286    }
287
288    /// Increments the count of dropped requests.
289    pub(crate) fn increment_requests_dropped(&self) {
290        self.requests_dropped.fetch_add(1, Ordering::SeqCst);
291    }
292
293    /// Increments the count of received responses.
294    pub(crate) fn increment_responses_received(&self) {
295        self.responses_received.fetch_add(1, Ordering::SeqCst);
296        // Update the EMA with a count of 1 for this event
297        self.responses_received_ema.update(1);
298    }
299
300    /// Increments the count of responses served from cache.
301    pub(crate) fn increment_responses_from_cache(&self) {
302        self.responses_from_cache.fetch_add(1, Ordering::SeqCst);
303    }
304
305    /// Records a response status code.
306    pub(crate) fn record_response_status(&self, status_code: u16) {
307        *self.response_status_counts.entry(status_code).or_insert(0) += 1;
308    }
309
310    /// Adds to the total bytes downloaded.
311    pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
312        self.total_bytes_downloaded
313            .fetch_add(bytes, Ordering::SeqCst);
314    }
315
316    /// Increments the count of scraped items.
317    pub(crate) fn increment_items_scraped(&self) {
318        self.items_scraped.fetch_add(1, Ordering::SeqCst);
319        // Update the EMA with a count of 1 for this event
320        self.items_scraped_ema.update(1);
321    }
322
323    /// Increments the count of processed items.
324    pub(crate) fn increment_items_processed(&self) {
325        self.items_processed.fetch_add(1, Ordering::SeqCst);
326    }
327
328    /// Increments the count of items dropped by pipelines.
329    pub(crate) fn increment_items_dropped_by_pipeline(&self) {
330        self.items_dropped_by_pipeline
331            .fetch_add(1, Ordering::SeqCst);
332    }
333
334    /// Records the time taken for a request.
335    pub fn record_request_time(&self, url: &str, duration: Duration) {
336        self.request_times.insert(url.to_string(), duration);
337    }
338
339    /// Calculates the average request time across all recorded requests.
340    pub fn average_request_time(&self) -> Option<Duration> {
341        let times: Vec<Duration> = self
342            .request_times
343            .iter()
344            .map(|entry| *entry.value())
345            .collect();
346        if times.is_empty() {
347            None
348        } else {
349            let total_nanos: u128 = times.iter().map(|d| d.as_nanos()).sum();
350            let avg_nanos = total_nanos / times.len() as u128;
351            Some(Duration::from_nanos(avg_nanos as u64))
352        }
353    }
354
355    /// Gets the fastest request time among all recorded requests.
356    pub fn fastest_request_time(&self) -> Option<Duration> {
357        self.request_times.iter().map(|entry| *entry.value()).min()
358    }
359
360    /// Gets the slowest request time among all recorded requests.
361    pub fn slowest_request_time(&self) -> Option<Duration> {
362        self.request_times.iter().map(|entry| *entry.value()).max()
363    }
364
365    /// Gets the total number of recorded request times.
366    pub fn request_time_count(&self) -> usize {
367        self.request_times.len()
368    }
369
370    /// Gets the request time for a specific URL.
371    pub fn get_request_time(&self, url: &str) -> Option<Duration> {
372        self.request_times
373            .get(url)
374            .map(|duration| *duration.value())
375    }
376
377    /// Gets all recorded request times as a vector of (URL, Duration) pairs.
378    pub fn get_all_request_times(&self) -> Vec<(String, Duration)> {
379        self.request_times
380            .iter()
381            .map(|entry| (entry.key().clone(), *entry.value()))
382            .collect()
383    }
384
385    /// Records the time taken for parsing a response.
386    pub fn record_parsing_time(&self, duration: Duration) {
387        let id = format!("parse_{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos());
388        self.parsing_times.insert(id, duration);
389    }
390
391    /// Calculates the average parsing time across all recorded parses.
392    pub fn average_parsing_time(&self) -> Option<Duration> {
393        let times: Vec<Duration> = self
394            .parsing_times
395            .iter()
396            .map(|entry| *entry.value())
397            .collect();
398        if times.is_empty() {
399            None
400        } else {
401            let total_nanos: u128 = times.iter().map(|d| d.as_nanos()).sum();
402            let avg_nanos = total_nanos / times.len() as u128;
403            Some(Duration::from_nanos(avg_nanos as u64))
404        }
405    }
406
407    /// Gets the fastest parsing time among all recorded parses.
408    pub fn fastest_parsing_time(&self) -> Option<Duration> {
409        self.parsing_times.iter().map(|entry| *entry.value()).min()
410    }
411
412    /// Gets the slowest parsing time among all recorded parses.
413    pub fn slowest_parsing_time(&self) -> Option<Duration> {
414        self.parsing_times.iter().map(|entry| *entry.value()).max()
415    }
416
417    /// Gets the total number of recorded parsing times.
418    pub fn parsing_time_count(&self) -> usize {
419        self.parsing_times.len()
420    }
421
422    /// Clears all recorded request times.
423    pub fn clear_request_times(&self) {
424        self.request_times.clear();
425    }
426
427    /// Converts the snapshot into a JSON string.
428    pub fn to_json_string(&self) -> Result<String, SpiderError> {
429        Ok(serde_json::to_string(self)?)
430    }
431
432    /// Converts the snapshot into a pretty-printed JSON string.
433    pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
434        Ok(serde_json::to_string_pretty(self)?)
435    }
436
437    /// Exports the current statistics to a Markdown formatted string.
438    pub fn to_markdown_string(&self) -> String {
439        let snapshot = self.snapshot();
440
441        let status_codes_list: String = snapshot
442            .response_status_counts
443            .iter()
444            .map(|(code, count)| format!("- **{}**: {}", code, count))
445            .collect::<Vec<String>>()
446            .join("\n");
447        let status_codes_output = if status_codes_list.is_empty() {
448            "N/A".to_string()
449        } else {
450            status_codes_list
451        };
452
453        format!(
454            r#"# Crawl Statistics Report
455
456- **Duration**: {}
457- **Current Rate** (last 10s): {:.2} req/s, {:.2} resp/s, {:.2} item/s
458- **Overall Rate** (total): {:.2} req/s, {:.2} resp/s, {:.2} item/s
459
460## Requests
461| Metric     | Count |
462|------------|-------|
463| Enqueued   | {}     |
464| Sent       | {}     |
465| Succeeded  | {}     |
466| Failed     | {}     |
467| Retried    | {}     |
468| Dropped    | {}     |
469
470## Responses
471| Metric     | Count |
472|------------|-------|
473| Received   | {}     |
474 From Cache | {}     |
475| Downloaded | {}     |
476
477## Items
478| Metric     | Count |
479|------------|--------|
480| Scraped    | {}     |
481| Processed  | {}     |
482| Dropped    | {}     |
483
484## Request Times
485| Metric           | Value      |
486|------------------|------------|
487| Average Time     | {}         |
488| Fastest Request  | {}         |
489| Slowest Request  | {}         |
490| Total Recorded   | {}         |
491
492## Parsing Times
493| Metric           | Value      |
494|------------------|------------|
495| Average Time     | {}         |
496| Fastest Parse    | {}         |
497| Slowest Parse    | {}         |
498| Total Recorded   | {}         |
499
500## Status Codes
501{}
502"#,
503            snapshot.formatted_duration(),
504            snapshot.requests_per_second(),
505            snapshot.responses_per_second(),
506            snapshot.items_per_second(),
507            // Calculate cumulative speeds for comparison
508            {
509                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
510                if total_seconds > 0.0 { snapshot.requests_sent as f64 / total_seconds } else { 0.0 }
511            },
512            {
513                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
514                if total_seconds > 0.0 { snapshot.responses_received as f64 / total_seconds } else { 0.0 }
515            },
516            {
517                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
518                if total_seconds > 0.0 { snapshot.items_scraped as f64 / total_seconds } else { 0.0 }
519            },
520            snapshot.requests_enqueued,
521            snapshot.requests_sent,
522            snapshot.requests_succeeded,
523            snapshot.requests_failed,
524            snapshot.requests_retried,
525            snapshot.requests_dropped,
526            snapshot.responses_received,
527            snapshot.responses_from_cache,
528            snapshot.formatted_bytes(),
529            snapshot.items_scraped,
530            snapshot.items_processed,
531            snapshot.items_dropped_by_pipeline,
532            snapshot.formatted_request_time(snapshot.average_request_time),
533            snapshot.formatted_request_time(snapshot.fastest_request_time),
534            snapshot.formatted_request_time(snapshot.slowest_request_time),
535            snapshot.request_time_count,
536            snapshot.formatted_request_time(snapshot.average_parsing_time),
537            snapshot.formatted_request_time(snapshot.fastest_parsing_time),
538            snapshot.formatted_request_time(snapshot.slowest_parsing_time),
539            snapshot.parsing_time_count,
540            status_codes_output
541        )
542    }
543}
544
545impl Default for StatCollector {
546    fn default() -> Self {
547        Self::new()
548    }
549}
550
551impl std::fmt::Display for StatCollector {
552    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
553        let snapshot = self.snapshot();
554
555        writeln!(f, "\nCrawl Statistics")?;
556        writeln!(f, "----------------")?;
557        writeln!(f, "  duration : {}", snapshot.formatted_duration())?;
558        writeln!(
559            f,
560            "  speed    : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}",
561            snapshot.recent_requests_per_second,
562            snapshot.recent_responses_per_second,
563            snapshot.recent_items_per_second
564        )?;
565        writeln!(
566            f,
567            "  requests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}",
568            snapshot.requests_enqueued,
569            snapshot.requests_sent,
570            snapshot.requests_succeeded,
571            snapshot.requests_failed,
572            snapshot.requests_retried,
573            snapshot.requests_dropped
574        )?;
575        writeln!(
576            f,
577            "  response : received: {}, from_cache: {}, downloaded: {}",
578            snapshot.responses_received,
579            snapshot.responses_from_cache,
580            snapshot.formatted_bytes()
581        )?;
582        writeln!(
583            f,
584            "  items    : scraped: {}, processed: {}, dropped: {}",
585            snapshot.items_scraped, snapshot.items_processed, snapshot.items_dropped_by_pipeline
586        )?;
587        writeln!(
588            f,
589            "  req time : avg: {}, fastest: {}, slowest: {}, total: {}",
590            snapshot.formatted_request_time(snapshot.average_request_time),
591            snapshot.formatted_request_time(snapshot.fastest_request_time),
592            snapshot.formatted_request_time(snapshot.slowest_request_time),
593            snapshot.request_time_count
594        )?;
595        writeln!(
596            f,
597            "  parsing  : avg: {}, fastest: {}, slowest: {}, total: {}",
598            snapshot.formatted_request_time(snapshot.average_parsing_time),
599            snapshot.formatted_request_time(snapshot.fastest_parsing_time),
600            snapshot.formatted_request_time(snapshot.slowest_parsing_time),
601            snapshot.parsing_time_count
602        )?;
603
604        let status_string = if snapshot.response_status_counts.is_empty() {
605            "none".to_string()
606        } else {
607            snapshot
608                .response_status_counts
609                .iter()
610                .map(|(code, count)| format!("{}: {}", code, count))
611                .collect::<Vec<String>>()
612                .join(", ")
613        };
614
615        writeln!(f, "  status   : {}\n", status_string)
616    }
617}