// spider_core/stats.rs
//! # Statistics Module
//!
//! Collects and stores various metrics and statistics about the crawler's operation.
//!
//! ## Overview
//!
//! The `StatCollector` tracks important metrics throughout the crawling process,
//! including request counts, response statistics, item processing metrics, and
//! performance indicators. This data is essential for monitoring crawl progress,
//! diagnosing issues, and optimizing performance.
//!
//! ## Key Metrics Tracked
//!
//! - **Request Metrics**: Enqueued, sent, succeeded, failed, retried, and dropped requests
//! - **Response Metrics**: Received, cached, and status code distributions
//! - **Item Metrics**: Scraped, processed, and dropped items
//! - **Performance Metrics**: Throughput, response times, and bandwidth usage
//! - **Timing Metrics**: Elapsed time and processing rates
//!
//! ## Features
//!
//! - **Thread-Safe**: Uses atomic operations for concurrent metric updates
//! - **Real-Time Monitoring**: Provides live statistics during crawling
//! - **Export Formats**: Supports JSON and Markdown export formats
//! - **Snapshot Capability**: Captures consistent state for reporting
//!
//! ## Example
//!
//! ```rust,ignore
//! use spider_core::StatCollector;
//!
//! let stats = StatCollector::new();
//!
//! // During crawling, metrics are automatically updated
//! stats.increment_requests_sent();
//! stats.increment_items_scraped();
//!
//! // Export statistics in various formats
//! println!("{}", stats.to_json_string_pretty().unwrap());
//! println!("{}", stats.to_markdown_string());
//! ```

use spider_util::error::SpiderError;
use std::{
    collections::HashMap,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    time::{Duration, Instant},
};
52
// A snapshot of the current statistics, used for reporting.
// Capturing all counters once avoids code duplication in the various
// export/display methods and keeps a single report internally consistent.
struct StatsSnapshot {
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_dropped: usize,
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    response_status_counts: HashMap<u16, usize>,
    elapsed_duration: Duration,
}

impl StatsSnapshot {
    /// Human-readable elapsed time via `Duration`'s `Debug` format (e.g. "12.3s").
    fn formatted_duration(&self) -> String {
        format!("{:?}", self.elapsed_duration)
    }

    /// Shared helper: `count` events per second over the elapsed duration.
    ///
    /// Uses fractional seconds (`as_secs_f64`) instead of truncated whole
    /// seconds, so crawls shorter than one second report a real rate instead
    /// of 0.0, and longer crawls are not inflated by truncation (previously a
    /// 1.9 s crawl was divided by 1 s, overstating rates by up to 2x).
    fn rate(&self, count: usize) -> f64 {
        let secs = self.elapsed_duration.as_secs_f64();
        if secs > 0.0 { count as f64 / secs } else { 0.0 }
    }

    /// Requests sent per second.
    fn requests_per_second(&self) -> f64 {
        self.rate(self.requests_sent)
    }

    /// Responses received per second.
    fn responses_per_second(&self) -> f64 {
        self.rate(self.responses_received)
    }

    /// Items scraped per second.
    fn items_per_second(&self) -> f64 {
        self.rate(self.items_scraped)
    }

    /// Total bytes downloaded, formatted with the largest fitting binary unit
    /// (B, KB, MB, or GB; 1 KB = 1024 B).
    fn formatted_bytes(&self) -> String {
        const KB: usize = 1024;
        const MB: usize = 1024 * KB;
        const GB: usize = 1024 * MB;

        if self.total_bytes_downloaded >= GB {
            format!("{:.2} GB", self.total_bytes_downloaded as f64 / GB as f64)
        } else if self.total_bytes_downloaded >= MB {
            format!("{:.2} MB", self.total_bytes_downloaded as f64 / MB as f64)
        } else if self.total_bytes_downloaded >= KB {
            format!("{:.2} KB", self.total_bytes_downloaded as f64 / KB as f64)
        } else {
            format!("{} B", self.total_bytes_downloaded)
        }
    }
}
120
/// Collects and stores various statistics about the crawler's operation.
///
/// Every counter is an atomic or a concurrent map, so a shared
/// `StatCollector` can be updated from many tasks without external locking.
/// `start_time` is excluded from serialization (`Instant` is not
/// serializable and only meaningful within this process).
#[derive(Debug, serde::Serialize)]
pub struct StatCollector {
    // Crawl-related metrics
    #[serde(skip)]
    pub start_time: Instant,

    // Request-related metrics
    pub requests_enqueued: AtomicUsize,
    pub requests_sent: AtomicUsize,
    pub requests_succeeded: AtomicUsize,
    pub requests_failed: AtomicUsize,
    pub requests_retried: AtomicUsize,
    pub requests_dropped: AtomicUsize,

    // Response-related metrics
    pub responses_received: AtomicUsize,
    pub responses_from_cache: AtomicUsize,
    pub response_status_counts: Arc<dashmap::DashMap<u16, usize>>, // e.g., 200, 404, 500
    pub total_bytes_downloaded: AtomicUsize,

    // Add more advanced response time metrics if needed (e.g., histograms)

    // Item-related metrics
    pub items_scraped: AtomicUsize,
    pub items_processed: AtomicUsize,
    pub items_dropped_by_pipeline: AtomicUsize,

    // Timing metrics
    // Maps request URL -> duration of that request.
    pub request_times: Arc<dashmap::DashMap<String, Duration>>,
}
152
153impl StatCollector {
154    /// Creates a new `StatCollector` with all counters initialized to zero.
155    pub(crate) fn new() -> Self {
156        StatCollector {
157            start_time: Instant::now(),
158            requests_enqueued: AtomicUsize::new(0),
159            requests_sent: AtomicUsize::new(0),
160            requests_succeeded: AtomicUsize::new(0),
161            requests_failed: AtomicUsize::new(0),
162            requests_retried: AtomicUsize::new(0),
163            requests_dropped: AtomicUsize::new(0),
164            responses_received: AtomicUsize::new(0),
165            responses_from_cache: AtomicUsize::new(0),
166            response_status_counts: Arc::new(dashmap::DashMap::new()),
167            total_bytes_downloaded: AtomicUsize::new(0),
168            items_scraped: AtomicUsize::new(0),
169            items_processed: AtomicUsize::new(0),
170            items_dropped_by_pipeline: AtomicUsize::new(0),
171            request_times: Arc::new(dashmap::DashMap::new()),
172        }
173    }
174
175    /// Creates a snapshot of the current statistics.
176    /// This is the single source of truth for all presentation logic.
177    fn snapshot(&self) -> StatsSnapshot {
178        let mut status_counts: HashMap<u16, usize> = HashMap::new();
179        for entry in self.response_status_counts.iter() {
180            let (key, value) = entry.pair();
181            status_counts.insert(*key, *value);
182        }
183
184        StatsSnapshot {
185            requests_enqueued: self.requests_enqueued.load(Ordering::SeqCst),
186            requests_sent: self.requests_sent.load(Ordering::SeqCst),
187            requests_succeeded: self.requests_succeeded.load(Ordering::SeqCst),
188            requests_failed: self.requests_failed.load(Ordering::SeqCst),
189            requests_retried: self.requests_retried.load(Ordering::SeqCst),
190            requests_dropped: self.requests_dropped.load(Ordering::SeqCst),
191            responses_received: self.responses_received.load(Ordering::SeqCst),
192            responses_from_cache: self.responses_from_cache.load(Ordering::SeqCst),
193            total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::SeqCst),
194            items_scraped: self.items_scraped.load(Ordering::SeqCst),
195            items_processed: self.items_processed.load(Ordering::SeqCst),
196            items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::SeqCst),
197            response_status_counts: status_counts,
198            elapsed_duration: self.start_time.elapsed(),
199        }
200    }
201
202    /// Increments the count of enqueued requests.
203    pub(crate) fn increment_requests_enqueued(&self) {
204        self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
205    }
206
207    /// Increments the count of sent requests.
208    pub(crate) fn increment_requests_sent(&self) {
209        self.requests_sent.fetch_add(1, Ordering::SeqCst);
210    }
211
212    /// Increments the count of successful requests.
213    pub(crate) fn increment_requests_succeeded(&self) {
214        self.requests_succeeded.fetch_add(1, Ordering::SeqCst);
215    }
216
217    /// Increments the count of failed requests.
218    pub(crate) fn increment_requests_failed(&self) {
219        self.requests_failed.fetch_add(1, Ordering::SeqCst);
220    }
221
222    /// Increments the count of retried requests.
223    pub(crate) fn increment_requests_retried(&self) {
224        self.requests_retried.fetch_add(1, Ordering::SeqCst);
225    }
226
227    /// Increments the count of dropped requests.
228    pub(crate) fn increment_requests_dropped(&self) {
229        self.requests_dropped.fetch_add(1, Ordering::SeqCst);
230    }
231
232    /// Increments the count of received responses.
233    pub(crate) fn increment_responses_received(&self) {
234        self.responses_received.fetch_add(1, Ordering::SeqCst);
235    }
236
237    /// Increments the count of responses served from cache.
238    pub(crate) fn increment_responses_from_cache(&self) {
239        self.responses_from_cache.fetch_add(1, Ordering::SeqCst);
240    }
241
242    /// Records a response status code.
243    pub(crate) fn record_response_status(&self, status_code: u16) {
244        *self.response_status_counts.entry(status_code).or_insert(0) += 1;
245    }
246
247    /// Adds to the total bytes downloaded.
248    pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
249        self.total_bytes_downloaded
250            .fetch_add(bytes, Ordering::SeqCst);
251    }
252
253    /// Increments the count of scraped items.
254    pub(crate) fn increment_items_scraped(&self) {
255        self.items_scraped.fetch_add(1, Ordering::SeqCst);
256    }
257
258    /// Increments the count of processed items.
259    pub(crate) fn increment_items_processed(&self) {
260        self.items_processed.fetch_add(1, Ordering::SeqCst);
261    }
262
263    /// Increments the count of items dropped by pipelines.
264    pub(crate) fn increment_items_dropped_by_pipeline(&self) {
265        self.items_dropped_by_pipeline
266            .fetch_add(1, Ordering::SeqCst);
267    }
268
269    /// Records the time taken for a request.
270    #[allow(dead_code)]
271    pub(crate) fn record_request_time(&self, url: &str, duration: Duration) {
272        self.request_times.insert(url.to_string(), duration);
273    }
274
275    /// Converts the snapshot into a JSON string.
276    pub fn to_json_string(&self) -> Result<String, SpiderError> {
277        Ok(serde_json::to_string(self)?)
278    }
279
280    /// Converts the snapshot into a pretty-printed JSON string.
281    pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
282        Ok(serde_json::to_string_pretty(self)?)
283    }
284
285    /// Exports the current statistics to a Markdown formatted string.
286    pub fn to_markdown_string(&self) -> String {
287        let snapshot = self.snapshot();
288
289        let status_codes_list: String = snapshot
290            .response_status_counts
291            .iter()
292            .map(|(code, count)| format!("- **{}**: {}", code, count))
293            .collect::<Vec<String>>()
294            .join("\n");
295        let status_codes_output = if status_codes_list.is_empty() {
296            "N/A".to_string()
297        } else {
298            status_codes_list
299        };
300
301        format!(
302            r#"# Crawl Statistics Report
303
304- **Duration**: {}
305- **Average Speed**: {:.2} req/s, {:.2} resp/s, {:.2} item/s
306
307## Requests
308| Metric     | Count |
309|------------|-------|
310| Enqueued   | {}     |
311| Sent       | {}     |
312| Succeeded  | {}     |
313| Failed     | {}     |
314| Retried    | {}     |
315| Dropped    | {}     |
316
317## Responses
318| Metric     | Count |
319|------------|-------|
320| Received   | {}     |
321 From Cache | {}     |
322| Downloaded | {}     |
323
324## Items
325| Metric     | Count |
326|------------|--------|
327| Scraped    | {}     |
328| Processed  | {}     |
329| Dropped    | {}     |
330
331## Status Codes
332{}
333"#,
334            snapshot.formatted_duration(),
335            snapshot.requests_per_second(),
336            snapshot.responses_per_second(),
337            snapshot.items_per_second(),
338            snapshot.requests_enqueued,
339            snapshot.requests_sent,
340            snapshot.requests_succeeded,
341            snapshot.requests_failed,
342            snapshot.requests_retried,
343            snapshot.requests_dropped,
344            snapshot.responses_received,
345            snapshot.responses_from_cache,
346            snapshot.formatted_bytes(),
347            snapshot.items_scraped,
348            snapshot.items_processed,
349            snapshot.items_dropped_by_pipeline,
350            status_codes_output
351        )
352    }
353}
354
355impl Default for StatCollector {
356    fn default() -> Self {
357        Self::new()
358    }
359}
360
361impl std::fmt::Display for StatCollector {
362    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
363        let snapshot = self.snapshot();
364
365        writeln!(f, "\nCrawl Statistics")?;
366        writeln!(f, "----------------")?;
367        writeln!(f, "  duration : {}", snapshot.formatted_duration())?;
368        writeln!(
369            f,
370            "  speed    : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}",
371            snapshot.requests_per_second(),
372            snapshot.responses_per_second(),
373            snapshot.items_per_second()
374        )?;
375        writeln!(
376            f,
377            "  requests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}",
378            snapshot.requests_enqueued,
379            snapshot.requests_sent,
380            snapshot.requests_succeeded,
381            snapshot.requests_failed,
382            snapshot.requests_retried,
383            snapshot.requests_dropped
384        )?;
385        writeln!(
386            f,
387            "  response : received: {}, from_cache: {}, downloaded: {}",
388            snapshot.responses_received,
389            snapshot.responses_from_cache,
390            snapshot.formatted_bytes()
391        )?;
392        writeln!(
393            f,
394            "  items    : scraped: {}, processed: {}, dropped: {}",
395            snapshot.items_scraped, snapshot.items_processed, snapshot.items_dropped_by_pipeline
396        )?;
397
398        let status_string = if snapshot.response_status_counts.is_empty() {
399            "none".to_string()
400        } else {
401            snapshot
402                .response_status_counts
403                .iter()
404                .map(|(code, count)| format!("{}: {}", code, count))
405                .collect::<Vec<String>>()
406                .join(", ")
407        };
408
409        writeln!(f, "  status   : {}\n", status_string)
410    }
411}