1use crate::error::SpiderError;
4use std::{
5 collections::HashMap,
6 sync::{
7 Arc, Mutex,
8 atomic::{AtomicUsize, Ordering},
9 },
10 time::Instant,
11};
12
/// Plain-value copy of the collector's counters plus the elapsed time,
/// used to render a single report.
///
/// Values are captured individually, so the snapshot is not atomic across
/// counters; it reflects whatever each counter held when it was read.
#[derive(Debug, Clone)]
struct StatsSnapshot {
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_dropped: usize,
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    /// Count of recorded responses keyed by status code.
    response_status_counts: HashMap<u16, usize>,
    /// Time elapsed between collector creation and the snapshot.
    elapsed_duration: std::time::Duration,
}

impl StatsSnapshot {
    /// Elapsed time rendered with `Duration`'s `Debug` format (e.g. `1.5s`).
    fn formatted_duration(&self) -> String {
        format!("{:?}", self.elapsed_duration)
    }

    /// Average number of `count` events per second over the elapsed time.
    ///
    /// Uses fractional seconds: truncating to whole seconds would report 0.0
    /// for any sub-second run and overstate rates by up to 2x just below each
    /// whole second (1.9 s treated as 1 s). Returns 0.0 when no time has
    /// elapsed, to avoid dividing by zero.
    fn per_second(&self, count: usize) -> f64 {
        let secs = self.elapsed_duration.as_secs_f64();
        if secs > 0.0 {
            count as f64 / secs
        } else {
            0.0
        }
    }

    /// Average `requests_sent` per second.
    fn requests_per_second(&self) -> f64 {
        self.per_second(self.requests_sent)
    }

    /// Average `responses_received` per second.
    fn responses_per_second(&self) -> f64 {
        self.per_second(self.responses_received)
    }

    /// Average `items_scraped` per second.
    fn items_per_second(&self) -> f64 {
        self.per_second(self.items_scraped)
    }

    /// Human-readable byte total using binary (1024-based) units.
    ///
    /// Values below 1 KB are printed as whole bytes; larger values use two
    /// decimal places in the largest unit (KB, MB, GB) not exceeding them.
    fn formatted_bytes(&self) -> String {
        const KB: usize = 1024;
        const MB: usize = 1024 * KB;
        const GB: usize = 1024 * MB;

        let bytes = self.total_bytes_downloaded;
        let (divisor, unit) = if bytes >= GB {
            (GB, "GB")
        } else if bytes >= MB {
            (MB, "MB")
        } else if bytes >= KB {
            (KB, "KB")
        } else {
            return format!("{} B", bytes);
        };
        format!("{:.2} {}", bytes as f64 / divisor as f64, unit)
    }
}
80
81#[derive(Debug, Serialize)]
83pub struct StatCollector {
84 #[serde(skip)]
86 pub start_time: Instant,
87
88 pub requests_enqueued: AtomicUsize,
90 pub requests_sent: AtomicUsize,
91 pub requests_succeeded: AtomicUsize,
92 pub requests_failed: AtomicUsize,
93 pub requests_retried: AtomicUsize,
94 pub requests_dropped: AtomicUsize,
95
96 pub responses_received: AtomicUsize,
98 pub responses_from_cache: AtomicUsize,
99 pub response_status_counts: Arc<Mutex<HashMap<u16, usize>>>, pub total_bytes_downloaded: AtomicUsize,
101
102 pub items_scraped: AtomicUsize,
106 pub items_processed: AtomicUsize,
107 pub items_dropped_by_pipeline: AtomicUsize,
108}
109
110impl StatCollector {
111 pub(crate) fn new() -> Self {
113 StatCollector {
114 start_time: Instant::now(),
115 requests_enqueued: AtomicUsize::new(0),
116 requests_sent: AtomicUsize::new(0),
117 requests_succeeded: AtomicUsize::new(0),
118 requests_failed: AtomicUsize::new(0),
119 requests_retried: AtomicUsize::new(0),
120 requests_dropped: AtomicUsize::new(0),
121 responses_received: AtomicUsize::new(0),
122 responses_from_cache: AtomicUsize::new(0),
123 response_status_counts: Arc::new(Mutex::new(HashMap::new())),
124 total_bytes_downloaded: AtomicUsize::new(0),
125 items_scraped: AtomicUsize::new(0),
126 items_processed: AtomicUsize::new(0),
127 items_dropped_by_pipeline: AtomicUsize::new(0),
128 }
129 }
130
131 fn snapshot(&self) -> StatsSnapshot {
134 let status_counts_guard = self.response_status_counts.lock().unwrap();
135 let status_counts: HashMap<u16, usize> = status_counts_guard.clone();
136
137 StatsSnapshot {
138 requests_enqueued: self.requests_enqueued.load(Ordering::SeqCst),
139 requests_sent: self.requests_sent.load(Ordering::SeqCst),
140 requests_succeeded: self.requests_succeeded.load(Ordering::SeqCst),
141 requests_failed: self.requests_failed.load(Ordering::SeqCst),
142 requests_retried: self.requests_retried.load(Ordering::SeqCst),
143 requests_dropped: self.requests_dropped.load(Ordering::SeqCst),
144 responses_received: self.responses_received.load(Ordering::SeqCst),
145 responses_from_cache: self.responses_from_cache.load(Ordering::SeqCst),
146 total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::SeqCst),
147 items_scraped: self.items_scraped.load(Ordering::SeqCst),
148 items_processed: self.items_processed.load(Ordering::SeqCst),
149 items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::SeqCst),
150 response_status_counts: status_counts,
151 elapsed_duration: self.start_time.elapsed(),
152 }
153 }
154
155 pub(crate) fn increment_requests_enqueued(&self) {
157 self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
158 }
159
160 pub(crate) fn increment_requests_sent(&self) {
162 self.requests_sent.fetch_add(1, Ordering::SeqCst);
163 }
164
165 pub(crate) fn increment_requests_succeeded(&self) {
167 self.requests_succeeded.fetch_add(1, Ordering::SeqCst);
168 }
169
170 pub(crate) fn increment_requests_failed(&self) {
172 self.requests_failed.fetch_add(1, Ordering::SeqCst);
173 }
174
175 pub(crate) fn increment_requests_retried(&self) {
177 self.requests_retried.fetch_add(1, Ordering::SeqCst);
178 }
179
180 pub(crate) fn increment_requests_dropped(&self) {
182 self.requests_dropped.fetch_add(1, Ordering::SeqCst);
183 }
184
185 pub(crate) fn increment_responses_received(&self) {
187 self.responses_received.fetch_add(1, Ordering::SeqCst);
188 }
189
190 pub(crate) fn increment_responses_from_cache(&self) {
192 self.responses_from_cache.fetch_add(1, Ordering::SeqCst);
193 }
194
195 pub(crate) fn record_response_status(&self, status_code: u16) {
197 let mut counts = self.response_status_counts.lock().unwrap();
198 *counts.entry(status_code).or_insert(0) += 1;
199 }
200
201 pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
203 self.total_bytes_downloaded
204 .fetch_add(bytes, Ordering::SeqCst);
205 }
206
207 pub(crate) fn increment_items_scraped(&self) {
209 self.items_scraped.fetch_add(1, Ordering::SeqCst);
210 }
211
212 pub(crate) fn increment_items_processed(&self) {
214 self.items_processed.fetch_add(1, Ordering::SeqCst);
215 }
216
217 pub(crate) fn increment_items_dropped_by_pipeline(&self) {
219 self.items_dropped_by_pipeline
220 .fetch_add(1, Ordering::SeqCst);
221 }
222
223 pub fn to_json_string(&self) -> Result<String, SpiderError> {
225 Ok(serde_json::to_string(self)?)
226 }
227
228 pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
230 Ok(serde_json::to_string_pretty(self)?)
231 }
232
233 pub fn to_markdown_string(&self) -> String {
235 let snapshot = self.snapshot();
236
237 let status_codes_list: String = snapshot
238 .response_status_counts
239 .iter()
240 .map(|(code, count)| format!("- **{}**: {}", code, count))
241 .collect::<Vec<String>>()
242 .join("\n");
243 let status_codes_output = if status_codes_list.is_empty() {
244 "N/A".to_string()
245 } else {
246 status_codes_list
247 };
248
249 format!(
250 r#"# Crawl Statistics Report
251
252- **Duration**: {}
253- **Average Speed**: {:.2} req/s, {:.2} resp/s, {:.2} item/s
254
255## Requests
256| Metric | Count |
257|------------|-------|
258| Enqueued | {} |
259| Sent | {} |
260| Succeeded | {} |
261| Failed | {} |
262| Retried | {} |
263| Dropped | {} |
264
265## Responses
266| Metric | Count |
267|------------|-------|
268| Received | {} |
269 From Cache | {} |
270| Downloaded | {} |
271
272## Items
273| Metric | Count |
274|------------|--------|
275| Scraped | {} |
276| Processed | {} |
277| Dropped | {} |
278
279## Status Codes
280{}
281"#,
282 snapshot.formatted_duration(),
283 snapshot.requests_per_second(),
284 snapshot.responses_per_second(),
285 snapshot.items_per_second(),
286 snapshot.requests_enqueued,
287 snapshot.requests_sent,
288 snapshot.requests_succeeded,
289 snapshot.requests_failed,
290 snapshot.requests_retried,
291 snapshot.requests_dropped,
292 snapshot.responses_received,
293 snapshot.responses_from_cache,
294 snapshot.formatted_bytes(),
295 snapshot.items_scraped,
296 snapshot.items_processed,
297 snapshot.items_dropped_by_pipeline,
298 status_codes_output
299 )
300 }
301}
302
303impl Default for StatCollector {
304 fn default() -> Self {
305 Self::new()
306 }
307}
308
309use serde::Serialize;
310
311impl std::fmt::Display for StatCollector {
312 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313 let snapshot = self.snapshot();
314
315 writeln!(f, "\nCrawl Statistics")?;
316 writeln!(f, "----------------")?;
317 writeln!(f, " duration : {}", snapshot.formatted_duration())?;
318 writeln!(
319 f,
320 " speed : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}",
321 snapshot.requests_per_second(),
322 snapshot.responses_per_second(),
323 snapshot.items_per_second()
324 )?;
325 writeln!(
326 f,
327 " requests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}",
328 snapshot.requests_enqueued,
329 snapshot.requests_sent,
330 snapshot.requests_succeeded,
331 snapshot.requests_failed,
332 snapshot.requests_retried,
333 snapshot.requests_dropped
334 )?;
335 writeln!(
336 f,
337 " response : received: {}, from_cache: {}, downloaded: {}",
338 snapshot.responses_received,
339 snapshot.responses_from_cache,
340 snapshot.formatted_bytes()
341 )?;
342 writeln!(
343 f,
344 " items : scraped: {}, processed: {}, dropped: {}",
345 snapshot.items_scraped, snapshot.items_processed, snapshot.items_dropped_by_pipeline
346 )?;
347
348 let status_string = if snapshot.response_status_counts.is_empty() {
349 "none".to_string()
350 } else {
351 snapshot
352 .response_status_counts
353 .iter()
354 .map(|(code, count)| format!("{}: {}", code, count))
355 .collect::<Vec<String>>()
356 .join(", ")
357 };
358
359 writeln!(f, " status : {}\n", status_string)
360 }
361}