// spider_lib/stats.rs
1//! Collects and stores various statistics about the crawler's operation.
2
3use crate::error::SpiderError;
4use std::{
5    collections::HashMap,
6    sync::{
7        Arc, Mutex,
8        atomic::{AtomicUsize, Ordering},
9    },
10    time::Instant,
11};
12
/// A point-in-time snapshot of the current statistics, used for reporting.
///
/// Produced by `StatCollector::snapshot()`; holding plain values (rather than
/// atomics) lets the display/export methods read a consistent view without
/// repeated atomic loads, and avoids code duplication across formats.
struct StatsSnapshot {
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_dropped: usize,
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    // Count of responses per HTTP status code (e.g. 200 -> 1523, 404 -> 12).
    response_status_counts: HashMap<u16, usize>,
    // Wall-clock time since the collector was created.
    elapsed_duration: std::time::Duration,
}
31
32impl StatsSnapshot {
33    fn formatted_duration(&self) -> String {
34        format!("{:?}", self.elapsed_duration)
35    }
36
37    fn requests_per_second(&self) -> f64 {
38        let total_seconds = self.elapsed_duration.as_secs();
39        if total_seconds > 0 {
40            self.requests_sent as f64 / total_seconds as f64
41        } else {
42            0.0
43        }
44    }
45
46    fn responses_per_second(&self) -> f64 {
47        let total_seconds = self.elapsed_duration.as_secs();
48        if total_seconds > 0 {
49            self.responses_received as f64 / total_seconds as f64
50        } else {
51            0.0
52        }
53    }
54
55    fn items_per_second(&self) -> f64 {
56        let total_seconds = self.elapsed_duration.as_secs();
57        if total_seconds > 0 {
58            self.items_scraped as f64 / total_seconds as f64
59        } else {
60            0.0
61        }
62    }
63
64    fn formatted_bytes(&self) -> String {
65        const KB: usize = 1024;
66        const MB: usize = 1024 * KB;
67        const GB: usize = 1024 * MB;
68
69        if self.total_bytes_downloaded >= GB {
70            format!("{:.2} GB", self.total_bytes_downloaded as f64 / GB as f64)
71        } else if self.total_bytes_downloaded >= MB {
72            format!("{:.2} MB", self.total_bytes_downloaded as f64 / MB as f64)
73        } else if self.total_bytes_downloaded >= KB {
74            format!("{:.2} KB", self.total_bytes_downloaded as f64 / KB as f64)
75        } else {
76            format!("{} B", self.total_bytes_downloaded)
77        }
78    }
79}
80
/// Collects and stores various statistics about the crawler's operation.
///
/// All counters are atomics so increments can happen concurrently from many
/// crawler tasks without a lock; only the status-code histogram needs a
/// `Mutex` because it is a map, not a single counter.
// NOTE(review): the `Serialize` derive relies on serde's impls for
// `AtomicUsize` and `Arc<Mutex<_>>` (std/rc features) — confirm the crate's
// serde feature flags enable these.
#[derive(Debug, Serialize)]
pub struct StatCollector {
    // Crawl-related metrics
    // Skipped during serialization: `Instant` is an opaque monotonic
    // timestamp with no stable serialized form.
    #[serde(skip)]
    pub start_time: Instant,

    // Request-related metrics
    pub requests_enqueued: AtomicUsize,
    pub requests_sent: AtomicUsize,
    pub requests_succeeded: AtomicUsize,
    pub requests_failed: AtomicUsize,
    pub requests_retried: AtomicUsize,
    pub requests_dropped: AtomicUsize,

    // Response-related metrics
    pub responses_received: AtomicUsize,
    pub responses_from_cache: AtomicUsize,
    pub response_status_counts: Arc<Mutex<HashMap<u16, usize>>>, // e.g., 200, 404, 500
    pub total_bytes_downloaded: AtomicUsize,

    // Add more advanced response time metrics if needed (e.g., histograms)

    // Item-related metrics
    pub items_scraped: AtomicUsize,
    pub items_processed: AtomicUsize,
    pub items_dropped_by_pipeline: AtomicUsize,
}
109
110impl StatCollector {
111    /// Creates a new `StatCollector` with all counters initialized to zero.
112    pub(crate) fn new() -> Self {
113        StatCollector {
114            start_time: Instant::now(),
115            requests_enqueued: AtomicUsize::new(0),
116            requests_sent: AtomicUsize::new(0),
117            requests_succeeded: AtomicUsize::new(0),
118            requests_failed: AtomicUsize::new(0),
119            requests_retried: AtomicUsize::new(0),
120            requests_dropped: AtomicUsize::new(0),
121            responses_received: AtomicUsize::new(0),
122            responses_from_cache: AtomicUsize::new(0),
123            response_status_counts: Arc::new(Mutex::new(HashMap::new())),
124            total_bytes_downloaded: AtomicUsize::new(0),
125            items_scraped: AtomicUsize::new(0),
126            items_processed: AtomicUsize::new(0),
127            items_dropped_by_pipeline: AtomicUsize::new(0),
128        }
129    }
130
131    /// Creates a snapshot of the current statistics.
132    /// This is the single source of truth for all presentation logic.
133    fn snapshot(&self) -> StatsSnapshot {
134        let status_counts_guard = self.response_status_counts.lock().unwrap();
135        let status_counts: HashMap<u16, usize> = status_counts_guard.clone();
136
137        StatsSnapshot {
138            requests_enqueued: self.requests_enqueued.load(Ordering::SeqCst),
139            requests_sent: self.requests_sent.load(Ordering::SeqCst),
140            requests_succeeded: self.requests_succeeded.load(Ordering::SeqCst),
141            requests_failed: self.requests_failed.load(Ordering::SeqCst),
142            requests_retried: self.requests_retried.load(Ordering::SeqCst),
143            requests_dropped: self.requests_dropped.load(Ordering::SeqCst),
144            responses_received: self.responses_received.load(Ordering::SeqCst),
145            responses_from_cache: self.responses_from_cache.load(Ordering::SeqCst),
146            total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::SeqCst),
147            items_scraped: self.items_scraped.load(Ordering::SeqCst),
148            items_processed: self.items_processed.load(Ordering::SeqCst),
149            items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::SeqCst),
150            response_status_counts: status_counts,
151            elapsed_duration: self.start_time.elapsed(),
152        }
153    }
154
155    /// Increments the count of enqueued requests.
156    pub(crate) fn increment_requests_enqueued(&self) {
157        self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
158    }
159
160    /// Increments the count of sent requests.
161    pub(crate) fn increment_requests_sent(&self) {
162        self.requests_sent.fetch_add(1, Ordering::SeqCst);
163    }
164
165    /// Increments the count of successful requests.
166    pub(crate) fn increment_requests_succeeded(&self) {
167        self.requests_succeeded.fetch_add(1, Ordering::SeqCst);
168    }
169
170    /// Increments the count of failed requests.
171    pub(crate) fn increment_requests_failed(&self) {
172        self.requests_failed.fetch_add(1, Ordering::SeqCst);
173    }
174
175    /// Increments the count of retried requests.
176    pub(crate) fn increment_requests_retried(&self) {
177        self.requests_retried.fetch_add(1, Ordering::SeqCst);
178    }
179
180    /// Increments the count of dropped requests.
181    pub(crate) fn increment_requests_dropped(&self) {
182        self.requests_dropped.fetch_add(1, Ordering::SeqCst);
183    }
184
185    /// Increments the count of received responses.
186    pub(crate) fn increment_responses_received(&self) {
187        self.responses_received.fetch_add(1, Ordering::SeqCst);
188    }
189
190    /// Increments the count of responses served from cache.
191    pub(crate) fn increment_responses_from_cache(&self) {
192        self.responses_from_cache.fetch_add(1, Ordering::SeqCst);
193    }
194
195    /// Records a response status code.
196    pub(crate) fn record_response_status(&self, status_code: u16) {
197        let mut counts = self.response_status_counts.lock().unwrap();
198        *counts.entry(status_code).or_insert(0) += 1;
199    }
200
201    /// Adds to the total bytes downloaded.
202    pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
203        self.total_bytes_downloaded
204            .fetch_add(bytes, Ordering::SeqCst);
205    }
206
207    /// Increments the count of scraped items.
208    pub(crate) fn increment_items_scraped(&self) {
209        self.items_scraped.fetch_add(1, Ordering::SeqCst);
210    }
211
212    /// Increments the count of processed items.
213    pub(crate) fn increment_items_processed(&self) {
214        self.items_processed.fetch_add(1, Ordering::SeqCst);
215    }
216
217    /// Increments the count of items dropped by pipelines.
218    pub(crate) fn increment_items_dropped_by_pipeline(&self) {
219        self.items_dropped_by_pipeline
220            .fetch_add(1, Ordering::SeqCst);
221    }
222
223    /// Converts the snapshot into a JSON string.
224    pub fn to_json_string(&self) -> Result<String, SpiderError> {
225        Ok(serde_json::to_string(self)?)
226    }
227
228    /// Converts the snapshot into a pretty-printed JSON string.
229    pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
230        Ok(serde_json::to_string_pretty(self)?)
231    }
232
233    /// Exports the current statistics to a Markdown formatted string.
234    pub fn to_markdown_string(&self) -> String {
235        let snapshot = self.snapshot();
236
237        let status_codes_list: String = snapshot
238            .response_status_counts
239            .iter()
240            .map(|(code, count)| format!("- **{}**: {}", code, count))
241            .collect::<Vec<String>>()
242            .join("\n");
243        let status_codes_output = if status_codes_list.is_empty() {
244            "N/A".to_string()
245        } else {
246            status_codes_list
247        };
248
249        format!(
250            r#"# Crawl Statistics Report
251
252- **Duration**: {}
253- **Average Speed**: {:.2} req/s, {:.2} resp/s, {:.2} item/s
254
255## Requests
256| Metric     | Count |
257|------------|-------|
258| Enqueued   | {}     |
259| Sent       | {}     |
260| Succeeded  | {}     |
261| Failed     | {}     |
262| Retried    | {}     |
263| Dropped    | {}     |
264
265## Responses
266| Metric     | Count |
267|------------|-------|
268| Received   | {}     |
269 From Cache | {}     |
270| Downloaded | {}     |
271
272## Items
273| Metric     | Count |
274|------------|--------|
275| Scraped    | {}     |
276| Processed  | {}     |
277| Dropped    | {}     |
278
279## Status Codes
280{}
281"#,
282            snapshot.formatted_duration(),
283            snapshot.requests_per_second(),
284            snapshot.responses_per_second(),
285            snapshot.items_per_second(),
286            snapshot.requests_enqueued,
287            snapshot.requests_sent,
288            snapshot.requests_succeeded,
289            snapshot.requests_failed,
290            snapshot.requests_retried,
291            snapshot.requests_dropped,
292            snapshot.responses_received,
293            snapshot.responses_from_cache,
294            snapshot.formatted_bytes(),
295            snapshot.items_scraped,
296            snapshot.items_processed,
297            snapshot.items_dropped_by_pipeline,
298            status_codes_output
299        )
300    }
301}
302
303impl Default for StatCollector {
304    fn default() -> Self {
305        Self::new()
306    }
307}
308
309use serde::Serialize;
310
311impl std::fmt::Display for StatCollector {
312    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313        let snapshot = self.snapshot();
314
315        writeln!(f, "\nCrawl Statistics")?;
316        writeln!(f, "----------------")?;
317        writeln!(f, "  duration : {}", snapshot.formatted_duration())?;
318        writeln!(
319            f,
320            "  speed    : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}",
321            snapshot.requests_per_second(),
322            snapshot.responses_per_second(),
323            snapshot.items_per_second()
324        )?;
325        writeln!(
326            f,
327            "  requests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}",
328            snapshot.requests_enqueued,
329            snapshot.requests_sent,
330            snapshot.requests_succeeded,
331            snapshot.requests_failed,
332            snapshot.requests_retried,
333            snapshot.requests_dropped
334        )?;
335        writeln!(
336            f,
337            "  response : received: {}, from_cache: {}, downloaded: {}",
338            snapshot.responses_received,
339            snapshot.responses_from_cache,
340            snapshot.formatted_bytes()
341        )?;
342        writeln!(
343            f,
344            "  items    : scraped: {}, processed: {}, dropped: {}",
345            snapshot.items_scraped, snapshot.items_processed, snapshot.items_dropped_by_pipeline
346        )?;
347
348        let status_string = if snapshot.response_status_counts.is_empty() {
349            "none".to_string()
350        } else {
351            snapshot
352                .response_status_counts
353                .iter()
354                .map(|(code, count)| format!("{}: {}", code, count))
355                .collect::<Vec<String>>()
356                .join(", ")
357        };
358
359        writeln!(f, "  status   : {}\n", status_string)
360    }
361}