//! spider_lib/stats.rs
//!
//! Collects and stores various statistics about the crawler's operation.

use std::{
    collections::HashMap,
    sync::{
        Arc, Mutex,
        atomic::{AtomicUsize, Ordering},
    },
    time::Instant,
};

use crate::error::SpiderError;

// A point-in-time snapshot of the current statistics, used for reporting.
// Reading every counter exactly once here avoids duplicating the atomic-load
// logic in the various export/display methods.
struct StatsSnapshot {
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_dropped: usize,
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    response_status_counts: HashMap<u16, usize>,
    elapsed_duration: std::time::Duration,
}

impl StatsSnapshot {
    /// Human-readable elapsed time via `Duration`'s `Debug` formatting
    /// (e.g. "1.5s", "320ms").
    fn formatted_duration(&self) -> String {
        format!("{:?}", self.elapsed_duration)
    }

    /// `count / elapsed` in events per second, guarding against division by
    /// zero. Uses fractional seconds (`as_secs_f64`) so crawls shorter than
    /// one second still report a meaningful, non-zero rate instead of 0.0.
    fn rate(count: usize, elapsed: std::time::Duration) -> f64 {
        let secs = elapsed.as_secs_f64();
        if secs > 0.0 { count as f64 / secs } else { 0.0 }
    }

    /// Average requests sent per second over the crawl so far.
    fn requests_per_second(&self) -> f64 {
        Self::rate(self.requests_sent, self.elapsed_duration)
    }

    /// Average responses received per second over the crawl so far.
    fn responses_per_second(&self) -> f64 {
        Self::rate(self.responses_received, self.elapsed_duration)
    }

    /// Average items scraped per second over the crawl so far.
    fn items_per_second(&self) -> f64 {
        Self::rate(self.items_scraped, self.elapsed_duration)
    }

    /// Total download volume formatted with binary units (B/KB/MB/GB),
    /// two decimal places for anything above the plain-byte range.
    fn formatted_bytes(&self) -> String {
        const KB: usize = 1024;
        const MB: usize = 1024 * KB;
        const GB: usize = 1024 * MB;

        let bytes = self.total_bytes_downloaded;
        if bytes >= GB {
            format!("{:.2} GB", bytes as f64 / GB as f64)
        } else if bytes >= MB {
            format!("{:.2} MB", bytes as f64 / MB as f64)
        } else if bytes >= KB {
            format!("{:.2} KB", bytes as f64 / KB as f64)
        } else {
            format!("{} B", bytes)
        }
    }
}
68
69
70/// Collects and stores various statistics about the crawler's operation.
71#[derive(Debug, Serialize)]
72pub struct StatCollector {
73    // Crawl-related metrics
74    #[serde(skip)] // Skip serialization for Instant
75    pub start_time: Instant,
76    // Request-related metrics
77    pub requests_enqueued: AtomicUsize,
78    pub requests_sent: AtomicUsize,
79    pub requests_succeeded: AtomicUsize,
80    pub requests_failed: AtomicUsize,
81    pub requests_retried: AtomicUsize,
82    pub requests_dropped: AtomicUsize,
83
84    // Response-related metrics
85    pub responses_received: AtomicUsize,
86    pub responses_from_cache: AtomicUsize,
87    pub response_status_counts: Arc<Mutex<HashMap<u16, AtomicUsize>>>, // e.g., 200, 404, 500
88    pub total_bytes_downloaded: AtomicUsize,
89    // Add more advanced response time metrics if needed (e.g., histograms)
90
91    // Item-related metrics
92    pub items_scraped: AtomicUsize,
93    pub items_processed: AtomicUsize,
94    pub items_dropped_by_pipeline: AtomicUsize,
95}
96
97impl StatCollector {
98    /// Creates a new `StatCollector` with all counters initialized to zero.
99    pub(crate) fn new() -> Self {
100        StatCollector {
101            start_time: Instant::now(),
102            requests_enqueued: AtomicUsize::new(0),
103            requests_sent: AtomicUsize::new(0),
104            requests_succeeded: AtomicUsize::new(0),
105            requests_failed: AtomicUsize::new(0),
106            requests_retried: AtomicUsize::new(0),
107            requests_dropped: AtomicUsize::new(0),
108            responses_received: AtomicUsize::new(0),
109            responses_from_cache: AtomicUsize::new(0),
110            response_status_counts: Arc::new(Mutex::new(HashMap::new())),
111            total_bytes_downloaded: AtomicUsize::new(0),
112            items_scraped: AtomicUsize::new(0),
113            items_processed: AtomicUsize::new(0),
114            items_dropped_by_pipeline: AtomicUsize::new(0),
115        }
116    }
117
118    /// Creates a snapshot of the current statistics.
119    /// This is the single source of truth for all presentation logic.
120    fn snapshot(&self) -> StatsSnapshot {
121        let status_counts_guard = self.response_status_counts.lock().unwrap();
122        let status_counts: HashMap<u16, usize> = status_counts_guard
123            .iter()
124            .map(|(code, count)| (*code, count.load(Ordering::SeqCst)))
125            .collect();
126
127        StatsSnapshot {
128            requests_enqueued: self.requests_enqueued.load(Ordering::SeqCst),
129            requests_sent: self.requests_sent.load(Ordering::SeqCst),
130            requests_succeeded: self.requests_succeeded.load(Ordering::SeqCst),
131            requests_failed: self.requests_failed.load(Ordering::SeqCst),
132            requests_retried: self.requests_retried.load(Ordering::SeqCst),
133            requests_dropped: self.requests_dropped.load(Ordering::SeqCst),
134            responses_received: self.responses_received.load(Ordering::SeqCst),
135            responses_from_cache: self.responses_from_cache.load(Ordering::SeqCst),
136            total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::SeqCst),
137            items_scraped: self.items_scraped.load(Ordering::SeqCst),
138            items_processed: self.items_processed.load(Ordering::SeqCst),
139            items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::SeqCst),
140            response_status_counts: status_counts,
141            elapsed_duration: self.start_time.elapsed(),
142        }
143    }
144
145    /// Increments the count of enqueued requests.
146    pub(crate) fn increment_requests_enqueued(&self) {
147        self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
148    }
149
150    /// Increments the count of sent requests.
151    pub(crate) fn increment_requests_sent(&self) {
152        self.requests_sent.fetch_add(1, Ordering::SeqCst);
153    }
154
155    /// Increments the count of successful requests.
156    pub(crate) fn increment_requests_succeeded(&self) {
157        self.requests_succeeded.fetch_add(1, Ordering::SeqCst);
158    }
159
160    /// Increments the count of failed requests.
161    pub(crate) fn increment_requests_failed(&self) {
162        self.requests_failed.fetch_add(1, Ordering::SeqCst);
163    }
164
165    /// Increments the count of retried requests.
166    pub(crate) fn increment_requests_retried(&self) {
167        self.requests_retried.fetch_add(1, Ordering::SeqCst);
168    }
169
170    /// Increments the count of dropped requests.
171    pub(crate) fn increment_requests_dropped(&self) {
172        self.requests_dropped.fetch_add(1, Ordering::SeqCst);
173    }
174
175    /// Increments the count of received responses.
176    pub(crate) fn increment_responses_received(&self) {
177        self.responses_received.fetch_add(1, Ordering::SeqCst);
178    }
179
180    /// Increments the count of responses served from cache.
181    pub(crate) fn increment_responses_from_cache(&self) {
182        self.responses_from_cache.fetch_add(1, Ordering::SeqCst);
183    }
184
185    /// Records a response status code.
186    pub(crate) fn record_response_status(&self, status_code: u16) {
187        let mut counts = self.response_status_counts.lock().unwrap();
188        counts
189            .entry(status_code)
190            .or_insert_with(|| AtomicUsize::new(0))
191            .fetch_add(1, Ordering::SeqCst);
192    }
193
194    /// Adds to the total bytes downloaded.
195    pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
196        self.total_bytes_downloaded
197            .fetch_add(bytes, Ordering::SeqCst);
198    }
199
200    /// Increments the count of scraped items.
201    pub(crate) fn increment_items_scraped(&self) {
202        self.items_scraped.fetch_add(1, Ordering::SeqCst);
203    }
204
205    /// Increments the count of processed items.
206    pub(crate) fn increment_items_processed(&self) {
207        self.items_processed.fetch_add(1, Ordering::SeqCst);
208    }
209
210    /// Increments the count of items dropped by pipelines.
211    pub(crate) fn increment_items_dropped_by_pipeline(&self) {
212        self.items_dropped_by_pipeline
213            .fetch_add(1, Ordering::SeqCst);
214    }
215
216    /// Converts the snapshot into a JSON string.
217    pub fn to_json_string(&self) -> Result<String, SpiderError> {
218        Ok(serde_json::to_string(self)?)
219    }
220
221    /// Converts the snapshot into a pretty-printed JSON string.
222    pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
223        Ok(serde_json::to_string_pretty(self)?)
224    }
225
226    /// Exports the current statistics to a Markdown formatted string.
227    pub fn to_markdown_string(&self) -> String {
228        let snapshot = self.snapshot();
229
230        let status_codes_list: String = snapshot.response_status_counts
231            .iter()
232            .map(|(code, count)| format!("- **{}**: {}", code, count))
233            .collect::<Vec<String>>()
234            .join("\n");
235        let status_codes_output = if status_codes_list.is_empty() { "N/A".to_string() } else { status_codes_list };
236
237        format!(
238            r#"# Crawl Statistics Report
239
240- **Duration**: {}
241- **Average Speed**: {:.2} req/s, {:.2} resp/s, {:.2} item/s
242
243## Requests
244| Metric     | Count |
245|------------|-------|
246| Enqueued   | {}     |
247| Sent       | {}     |
248| Succeeded  | {}     |
249| Failed     | {}     |
250| Retried    | {}     |
251| Dropped    | {}     |
252
253## Responses
254| Metric     | Count |
255|------------|-------|
256| Received   | {}     |
257 From Cache | {}     |
258| Downloaded | {}     |
259
260## Items
261| Metric     | Count |
262|------------|--------|
263| Scraped    | {}     |
264| Processed  | {}     |
265| Dropped    | {}     |
266
267## Status Codes
268{}
269"#,
270            snapshot.formatted_duration(),
271            snapshot.requests_per_second(), snapshot.responses_per_second(), snapshot.items_per_second(),
272            snapshot.requests_enqueued,
273            snapshot.requests_sent,
274            snapshot.requests_succeeded,
275            snapshot.requests_failed,
276            snapshot.requests_retried,
277            snapshot.requests_dropped,
278            snapshot.responses_received,
279            snapshot.responses_from_cache,
280            snapshot.formatted_bytes(),
281            snapshot.items_scraped,
282            snapshot.items_processed,
283            snapshot.items_dropped_by_pipeline,
284            status_codes_output
285        )
286    }
287}
288
289impl Default for StatCollector {
290    fn default() -> Self {
291        Self::new()
292    }
293}
294
295use serde::Serialize;
296
297impl std::fmt::Display for StatCollector {
298    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299        let snapshot = self.snapshot();
300
301        writeln!(f, "\nCrawl Statistics")?;
302        writeln!(f, "----------------")?;
303        writeln!(f, "  duration : {}", snapshot.formatted_duration())?;
304        writeln!(f, "  speed    : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}",
305            snapshot.requests_per_second(), snapshot.responses_per_second(), snapshot.items_per_second())?;
306        writeln!(f, "  requests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}",
307            snapshot.requests_enqueued, snapshot.requests_sent, snapshot.requests_succeeded,
308            snapshot.requests_failed, snapshot.requests_retried, snapshot.requests_dropped)?;
309        writeln!(f, "  response : received: {}, from_cache: {}, downloaded: {}",
310            snapshot.responses_received, snapshot.responses_from_cache, snapshot.formatted_bytes())?;
311        writeln!(f, "  items    : scraped: {}, processed: {}, dropped: {}",
312            snapshot.items_scraped, snapshot.items_processed, snapshot.items_dropped_by_pipeline)?;
313        
314        let status_string = if snapshot.response_status_counts.is_empty() {
315            "none".to_string()
316        } else {
317            snapshot.response_status_counts
318                .iter()
319                .map(|(code, count)| format!("{}: {}", code, count))
320                .collect::<Vec<String>>()
321                .join(", ")
322        };
323
324        writeln!(f, "  status   : {}\n", status_string)
325    }
326}