// spider_core/stats.rs
//! # Statistics Module
//!
//! Collects and stores various metrics and statistics about the crawler's operation.
//!
//! ## Overview
//!
//! The [`StatCollector`] tracks important metrics throughout the crawling process,
//! including request counts, response statistics, item processing metrics, and
//! performance indicators. This data is essential for monitoring crawl progress,
//! diagnosing issues, and optimizing performance.
//!
//! ## Key Metrics Tracked
//!
//! - **Request Metrics**: Enqueued, sent, succeeded, failed, retried, and dropped requests
//! - **Response Metrics**: Received, cached, and status code distributions
//! - **Item Metrics**: Scraped, processed, and dropped items
//! - **Performance Metrics**: Throughput, response times, and bandwidth usage
//! - **Timing Metrics**: Elapsed time and processing rates
//!
//! ## Features
//!
//! - **Thread-Safe**: Uses atomic operations for concurrent metric updates
//! - **Real-Time Monitoring**: Provides live statistics during crawling
//! - **Export Formats**: Supports JSON and Markdown export formats
//! - **Snapshot Capability**: Captures consistent state for reporting
//!
//! ## Example
//!
//! ```rust,ignore
//! use spider_core::StatCollector;
//!
//! let stats = StatCollector::new();
//!
//! // During crawling, metrics are automatically updated
//! stats.increment_requests_sent();
//! stats.increment_items_scraped();
//!
//! // Export statistics in various formats
//! println!("{}", stats.to_json_string_pretty().unwrap());
//! println!("{}", stats.to_markdown_string());
//! ```

use moka::sync::Cache;
use spider_util::error::SpiderError;
use spider_util::metrics::ExpMovingAverage;
use std::{
    collections::HashMap,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    time::{Duration, Instant},
};

// A point-in-time snapshot of the current statistics, used for reporting.
// Centralizing the atomic loads and cache scans here avoids code duplication
// in the various export/display methods and keeps each report internally
// consistent (all values read at roughly the same instant).
struct StatsSnapshot {
    // Cumulative request counters since the crawl started.
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_dropped: usize,
    // Cumulative response counters and total payload bytes.
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    // Cumulative item-pipeline counters.
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    // Count of responses per HTTP status code (e.g. 200, 404, 500).
    response_status_counts: HashMap<u16, usize>,
    // Wall-clock time since the collector was created.
    elapsed_duration: Duration,
    // Request-latency statistics over the currently cached samples
    // (None when no samples have been recorded).
    average_request_time: Option<Duration>,
    fastest_request_time: Option<Duration>,
    slowest_request_time: Option<Duration>,
    request_time_count: usize,
    // Parsing-time statistics over the currently cached samples.
    average_parsing_time: Option<Duration>,
    fastest_parsing_time: Option<Duration>,
    slowest_parsing_time: Option<Duration>,
    parsing_time_count: usize,

    // Recent rates from the exponential moving averages (events/second).
    recent_requests_per_second: f64,
    recent_responses_per_second: f64,
    recent_items_per_second: f64,
}

87impl StatsSnapshot {
88    fn formatted_duration(&self) -> String {
89        format!("{:?}", self.elapsed_duration)
90    }
91
92    fn formatted_request_time(&self, duration: Option<Duration>) -> String {
93        match duration {
94            Some(d) => {
95                if d.as_millis() < 1000 {
96                    format!("{} ms", d.as_millis())
97                } else {
98                    format!("{:.2} s", d.as_secs_f64())
99                }
100            }
101            None => "N/A".to_string(),
102        }
103    }
104
105    fn requests_per_second(&self) -> f64 {
106        let elapsed = self.elapsed_duration.as_secs_f64();
107        if elapsed > 0.0 {
108            self.requests_sent as f64 / elapsed
109        } else {
110            0.0
111        }
112    }
113
114    fn responses_per_second(&self) -> f64 {
115        let elapsed = self.elapsed_duration.as_secs_f64();
116        if elapsed > 0.0 {
117            self.responses_received as f64 / elapsed
118        } else {
119            0.0
120        }
121    }
122
123    fn items_per_second(&self) -> f64 {
124        let elapsed = self.elapsed_duration.as_secs_f64();
125        if elapsed > 0.0 {
126            self.items_scraped as f64 / elapsed
127        } else {
128            0.0
129        }
130    }
131
132    fn formatted_bytes(&self) -> String {
133        const KB: usize = 1024;
134        const MB: usize = 1024 * KB;
135        const GB: usize = 1024 * MB;
136
137        if self.total_bytes_downloaded >= GB {
138            format!("{:.2} GB", self.total_bytes_downloaded as f64 / GB as f64)
139        } else if self.total_bytes_downloaded >= MB {
140            format!("{:.2} MB", self.total_bytes_downloaded as f64 / MB as f64)
141        } else if self.total_bytes_downloaded >= KB {
142            format!("{:.2} KB", self.total_bytes_downloaded as f64 / KB as f64)
143        } else {
144            format!("{} B", self.total_bytes_downloaded)
145        }
146    }
147}
148
/// Collects and stores various statistics about the crawler's operation.
///
/// All counters are lock-free atomics so they can be updated concurrently
/// from many crawl tasks; timing samples live in bounded `moka` caches so
/// memory usage stays capped on long-running crawls.
#[derive(Debug, serde::Serialize)]
pub struct StatCollector {
    // Crawl-related metrics.
    // Wall-clock anchor for elapsed-time and overall-rate calculations.
    // `Instant` is not serializable, hence the serde skip.
    #[serde(skip)]
    pub start_time: Instant,

    // Request-related metrics
    pub requests_enqueued: AtomicUsize,
    pub requests_sent: AtomicUsize,
    pub requests_succeeded: AtomicUsize,
    pub requests_failed: AtomicUsize,
    pub requests_retried: AtomicUsize,
    pub requests_dropped: AtomicUsize,

    // Response-related metrics
    pub responses_received: AtomicUsize,
    pub responses_from_cache: AtomicUsize,
    pub response_status_counts: Arc<dashmap::DashMap<u16, usize>>, // e.g., 200, 404, 500
    pub total_bytes_downloaded: AtomicUsize,

    // Add more advanced response time metrics if needed (e.g., histograms)

    // Item-related metrics
    pub items_scraped: AtomicUsize,
    pub items_processed: AtomicUsize,
    pub items_dropped_by_pipeline: AtomicUsize,

    // Timing metrics - Using bounded LRU caches to prevent memory leaks.
    // Only keeps recent entries (max 10,000 for requests, 1,000 for parsing),
    // so derived statistics describe recent activity, not the whole crawl.
    #[serde(skip)]
    pub request_times: Cache<String, Duration>,
    #[serde(skip)]
    pub parsing_times: Cache<String, Duration>,

    // Exponential moving average metrics for accurate "recent speed"
    // calculations, independent of total elapsed time.
    #[serde(skip)]
    requests_sent_ema: ExpMovingAverage,
    #[serde(skip)]
    responses_received_ema: ExpMovingAverage,
    #[serde(skip)]
    items_scraped_ema: ExpMovingAverage,
}

193impl StatCollector {
194    /// Creates a new `StatCollector` with all counters initialized to zero.
195    pub(crate) fn new() -> Self {
196        StatCollector {
197            start_time: Instant::now(),
198            requests_enqueued: AtomicUsize::new(0),
199            requests_sent: AtomicUsize::new(0),
200            requests_succeeded: AtomicUsize::new(0),
201            requests_failed: AtomicUsize::new(0),
202            requests_retried: AtomicUsize::new(0),
203            requests_dropped: AtomicUsize::new(0),
204            responses_received: AtomicUsize::new(0),
205            responses_from_cache: AtomicUsize::new(0),
206            response_status_counts: Arc::new(dashmap::DashMap::new()),
207            total_bytes_downloaded: AtomicUsize::new(0),
208            items_scraped: AtomicUsize::new(0),
209            items_processed: AtomicUsize::new(0),
210            items_dropped_by_pipeline: AtomicUsize::new(0),
211            // Use bounded LRU caches to prevent memory leaks
212            // Automatically evicts oldest entries when capacity is reached
213            request_times: Cache::builder()
214                .max_capacity(10_000)
215                .time_to_idle(Duration::from_secs(300)) // 5 minutes TTL
216                .build(),
217            parsing_times: Cache::builder()
218                .max_capacity(1_000)
219                .time_to_idle(Duration::from_secs(60)) // 1 minute TTL
220                .build(),
221            // Initialize exponential moving averages for recent speed calculations (alpha = 0.2 for good balance)
222            requests_sent_ema: ExpMovingAverage::new(0.2),
223            responses_received_ema: ExpMovingAverage::new(0.2),
224            items_scraped_ema: ExpMovingAverage::new(0.2),
225        }
226    }
227
228    /// Creates a snapshot of the current statistics.
229    /// This is the single source of truth for all presentation logic.
230    fn snapshot(&self) -> StatsSnapshot {
231        let mut status_counts: HashMap<u16, usize> = HashMap::new();
232        for entry in self.response_status_counts.iter() {
233            let (key, value) = entry.pair();
234            status_counts.insert(*key, *value);
235        }
236
237        // Get recent rates from exponential moving averages
238        let recent_requests_per_second = self.requests_sent_ema.get_rate();
239        let recent_responses_per_second = self.responses_received_ema.get_rate();
240        let recent_items_per_second = self.items_scraped_ema.get_rate();
241
242        StatsSnapshot {
243            requests_enqueued: self.requests_enqueued.load(Ordering::Acquire),
244            requests_sent: self.requests_sent.load(Ordering::Acquire),
245            requests_succeeded: self.requests_succeeded.load(Ordering::Acquire),
246            requests_failed: self.requests_failed.load(Ordering::Acquire),
247            requests_retried: self.requests_retried.load(Ordering::Acquire),
248            requests_dropped: self.requests_dropped.load(Ordering::Acquire),
249            responses_received: self.responses_received.load(Ordering::Acquire),
250            responses_from_cache: self.responses_from_cache.load(Ordering::Acquire),
251            total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::Acquire),
252            items_scraped: self.items_scraped.load(Ordering::Acquire),
253            items_processed: self.items_processed.load(Ordering::Acquire),
254            items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::Acquire),
255            response_status_counts: status_counts,
256            elapsed_duration: self.start_time.elapsed(),
257            average_request_time: self.average_request_time(),
258            fastest_request_time: self.fastest_request_time(),
259            slowest_request_time: self.slowest_request_time(),
260            request_time_count: self.request_time_count(),
261            average_parsing_time: self.average_parsing_time(),
262            fastest_parsing_time: self.fastest_parsing_time(),
263            slowest_parsing_time: self.slowest_parsing_time(),
264            parsing_time_count: self.parsing_time_count(),
265
266            // Recent rates from sliding windows
267            recent_requests_per_second,
268            recent_responses_per_second,
269            recent_items_per_second,
270        }
271    }
272
273    /// Increments the count of enqueued requests.
274    pub(crate) fn increment_requests_enqueued(&self) {
275        self.requests_enqueued.fetch_add(1, Ordering::AcqRel);
276    }
277
278    /// Increments the count of sent requests.
279    pub(crate) fn increment_requests_sent(&self) {
280        self.requests_sent.fetch_add(1, Ordering::AcqRel);
281        // Update the EMA with a count of 1 for this event
282        self.requests_sent_ema.update(1);
283    }
284
285    /// Increments the count of successful requests.
286    pub(crate) fn increment_requests_succeeded(&self) {
287        self.requests_succeeded.fetch_add(1, Ordering::AcqRel);
288    }
289
290    /// Increments the count of failed requests.
291    pub(crate) fn increment_requests_failed(&self) {
292        self.requests_failed.fetch_add(1, Ordering::AcqRel);
293    }
294
295    /// Increments the count of retried requests.
296    pub(crate) fn increment_requests_retried(&self) {
297        self.requests_retried.fetch_add(1, Ordering::AcqRel);
298    }
299
300    /// Increments the count of dropped requests.
301    pub(crate) fn increment_requests_dropped(&self) {
302        self.requests_dropped.fetch_add(1, Ordering::AcqRel);
303    }
304
305    /// Increments the count of received responses.
306    pub(crate) fn increment_responses_received(&self) {
307        self.responses_received.fetch_add(1, Ordering::AcqRel);
308        // Update the EMA with a count of 1 for this event
309        self.responses_received_ema.update(1);
310    }
311
312    /// Increments the count of responses served from cache.
313    pub(crate) fn increment_responses_from_cache(&self) {
314        self.responses_from_cache.fetch_add(1, Ordering::AcqRel);
315    }
316
317    /// Records a response status code.
318    pub(crate) fn record_response_status(&self, status_code: u16) {
319        *self.response_status_counts.entry(status_code).or_insert(0) += 1;
320    }
321
322    /// Adds to the total bytes downloaded.
323    pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
324        self.total_bytes_downloaded
325            .fetch_add(bytes, Ordering::AcqRel);
326    }
327
328    /// Adds multiple scraped items to the counter.
329    pub(crate) fn add_items_scraped(&self, count: usize) {
330        if count == 0 {
331            return;
332        }
333        self.items_scraped.fetch_add(count, Ordering::AcqRel);
334        self.items_scraped_ema.update(count);
335    }
336
337    /// Increments the count of processed items.
338    pub(crate) fn increment_items_processed(&self) {
339        self.items_processed.fetch_add(1, Ordering::AcqRel);
340    }
341
342    /// Increments the count of items dropped by pipelines.
343    pub(crate) fn increment_items_dropped_by_pipeline(&self) {
344        self.items_dropped_by_pipeline
345            .fetch_add(1, Ordering::AcqRel);
346    }
347
348    /// Records the time taken for a request.
349    pub fn record_request_time(&self, url: &str, duration: Duration) {
350        self.request_times.insert(url.to_string(), duration);
351    }
352
353    /// Calculates the average request time across all recorded requests.
354    pub fn average_request_time(&self) -> Option<Duration> {
355        let times: Vec<Duration> = self
356            .request_times
357            .iter()
358            .map(|(_key, value)| value)
359            .collect();
360        if times.is_empty() {
361            None
362        } else {
363            let total_nanos: u128 = times.iter().map(|d| d.as_nanos()).sum();
364            let avg_nanos = total_nanos / times.len() as u128;
365            Some(Duration::from_nanos(avg_nanos as u64))
366        }
367    }
368
369    /// Gets the fastest request time among all recorded requests.
370    pub fn fastest_request_time(&self) -> Option<Duration> {
371        self.request_times.iter().map(|(_key, value)| value).min()
372    }
373
374    /// Gets the slowest request time among all recorded requests.
375    pub fn slowest_request_time(&self) -> Option<Duration> {
376        self.request_times.iter().map(|(_key, value)| value).max()
377    }
378
379    /// Gets the total number of recorded request times.
380    pub fn request_time_count(&self) -> usize {
381        self.request_times.entry_count() as usize
382    }
383
384    /// Gets the request time for a specific URL.
385    pub fn get_request_time(&self, url: &str) -> Option<Duration> {
386        self.request_times.get(url)
387    }
388
389    /// Gets all recorded request times as a vector of (URL, Duration) pairs.
390    pub fn get_all_request_times(&self) -> Vec<(String, Duration)> {
391        self.request_times
392            .iter()
393            .map(|(key, value)| (key.to_string(), value))
394            .collect()
395    }
396
397    /// Records the time taken for parsing a response.
398    pub fn record_parsing_time(&self, duration: Duration) {
399        let id = format!(
400            "parse_{}",
401            match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
402                Ok(duration) => duration.as_nanos(),
403                Err(err) => err.duration().as_nanos(),
404            }
405        );
406        self.parsing_times.insert(id, duration);
407    }
408
409    /// Calculates the average parsing time across all recorded parses.
410    pub fn average_parsing_time(&self) -> Option<Duration> {
411        let times: Vec<Duration> = self
412            .parsing_times
413            .iter()
414            .map(|(_key, value)| value)
415            .collect();
416        if times.is_empty() {
417            None
418        } else {
419            let total_nanos: u128 = times.iter().map(|d| d.as_nanos()).sum();
420            let avg_nanos = total_nanos / times.len() as u128;
421            Some(Duration::from_nanos(avg_nanos as u64))
422        }
423    }
424
425    /// Gets the fastest parsing time among all recorded parses.
426    pub fn fastest_parsing_time(&self) -> Option<Duration> {
427        self.parsing_times.iter().map(|(_key, value)| value).min()
428    }
429
430    /// Gets the slowest parsing time among all recorded parses.
431    pub fn slowest_parsing_time(&self) -> Option<Duration> {
432        self.parsing_times.iter().map(|(_key, value)| value).max()
433    }
434
435    /// Gets the total number of recorded parsing times.
436    pub fn parsing_time_count(&self) -> usize {
437        self.parsing_times.entry_count() as usize
438    }
439
440    /// Clears all recorded request times.
441    pub fn clear_request_times(&self) {
442        self.request_times.invalidate_all();
443    }
444
445    /// Clears all recorded parsing times.
446    pub fn clear_parsing_times(&self) {
447        self.parsing_times.invalidate_all();
448    }
449
450    /// Converts the snapshot into a JSON string.
451    pub fn to_json_string(&self) -> Result<String, SpiderError> {
452        Ok(serde_json::to_string(self)?)
453    }
454
455    /// Converts the snapshot into a pretty-printed JSON string.
456    pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
457        Ok(serde_json::to_string_pretty(self)?)
458    }
459
460    /// Exports the current statistics to a Markdown formatted string.
461    pub fn to_markdown_string(&self) -> String {
462        let snapshot = self.snapshot();
463
464        let status_codes_list: String = snapshot
465            .response_status_counts
466            .iter()
467            .map(|(code, count)| format!("- **{}**: {}", code, count))
468            .collect::<Vec<String>>()
469            .join("\n");
470        let status_codes_output = if status_codes_list.is_empty() {
471            "N/A".to_string()
472        } else {
473            status_codes_list
474        };
475
476        format!(
477            r#"# Crawl Statistics Report
478
479- **Duration**: {}
480- **Current Rate** (last 10s): {:.2} req/s, {:.2} resp/s, {:.2} item/s
481- **Overall Rate** (total): {:.2} req/s, {:.2} resp/s, {:.2} item/s
482
483## Requests
484| Metric     | Count |
485|------------|-------|
486| Enqueued   | {}     |
487| Sent       | {}     |
488| Succeeded  | {}     |
489| Failed     | {}     |
490| Retried    | {}     |
491| Dropped    | {}     |
492
493## Responses
494| Metric     | Count |
495|------------|-------|
496| Received   | {}     |
497 From Cache | {}     |
498| Downloaded | {}     |
499
500## Items
501| Metric     | Count |
502|------------|--------|
503| Scraped    | {}     |
504| Processed  | {}     |
505| Dropped    | {}     |
506
507## Request Times
508| Metric           | Value      |
509|------------------|------------|
510| Average Time     | {}         |
511| Fastest Request  | {}         |
512| Slowest Request  | {}         |
513| Total Recorded   | {}         |
514
515## Parsing Times
516| Metric           | Value      |
517|------------------|------------|
518| Average Time     | {}         |
519| Fastest Parse    | {}         |
520| Slowest Parse    | {}         |
521| Total Recorded   | {}         |
522
523## Status Codes
524{}
525"#,
526            snapshot.formatted_duration(),
527            snapshot.requests_per_second(),
528            snapshot.responses_per_second(),
529            snapshot.items_per_second(),
530            // Calculate cumulative speeds for comparison
531            {
532                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
533                if total_seconds > 0.0 {
534                    snapshot.requests_sent as f64 / total_seconds
535                } else {
536                    0.0
537                }
538            },
539            {
540                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
541                if total_seconds > 0.0 {
542                    snapshot.responses_received as f64 / total_seconds
543                } else {
544                    0.0
545                }
546            },
547            {
548                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
549                if total_seconds > 0.0 {
550                    snapshot.items_scraped as f64 / total_seconds
551                } else {
552                    0.0
553                }
554            },
555            snapshot.requests_enqueued,
556            snapshot.requests_sent,
557            snapshot.requests_succeeded,
558            snapshot.requests_failed,
559            snapshot.requests_retried,
560            snapshot.requests_dropped,
561            snapshot.responses_received,
562            snapshot.responses_from_cache,
563            snapshot.formatted_bytes(),
564            snapshot.items_scraped,
565            snapshot.items_processed,
566            snapshot.items_dropped_by_pipeline,
567            snapshot.formatted_request_time(snapshot.average_request_time),
568            snapshot.formatted_request_time(snapshot.fastest_request_time),
569            snapshot.formatted_request_time(snapshot.slowest_request_time),
570            snapshot.request_time_count,
571            snapshot.formatted_request_time(snapshot.average_parsing_time),
572            snapshot.formatted_request_time(snapshot.fastest_parsing_time),
573            snapshot.formatted_request_time(snapshot.slowest_parsing_time),
574            snapshot.parsing_time_count,
575            status_codes_output
576        )
577    }
578
579    /// Exports current statistics to the text layout used for terminal output.
580    pub fn to_live_report_string(&self) -> String {
581        let snapshot = self.snapshot();
582        let status_string = if snapshot.response_status_counts.is_empty() {
583            "none".to_string()
584        } else {
585            snapshot
586                .response_status_counts
587                .iter()
588                .map(|(code, count)| format!("{}: {}", code, count))
589                .collect::<Vec<String>>()
590                .join(", ")
591        };
592
593        format!(
594            "Crawl Statistics\n\
595             ----------------\n\
596             duration : {}\n\
597             speed    : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}\n\
598             requests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}\n\
599             response : received: {}, from_cache: {}, downloaded: {}\n\
600             items    : scraped: {}, processed: {}, dropped: {}\n\
601             req time : avg: {}, fastest: {}, slowest: {}, total: {}\n\
602             parsing  : avg: {}, fastest: {}, slowest: {}, total: {}\n\
603             status   : {}",
604            snapshot.formatted_duration(),
605            snapshot.recent_requests_per_second,
606            snapshot.recent_responses_per_second,
607            snapshot.recent_items_per_second,
608            snapshot.requests_enqueued,
609            snapshot.requests_sent,
610            snapshot.requests_succeeded,
611            snapshot.requests_failed,
612            snapshot.requests_retried,
613            snapshot.requests_dropped,
614            snapshot.responses_received,
615            snapshot.responses_from_cache,
616            snapshot.formatted_bytes(),
617            snapshot.items_scraped,
618            snapshot.items_processed,
619            snapshot.items_dropped_by_pipeline,
620            snapshot.formatted_request_time(snapshot.average_request_time),
621            snapshot.formatted_request_time(snapshot.fastest_request_time),
622            snapshot.formatted_request_time(snapshot.slowest_request_time),
623            snapshot.request_time_count,
624            snapshot.formatted_request_time(snapshot.average_parsing_time),
625            snapshot.formatted_request_time(snapshot.fastest_parsing_time),
626            snapshot.formatted_request_time(snapshot.slowest_parsing_time),
627            snapshot.parsing_time_count,
628            status_string
629        )
630    }
631}
632
633impl Default for StatCollector {
634    fn default() -> Self {
635        Self::new()
636    }
637}
638
639impl std::fmt::Display for StatCollector {
640    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
641        write!(f, "\n{}\n", self.to_live_report_string())
642    }
643}