// spider_core/stats.rs
//! # Statistics Module
//!
//! Collects and stores various metrics and statistics about the crawler's operation.
//!
//! ## Overview
//!
//! The `StatCollector` tracks important metrics throughout the crawling process,
//! including request counts, response statistics, item processing metrics, and
//! performance indicators. This data is essential for monitoring crawl progress,
//! diagnosing issues, and optimizing performance.
//!
//! ## Key Metrics Tracked
//!
//! - **Request Metrics**: Enqueued, sent, succeeded, failed, retried, and dropped requests
//! - **Response Metrics**: Received, cached, and status code distributions
//! - **Item Metrics**: Scraped, processed, and dropped items
//! - **Performance Metrics**: Throughput, response times, and bandwidth usage
//! - **Timing Metrics**: Elapsed time and processing rates
//!
//! ## Features
//!
//! - **Thread-Safe**: Uses atomic operations for concurrent metric updates
//! - **Real-Time Monitoring**: Provides live statistics during crawling
//! - **Export Formats**: Supports JSON and Markdown export formats
//! - **Snapshot Capability**: Captures consistent state for reporting
//!
//! ## Example
//!
//! ```rust,ignore
//! use spider_core::StatCollector;
//!
//! let stats = StatCollector::new();
//!
//! // During crawling, metrics are automatically updated
//! stats.increment_requests_sent();
//! stats.increment_items_scraped();
//!
//! // Export statistics in various formats
//! println!("{}", stats.to_json_string_pretty().unwrap());
//! println!("{}", stats.to_markdown_string());
//! ```

use spider_util::error::SpiderError;
use std::{
    collections::HashMap,
    sync::{
        Arc,
        Mutex,
        atomic::{AtomicUsize, Ordering},
    },
    time::{Duration, Instant},
};
// Thread-safe exponential moving average for tracking recent event rates.
//
// A single mutex guards both the smoothed rate and the last-update timestamp
// so the two values can never be read or written out of sync (the previous
// design kept them behind two independent locks, and also wrapped each lock
// in a redundant `Arc` even though the struct is only ever used via `&self`).
#[derive(Debug)]
pub(crate) struct ExpMovingAverage {
    /// Smoothing factor in (0, 1]; higher reacts faster (typically 0.1-0.3).
    alpha: f64,
    /// `(current smoothed rate, instant of the last update)`.
    state: Mutex<(f64, Instant)>,
}

impl ExpMovingAverage {
    /// Creates a new average with the given smoothing factor and a zero rate.
    fn new(alpha: f64) -> Self {
        ExpMovingAverage {
            alpha,
            state: Mutex::new((0.0, Instant::now())),
        }
    }

    /// Folds `count` new events into the average, weighting by elapsed time.
    fn update(&self, count: usize) {
        let now = Instant::now();
        // Poisoning only matters if a holder panicked; these are best-effort
        // stats, so recover the inner value instead of propagating the panic.
        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
        let (rate, last_update) = &mut *state;
        let time_delta = now.duration_since(*last_update).as_secs_f64();
        *last_update = now;

        if time_delta > 0.0 {
            let current_rate = count as f64 / time_delta;
            // Exponential moving average: blend the instantaneous rate into
            // the running rate using the smoothing factor.
            *rate = self.alpha * current_rate + (1.0 - self.alpha) * *rate;
        }
    }

    /// Returns the current smoothed rate (events per second).
    fn get_rate(&self) -> f64 {
        self.state.lock().unwrap_or_else(|e| e.into_inner()).0
    }
}

// A frozen copy of the statistics taken at one point in time.
// All export/display methods format from a snapshot, which keeps each report
// internally consistent and avoids duplicating presentation logic.
struct StatsSnapshot {
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_dropped: usize,
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    response_status_counts: HashMap<u16, usize>,
    elapsed_duration: Duration,
    average_request_time: Option<Duration>,
    fastest_request_time: Option<Duration>,
    slowest_request_time: Option<Duration>,
    request_time_count: usize,

    // Recent rates from sliding windows
    recent_requests_per_second: f64,
    recent_responses_per_second: f64,
    recent_items_per_second: f64,
}

impl StatsSnapshot {
    /// Elapsed crawl time rendered with `Duration`'s `Debug` formatting.
    fn formatted_duration(&self) -> String {
        format!("{:?}", self.elapsed_duration)
    }

    /// Renders an optional duration as "<n> ms" below one second,
    /// "<x.xx> s" at or above it, or "N/A" when absent.
    fn formatted_request_time(&self, duration: Option<Duration>) -> String {
        duration.map_or_else(
            || "N/A".to_string(),
            |d| {
                if d.as_millis() < 1000 {
                    format!("{} ms", d.as_millis())
                } else {
                    format!("{:.2} s", d.as_secs_f64())
                }
            },
        )
    }

    /// Recent request throughput (from the EMA), in requests per second.
    fn requests_per_second(&self) -> f64 {
        self.recent_requests_per_second
    }

    /// Recent response throughput (from the EMA), in responses per second.
    fn responses_per_second(&self) -> f64 {
        self.recent_responses_per_second
    }

    /// Recent item throughput (from the EMA), in items per second.
    fn items_per_second(&self) -> f64 {
        self.recent_items_per_second
    }

    /// Human-readable download volume using binary units (B/KB/MB/GB).
    fn formatted_bytes(&self) -> String {
        const KB: usize = 1024;
        const MB: usize = 1024 * KB;
        const GB: usize = 1024 * MB;

        let bytes = self.total_bytes_downloaded;
        match bytes {
            b if b >= GB => format!("{:.2} GB", b as f64 / GB as f64),
            b if b >= MB => format!("{:.2} MB", b as f64 / MB as f64),
            b if b >= KB => format!("{:.2} KB", b as f64 / KB as f64),
            b => format!("{} B", b),
        }
    }
}

/// Collects and stores various statistics about the crawler's operation.
///
/// All counters are atomics and the maps are concurrent (`DashMap`), so a
/// shared `StatCollector` can be updated from many tasks without external
/// locking. Serializes via `serde`; fields that cannot or should not appear
/// in exports (start time, moving averages, rate baselines) carry
/// `#[serde(skip)]`.
#[derive(Debug, serde::Serialize)]
pub struct StatCollector {
    // Crawl-related metrics
    // Wall-clock start of the crawl; basis for elapsed time and overall rates.
    #[serde(skip)]
    pub start_time: Instant,

    // Request-related metrics
    pub requests_enqueued: AtomicUsize,
    pub requests_sent: AtomicUsize,
    pub requests_succeeded: AtomicUsize,
    pub requests_failed: AtomicUsize,
    pub requests_retried: AtomicUsize,
    pub requests_dropped: AtomicUsize,

    // Response-related metrics
    pub responses_received: AtomicUsize,
    pub responses_from_cache: AtomicUsize,
    pub response_status_counts: Arc<dashmap::DashMap<u16, usize>>, // e.g., 200, 404, 500
    pub total_bytes_downloaded: AtomicUsize,

    // Add more advanced response time metrics if needed (e.g., histograms)

    // Item-related metrics
    pub items_scraped: AtomicUsize,
    pub items_processed: AtomicUsize,
    pub items_dropped_by_pipeline: AtomicUsize,

    // Timing metrics
    // Per-URL request durations; grows with each distinct URL until cleared.
    pub request_times: Arc<dashmap::DashMap<String, Duration>>,

    // Exponential moving average metrics for accurate speed calculations
    #[serde(skip)]
    requests_sent_ema: ExpMovingAverage,
    #[serde(skip)]
    responses_received_ema: ExpMovingAverage,
    #[serde(skip)]
    items_scraped_ema: ExpMovingAverage,

    // Track previous counts for rate calculation: baselines consumed by the
    // increment_* methods when feeding deltas into the EMAs above.
    #[serde(skip)]
    prev_requests_sent: AtomicUsize,
    #[serde(skip)]
    prev_responses_received: AtomicUsize,
    #[serde(skip)]
    prev_items_scraped: AtomicUsize,
}

impl StatCollector {
212    /// Creates a new `StatCollector` with all counters initialized to zero.
213    pub(crate) fn new() -> Self {
214        StatCollector {
215            start_time: Instant::now(),
216            requests_enqueued: AtomicUsize::new(0),
217            requests_sent: AtomicUsize::new(0),
218            requests_succeeded: AtomicUsize::new(0),
219            requests_failed: AtomicUsize::new(0),
220            requests_retried: AtomicUsize::new(0),
221            requests_dropped: AtomicUsize::new(0),
222            responses_received: AtomicUsize::new(0),
223            responses_from_cache: AtomicUsize::new(0),
224            response_status_counts: Arc::new(dashmap::DashMap::new()),
225            total_bytes_downloaded: AtomicUsize::new(0),
226            items_scraped: AtomicUsize::new(0),
227            items_processed: AtomicUsize::new(0),
228            items_dropped_by_pipeline: AtomicUsize::new(0),
229            request_times: Arc::new(dashmap::DashMap::new()),
230            // Initialize exponential moving averages for recent speed calculations (alpha = 0.2 for good balance)
231            requests_sent_ema: ExpMovingAverage::new(0.2),
232            responses_received_ema: ExpMovingAverage::new(0.2),
233            items_scraped_ema: ExpMovingAverage::new(0.2),
234            
235            // Initialize previous counts to 0
236            prev_requests_sent: AtomicUsize::new(0),
237            prev_responses_received: AtomicUsize::new(0),
238            prev_items_scraped: AtomicUsize::new(0),
239        }
240    }
241
242    /// Creates a snapshot of the current statistics.
243    /// This is the single source of truth for all presentation logic.
244    fn snapshot(&self) -> StatsSnapshot {
245        let mut status_counts: HashMap<u16, usize> = HashMap::new();
246        for entry in self.response_status_counts.iter() {
247            let (key, value) = entry.pair();
248            status_counts.insert(*key, *value);
249        }
250
251        // Get recent rates from exponential moving averages
252        let recent_requests_per_second = self.requests_sent_ema.get_rate();
253        let recent_responses_per_second = self.responses_received_ema.get_rate();
254        let recent_items_per_second = self.items_scraped_ema.get_rate();
255
256        StatsSnapshot {
257            requests_enqueued: self.requests_enqueued.load(Ordering::SeqCst),
258            requests_sent: self.requests_sent.load(Ordering::SeqCst),
259            requests_succeeded: self.requests_succeeded.load(Ordering::SeqCst),
260            requests_failed: self.requests_failed.load(Ordering::SeqCst),
261            requests_retried: self.requests_retried.load(Ordering::SeqCst),
262            requests_dropped: self.requests_dropped.load(Ordering::SeqCst),
263            responses_received: self.responses_received.load(Ordering::SeqCst),
264            responses_from_cache: self.responses_from_cache.load(Ordering::SeqCst),
265            total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::SeqCst),
266            items_scraped: self.items_scraped.load(Ordering::SeqCst),
267            items_processed: self.items_processed.load(Ordering::SeqCst),
268            items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::SeqCst),
269            response_status_counts: status_counts,
270            elapsed_duration: self.start_time.elapsed(),
271            average_request_time: self.average_request_time(),
272            fastest_request_time: self.fastest_request_time(),
273            slowest_request_time: self.slowest_request_time(),
274            request_time_count: self.request_time_count(),
275            
276            // Recent rates from sliding windows
277            recent_requests_per_second,
278            recent_responses_per_second,
279            recent_items_per_second,
280        }
281    }
282
    /// Increments the count of enqueued requests.
    ///
    /// `SeqCst` matches the `SeqCst` loads performed by `snapshot`.
    pub(crate) fn increment_requests_enqueued(&self) {
        self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
    }

288    /// Increments the count of sent requests.
289    pub(crate) fn increment_requests_sent(&self) {
290        let new_count = self.requests_sent.fetch_add(1, Ordering::SeqCst) + 1;
291        // Calculate how many requests were sent since the last update
292        let prev_count = self.prev_requests_sent.load(Ordering::SeqCst);
293        let delta = new_count - prev_count;
294        self.requests_sent_ema.update(delta);
295        // Update the previous count
296        self.prev_requests_sent.store(new_count, Ordering::SeqCst);
297    }
298
    /// Increments the count of successful requests.
    pub(crate) fn increment_requests_succeeded(&self) {
        self.requests_succeeded.fetch_add(1, Ordering::SeqCst);
    }

    /// Increments the count of failed requests.
    pub(crate) fn increment_requests_failed(&self) {
        self.requests_failed.fetch_add(1, Ordering::SeqCst);
    }

    /// Increments the count of retried requests.
    pub(crate) fn increment_requests_retried(&self) {
        self.requests_retried.fetch_add(1, Ordering::SeqCst);
    }

    /// Increments the count of dropped requests.
    pub(crate) fn increment_requests_dropped(&self) {
        self.requests_dropped.fetch_add(1, Ordering::SeqCst);
    }

319    /// Increments the count of received responses.
320    pub(crate) fn increment_responses_received(&self) {
321        let new_count = self.responses_received.fetch_add(1, Ordering::SeqCst) + 1;
322        // Calculate how many responses were received since the last update
323        let prev_count = self.prev_responses_received.load(Ordering::SeqCst);
324        let delta = new_count - prev_count;
325        self.responses_received_ema.update(delta);
326        // Update the previous count
327        self.prev_responses_received.store(new_count, Ordering::SeqCst);
328    }
329
    /// Increments the count of responses served from cache.
    pub(crate) fn increment_responses_from_cache(&self) {
        self.responses_from_cache.fetch_add(1, Ordering::SeqCst);
    }

335    /// Records a response status code.
336    pub(crate) fn record_response_status(&self, status_code: u16) {
337        *self.response_status_counts.entry(status_code).or_insert(0) += 1;
338    }
339
    /// Adds to the total bytes downloaded.
    ///
    /// Only compiled when the `stream` feature is disabled; presumably the
    /// streaming path accounts for bytes elsewhere — TODO confirm.
    #[cfg(not(feature = "stream"))]
    pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
        self.total_bytes_downloaded
            .fetch_add(bytes, Ordering::SeqCst);
    }

347    /// Increments the count of scraped items.
348    pub(crate) fn increment_items_scraped(&self) {
349        let new_count = self.items_scraped.fetch_add(1, Ordering::SeqCst) + 1;
350        // Calculate how many items were scraped since the last update
351        let prev_count = self.prev_items_scraped.load(Ordering::SeqCst);
352        let delta = new_count - prev_count;
353        self.items_scraped_ema.update(delta);
354        // Update the previous count
355        self.prev_items_scraped.store(new_count, Ordering::SeqCst);
356    }
357
    /// Increments the count of processed items.
    pub(crate) fn increment_items_processed(&self) {
        self.items_processed.fetch_add(1, Ordering::SeqCst);
    }

    /// Increments the count of items dropped by pipelines.
    pub(crate) fn increment_items_dropped_by_pipeline(&self) {
        self.items_dropped_by_pipeline
            .fetch_add(1, Ordering::SeqCst);
    }

    /// Records the time taken for a request, keyed by URL.
    ///
    /// NOTE(review): entries accumulate until `clear_request_times` is
    /// called, so memory grows with the number of distinct URLs; recording
    /// the same URL again overwrites its previous timing.
    pub fn record_request_time(&self, url: &str, duration: Duration) {
        self.request_times.insert(url.to_string(), duration);
    }

374    /// Calculates the average request time across all recorded requests.
375    pub fn average_request_time(&self) -> Option<Duration> {
376        let times: Vec<Duration> = self
377            .request_times
378            .iter()
379            .map(|entry| *entry.value())
380            .collect();
381        if times.is_empty() {
382            None
383        } else {
384            let total_nanos: u128 = times.iter().map(|d| d.as_nanos()).sum();
385            let avg_nanos = total_nanos / times.len() as u128;
386            Some(Duration::from_nanos(avg_nanos as u64))
387        }
388    }
389
    /// Gets the fastest request time among all recorded requests.
    /// Returns `None` when no times have been recorded.
    pub fn fastest_request_time(&self) -> Option<Duration> {
        self.request_times.iter().map(|entry| *entry.value()).min()
    }

    /// Gets the slowest request time among all recorded requests.
    /// Returns `None` when no times have been recorded.
    pub fn slowest_request_time(&self) -> Option<Duration> {
        self.request_times.iter().map(|entry| *entry.value()).max()
    }

    /// Gets the total number of recorded request times.
    pub fn request_time_count(&self) -> usize {
        self.request_times.len()
    }

    /// Gets the request time for a specific URL, or `None` if that URL was
    /// never recorded.
    pub fn get_request_time(&self, url: &str) -> Option<Duration> {
        self.request_times
            .get(url)
            .map(|duration| *duration.value())
    }

    /// Gets all recorded request times as a vector of (URL, Duration) pairs.
    /// Iteration order over the concurrent map is unspecified.
    pub fn get_all_request_times(&self) -> Vec<(String, Duration)> {
        self.request_times
            .iter()
            .map(|entry| (entry.key().clone(), *entry.value()))
            .collect()
    }

    /// Clears all recorded request times (the avg/fastest/slowest accessors
    /// derive from this map, so they reset to `None` as well).
    pub fn clear_request_times(&self) {
        self.request_times.clear();
    }

    /// Serializes the collector's current counters to a JSON string.
    /// Fields marked `#[serde(skip)]` (start time, EMAs, baselines) are omitted.
    pub fn to_json_string(&self) -> Result<String, SpiderError> {
        Ok(serde_json::to_string(self)?)
    }

    /// Same as `to_json_string`, but pretty-printed.
    pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
        Ok(serde_json::to_string_pretty(self)?)
    }

435    /// Exports the current statistics to a Markdown formatted string.
436    pub fn to_markdown_string(&self) -> String {
437        let snapshot = self.snapshot();
438
439        let status_codes_list: String = snapshot
440            .response_status_counts
441            .iter()
442            .map(|(code, count)| format!("- **{}**: {}", code, count))
443            .collect::<Vec<String>>()
444            .join("\n");
445        let status_codes_output = if status_codes_list.is_empty() {
446            "N/A".to_string()
447        } else {
448            status_codes_list
449        };
450
451        format!(
452            r#"# Crawl Statistics Report
453
454- **Duration**: {}
455- **Current Rate** (last 10s): {:.2} req/s, {:.2} resp/s, {:.2} item/s
456- **Overall Rate** (total): {:.2} req/s, {:.2} resp/s, {:.2} item/s
457
458## Requests
459| Metric     | Count |
460|------------|-------|
461| Enqueued   | {}     |
462| Sent       | {}     |
463| Succeeded  | {}     |
464| Failed     | {}     |
465| Retried    | {}     |
466| Dropped    | {}     |
467
468## Responses
469| Metric     | Count |
470|------------|-------|
471| Received   | {}     |
472 From Cache | {}     |
473| Downloaded | {}     |
474
475## Items
476| Metric     | Count |
477|------------|--------|
478| Scraped    | {}     |
479| Processed  | {}     |
480| Dropped    | {}     |
481
482## Request Times
483| Metric           | Value      |
484|------------------|------------|
485| Average Time     | {}         |
486| Fastest Request  | {}         |
487| Slowest Request  | {}         |
488| Total Recorded   | {}         |
489
490## Status Codes
491{}
492"#,
493            snapshot.formatted_duration(),
494            snapshot.requests_per_second(),
495            snapshot.responses_per_second(),
496            snapshot.items_per_second(),
497            // Calculate cumulative speeds for comparison
498            {
499                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
500                if total_seconds > 0.0 { snapshot.requests_sent as f64 / total_seconds } else { 0.0 }
501            },
502            {
503                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
504                if total_seconds > 0.0 { snapshot.responses_received as f64 / total_seconds } else { 0.0 }
505            },
506            {
507                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
508                if total_seconds > 0.0 { snapshot.items_scraped as f64 / total_seconds } else { 0.0 }
509            },
510            snapshot.requests_enqueued,
511            snapshot.requests_sent,
512            snapshot.requests_succeeded,
513            snapshot.requests_failed,
514            snapshot.requests_retried,
515            snapshot.requests_dropped,
516            snapshot.responses_received,
517            snapshot.responses_from_cache,
518            snapshot.formatted_bytes(),
519            snapshot.items_scraped,
520            snapshot.items_processed,
521            snapshot.items_dropped_by_pipeline,
522            snapshot.formatted_request_time(snapshot.average_request_time),
523            snapshot.formatted_request_time(snapshot.fastest_request_time),
524            snapshot.formatted_request_time(snapshot.slowest_request_time),
525            snapshot.request_time_count,
526            status_codes_output
527        )
528    }
}

impl Default for StatCollector {
    /// Equivalent to [`StatCollector::new`]: zeroed counters, clock started now.
    fn default() -> Self {
        Self::new()
    }
}
537impl std::fmt::Display for StatCollector {
538    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
539        let snapshot = self.snapshot();
540
541        writeln!(f, "\nCrawl Statistics")?;
542        writeln!(f, "----------------")?;
543        writeln!(f, "  duration : {}", snapshot.formatted_duration())?;
544        writeln!(
545            f,
546            "  speed    : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}",
547            snapshot.requests_per_second(),
548            snapshot.responses_per_second(),
549            snapshot.items_per_second()
550        )?;
551        writeln!(
552            f,
553            "  requests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}",
554            snapshot.requests_enqueued,
555            snapshot.requests_sent,
556            snapshot.requests_succeeded,
557            snapshot.requests_failed,
558            snapshot.requests_retried,
559            snapshot.requests_dropped
560        )?;
561        writeln!(
562            f,
563            "  response : received: {}, from_cache: {}, downloaded: {}",
564            snapshot.responses_received,
565            snapshot.responses_from_cache,
566            snapshot.formatted_bytes()
567        )?;
568        writeln!(
569            f,
570            "  items    : scraped: {}, processed: {}, dropped: {}",
571            snapshot.items_scraped, snapshot.items_processed, snapshot.items_dropped_by_pipeline
572        )?;
573        writeln!(
574            f,
575            "  times    : avg: {}, fastest: {}, slowest: {}, total: {}",
576            snapshot.formatted_request_time(snapshot.average_request_time),
577            snapshot.formatted_request_time(snapshot.fastest_request_time),
578            snapshot.formatted_request_time(snapshot.slowest_request_time),
579            snapshot.request_time_count
580        )?;
581
582        let status_string = if snapshot.response_status_counts.is_empty() {
583            "none".to_string()
584        } else {
585            snapshot
586                .response_status_counts
587                .iter()
588                .map(|(code, count)| format!("{}: {}", code, count))
589                .collect::<Vec<String>>()
590                .join(", ")
591        };
592
593        writeln!(f, "  status   : {}\n", status_string)
594    }
595}