1use std::{
4 collections::HashMap,
5 sync::{
6 Arc, Mutex,
7 atomic::{AtomicUsize, Ordering},
8 },
9 time::Instant, };
11use crate::error::SpiderError;
12
/// Plain-value copy of the counters held by `StatCollector`, used to render reports.
///
/// All fields are ordinary integers (no atomics or locks), so the formatting helpers
/// below can read them freely.
#[derive(Debug, Clone)]
struct StatsSnapshot {
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_dropped: usize,
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    response_status_counts: HashMap<u16, usize>,
    elapsed_duration: std::time::Duration,
}

impl StatsSnapshot {
    /// Elapsed time rendered with `Duration`'s `Debug` format (e.g. `1.5s`).
    fn formatted_duration(&self) -> String {
        format!("{:?}", self.elapsed_duration)
    }

    /// Average number of `count` events per second over `elapsed_duration`.
    ///
    /// Fractional seconds are used so that sub-second and non-integral durations
    /// yield accurate rates (whole-second truncation would report `0.0` for any
    /// crawl shorter than one second). Returns `0.0` when no time has elapsed.
    fn per_second(&self, count: usize) -> f64 {
        let seconds = self.elapsed_duration.as_secs_f64();
        if seconds > 0.0 {
            count as f64 / seconds
        } else {
            0.0
        }
    }

    /// Average requests sent per second.
    fn requests_per_second(&self) -> f64 {
        self.per_second(self.requests_sent)
    }

    /// Average responses received per second.
    fn responses_per_second(&self) -> f64 {
        self.per_second(self.responses_received)
    }

    /// Average items scraped per second.
    fn items_per_second(&self) -> f64 {
        self.per_second(self.items_scraped)
    }

    /// Total downloaded bytes in the largest fitting binary unit (B, KB, MB, GB),
    /// with two decimals for every unit except plain bytes.
    fn formatted_bytes(&self) -> String {
        const KB: usize = 1024;
        const MB: usize = 1024 * KB;
        const GB: usize = 1024 * MB;

        let bytes = self.total_bytes_downloaded;
        if bytes >= GB {
            format!("{:.2} GB", bytes as f64 / GB as f64)
        } else if bytes >= MB {
            format!("{:.2} MB", bytes as f64 / MB as f64)
        } else if bytes >= KB {
            format!("{:.2} KB", bytes as f64 / KB as f64)
        } else {
            format!("{} B", bytes)
        }
    }
}
68
69
70#[derive(Debug, Serialize)]
72pub struct StatCollector {
73 #[serde(skip)] pub start_time: Instant,
76 pub requests_enqueued: AtomicUsize,
78 pub requests_sent: AtomicUsize,
79 pub requests_succeeded: AtomicUsize,
80 pub requests_failed: AtomicUsize,
81 pub requests_retried: AtomicUsize,
82 pub requests_dropped: AtomicUsize,
83
84 pub responses_received: AtomicUsize,
86 pub responses_from_cache: AtomicUsize,
87 pub response_status_counts: Arc<Mutex<HashMap<u16, AtomicUsize>>>, pub total_bytes_downloaded: AtomicUsize,
89 pub items_scraped: AtomicUsize,
93 pub items_processed: AtomicUsize,
94 pub items_dropped_by_pipeline: AtomicUsize,
95}
96
97impl StatCollector {
98 pub(crate) fn new() -> Self {
100 StatCollector {
101 start_time: Instant::now(),
102 requests_enqueued: AtomicUsize::new(0),
103 requests_sent: AtomicUsize::new(0),
104 requests_succeeded: AtomicUsize::new(0),
105 requests_failed: AtomicUsize::new(0),
106 requests_retried: AtomicUsize::new(0),
107 requests_dropped: AtomicUsize::new(0),
108 responses_received: AtomicUsize::new(0),
109 responses_from_cache: AtomicUsize::new(0),
110 response_status_counts: Arc::new(Mutex::new(HashMap::new())),
111 total_bytes_downloaded: AtomicUsize::new(0),
112 items_scraped: AtomicUsize::new(0),
113 items_processed: AtomicUsize::new(0),
114 items_dropped_by_pipeline: AtomicUsize::new(0),
115 }
116 }
117
118 fn snapshot(&self) -> StatsSnapshot {
121 let status_counts_guard = self.response_status_counts.lock().unwrap();
122 let status_counts: HashMap<u16, usize> = status_counts_guard
123 .iter()
124 .map(|(code, count)| (*code, count.load(Ordering::SeqCst)))
125 .collect();
126
127 StatsSnapshot {
128 requests_enqueued: self.requests_enqueued.load(Ordering::SeqCst),
129 requests_sent: self.requests_sent.load(Ordering::SeqCst),
130 requests_succeeded: self.requests_succeeded.load(Ordering::SeqCst),
131 requests_failed: self.requests_failed.load(Ordering::SeqCst),
132 requests_retried: self.requests_retried.load(Ordering::SeqCst),
133 requests_dropped: self.requests_dropped.load(Ordering::SeqCst),
134 responses_received: self.responses_received.load(Ordering::SeqCst),
135 responses_from_cache: self.responses_from_cache.load(Ordering::SeqCst),
136 total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::SeqCst),
137 items_scraped: self.items_scraped.load(Ordering::SeqCst),
138 items_processed: self.items_processed.load(Ordering::SeqCst),
139 items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::SeqCst),
140 response_status_counts: status_counts,
141 elapsed_duration: self.start_time.elapsed(),
142 }
143 }
144
145 pub(crate) fn increment_requests_enqueued(&self) {
147 self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
148 }
149
150 pub(crate) fn increment_requests_sent(&self) {
152 self.requests_sent.fetch_add(1, Ordering::SeqCst);
153 }
154
155 pub(crate) fn increment_requests_succeeded(&self) {
157 self.requests_succeeded.fetch_add(1, Ordering::SeqCst);
158 }
159
160 pub(crate) fn increment_requests_failed(&self) {
162 self.requests_failed.fetch_add(1, Ordering::SeqCst);
163 }
164
165 pub(crate) fn increment_requests_retried(&self) {
167 self.requests_retried.fetch_add(1, Ordering::SeqCst);
168 }
169
170 pub(crate) fn increment_requests_dropped(&self) {
172 self.requests_dropped.fetch_add(1, Ordering::SeqCst);
173 }
174
175 pub(crate) fn increment_responses_received(&self) {
177 self.responses_received.fetch_add(1, Ordering::SeqCst);
178 }
179
180 pub(crate) fn increment_responses_from_cache(&self) {
182 self.responses_from_cache.fetch_add(1, Ordering::SeqCst);
183 }
184
185 pub(crate) fn record_response_status(&self, status_code: u16) {
187 let mut counts = self.response_status_counts.lock().unwrap();
188 counts
189 .entry(status_code)
190 .or_insert_with(|| AtomicUsize::new(0))
191 .fetch_add(1, Ordering::SeqCst);
192 }
193
194 pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
196 self.total_bytes_downloaded
197 .fetch_add(bytes, Ordering::SeqCst);
198 }
199
200 pub(crate) fn increment_items_scraped(&self) {
202 self.items_scraped.fetch_add(1, Ordering::SeqCst);
203 }
204
205 pub(crate) fn increment_items_processed(&self) {
207 self.items_processed.fetch_add(1, Ordering::SeqCst);
208 }
209
210 pub(crate) fn increment_items_dropped_by_pipeline(&self) {
212 self.items_dropped_by_pipeline
213 .fetch_add(1, Ordering::SeqCst);
214 }
215
216 pub fn to_json_string(&self) -> Result<String, SpiderError> {
218 Ok(serde_json::to_string(self)?)
219 }
220
221 pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
223 Ok(serde_json::to_string_pretty(self)?)
224 }
225
226 pub fn to_markdown_string(&self) -> String {
228 let snapshot = self.snapshot();
229
230 let status_codes_list: String = snapshot.response_status_counts
231 .iter()
232 .map(|(code, count)| format!("- **{}**: {}", code, count))
233 .collect::<Vec<String>>()
234 .join("\n");
235 let status_codes_output = if status_codes_list.is_empty() { "N/A".to_string() } else { status_codes_list };
236
237 format!(
238 r#"# Crawl Statistics Report
239
240- **Duration**: {}
241- **Average Speed**: {:.2} req/s, {:.2} resp/s, {:.2} item/s
242
243## Requests
244| Metric | Count |
245|------------|-------|
246| Enqueued | {} |
247| Sent | {} |
248| Succeeded | {} |
249| Failed | {} |
250| Retried | {} |
251| Dropped | {} |
252
253## Responses
254| Metric | Count |
255|------------|-------|
256| Received | {} |
257 From Cache | {} |
258| Downloaded | {} |
259
260## Items
261| Metric | Count |
262|------------|--------|
263| Scraped | {} |
264| Processed | {} |
265| Dropped | {} |
266
267## Status Codes
268{}
269"#,
270 snapshot.formatted_duration(),
271 snapshot.requests_per_second(), snapshot.responses_per_second(), snapshot.items_per_second(),
272 snapshot.requests_enqueued,
273 snapshot.requests_sent,
274 snapshot.requests_succeeded,
275 snapshot.requests_failed,
276 snapshot.requests_retried,
277 snapshot.requests_dropped,
278 snapshot.responses_received,
279 snapshot.responses_from_cache,
280 snapshot.formatted_bytes(),
281 snapshot.items_scraped,
282 snapshot.items_processed,
283 snapshot.items_dropped_by_pipeline,
284 status_codes_output
285 )
286 }
287}
288
289impl Default for StatCollector {
290 fn default() -> Self {
291 Self::new()
292 }
293}
294
295use serde::Serialize;
296
297impl std::fmt::Display for StatCollector {
298 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299 let snapshot = self.snapshot();
300
301 writeln!(f, "\nCrawl Statistics")?;
302 writeln!(f, "----------------")?;
303 writeln!(f, " duration : {}", snapshot.formatted_duration())?;
304 writeln!(f, " speed : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}",
305 snapshot.requests_per_second(), snapshot.responses_per_second(), snapshot.items_per_second())?;
306 writeln!(f, " requests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}",
307 snapshot.requests_enqueued, snapshot.requests_sent, snapshot.requests_succeeded,
308 snapshot.requests_failed, snapshot.requests_retried, snapshot.requests_dropped)?;
309 writeln!(f, " response : received: {}, from_cache: {}, downloaded: {}",
310 snapshot.responses_received, snapshot.responses_from_cache, snapshot.formatted_bytes())?;
311 writeln!(f, " items : scraped: {}, processed: {}, dropped: {}",
312 snapshot.items_scraped, snapshot.items_processed, snapshot.items_dropped_by_pipeline)?;
313
314 let status_string = if snapshot.response_status_counts.is_empty() {
315 "none".to_string()
316 } else {
317 snapshot.response_status_counts
318 .iter()
319 .map(|(code, count)| format!("{}: {}", code, count))
320 .collect::<Vec<String>>()
321 .join(", ")
322 };
323
324 writeln!(f, " status : {}\n", status_string)
325 }
326}