1use spider_util::error::SpiderError;
44use std::{
45 collections::HashMap,
46 sync::{
47 Arc,
48 atomic::{AtomicUsize, Ordering},
49 },
50 time::{Duration, Instant},
51};
52
/// Immutable point-in-time copy of every counter held by a `StatCollector`,
/// used to render human-readable reports without repeatedly loading atomics.
struct StatsSnapshot {
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_dropped: usize,
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    // Responses seen per HTTP status code.
    response_status_counts: HashMap<u16, usize>,
    // Wall-clock time elapsed since the collector was created.
    elapsed_duration: Duration,
}

impl StatsSnapshot {
    /// Debug-formats the elapsed duration (e.g. `"1.234s"`).
    fn formatted_duration(&self) -> String {
        format!("{:?}", self.elapsed_duration)
    }

    /// Average rate of `count` events over the elapsed duration.
    ///
    /// Uses fractional seconds (`as_secs_f64`) so rates stay accurate for
    /// runs shorter than one second — whole-second truncation would report
    /// 0.0 for any sub-second crawl. Returns 0.0 for a zero duration to
    /// avoid dividing by zero.
    fn rate(&self, count: usize) -> f64 {
        let secs = self.elapsed_duration.as_secs_f64();
        if secs > 0.0 { count as f64 / secs } else { 0.0 }
    }

    /// Requests sent per second of elapsed time.
    fn requests_per_second(&self) -> f64 {
        self.rate(self.requests_sent)
    }

    /// Responses received per second of elapsed time.
    fn responses_per_second(&self) -> f64 {
        self.rate(self.responses_received)
    }

    /// Items scraped per second of elapsed time.
    fn items_per_second(&self) -> f64 {
        self.rate(self.items_scraped)
    }

    /// Human-readable download size using binary units (B / KB / MB / GB),
    /// two decimal places for scaled units.
    fn formatted_bytes(&self) -> String {
        const KB: usize = 1024;
        const MB: usize = 1024 * KB;
        const GB: usize = 1024 * MB;

        if self.total_bytes_downloaded >= GB {
            format!("{:.2} GB", self.total_bytes_downloaded as f64 / GB as f64)
        } else if self.total_bytes_downloaded >= MB {
            format!("{:.2} MB", self.total_bytes_downloaded as f64 / MB as f64)
        } else if self.total_bytes_downloaded >= KB {
            format!("{:.2} KB", self.total_bytes_downloaded as f64 / KB as f64)
        } else {
            format!("{} B", self.total_bytes_downloaded)
        }
    }
}
120
121#[derive(Debug, serde::Serialize)]
123pub struct StatCollector {
124 #[serde(skip)]
126 pub start_time: Instant,
127
128 pub requests_enqueued: AtomicUsize,
130 pub requests_sent: AtomicUsize,
131 pub requests_succeeded: AtomicUsize,
132 pub requests_failed: AtomicUsize,
133 pub requests_retried: AtomicUsize,
134 pub requests_dropped: AtomicUsize,
135
136 pub responses_received: AtomicUsize,
138 pub responses_from_cache: AtomicUsize,
139 pub response_status_counts: Arc<dashmap::DashMap<u16, usize>>, pub total_bytes_downloaded: AtomicUsize,
141
142 pub items_scraped: AtomicUsize,
146 pub items_processed: AtomicUsize,
147 pub items_dropped_by_pipeline: AtomicUsize,
148
149 pub request_times: Arc<dashmap::DashMap<String, Duration>>,
151}
152
153impl StatCollector {
154 pub(crate) fn new() -> Self {
156 StatCollector {
157 start_time: Instant::now(),
158 requests_enqueued: AtomicUsize::new(0),
159 requests_sent: AtomicUsize::new(0),
160 requests_succeeded: AtomicUsize::new(0),
161 requests_failed: AtomicUsize::new(0),
162 requests_retried: AtomicUsize::new(0),
163 requests_dropped: AtomicUsize::new(0),
164 responses_received: AtomicUsize::new(0),
165 responses_from_cache: AtomicUsize::new(0),
166 response_status_counts: Arc::new(dashmap::DashMap::new()),
167 total_bytes_downloaded: AtomicUsize::new(0),
168 items_scraped: AtomicUsize::new(0),
169 items_processed: AtomicUsize::new(0),
170 items_dropped_by_pipeline: AtomicUsize::new(0),
171 request_times: Arc::new(dashmap::DashMap::new()),
172 }
173 }
174
175 fn snapshot(&self) -> StatsSnapshot {
178 let mut status_counts: HashMap<u16, usize> = HashMap::new();
179 for entry in self.response_status_counts.iter() {
180 let (key, value) = entry.pair();
181 status_counts.insert(*key, *value);
182 }
183
184 StatsSnapshot {
185 requests_enqueued: self.requests_enqueued.load(Ordering::SeqCst),
186 requests_sent: self.requests_sent.load(Ordering::SeqCst),
187 requests_succeeded: self.requests_succeeded.load(Ordering::SeqCst),
188 requests_failed: self.requests_failed.load(Ordering::SeqCst),
189 requests_retried: self.requests_retried.load(Ordering::SeqCst),
190 requests_dropped: self.requests_dropped.load(Ordering::SeqCst),
191 responses_received: self.responses_received.load(Ordering::SeqCst),
192 responses_from_cache: self.responses_from_cache.load(Ordering::SeqCst),
193 total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::SeqCst),
194 items_scraped: self.items_scraped.load(Ordering::SeqCst),
195 items_processed: self.items_processed.load(Ordering::SeqCst),
196 items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::SeqCst),
197 response_status_counts: status_counts,
198 elapsed_duration: self.start_time.elapsed(),
199 }
200 }
201
202 pub(crate) fn increment_requests_enqueued(&self) {
204 self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
205 }
206
207 pub(crate) fn increment_requests_sent(&self) {
209 self.requests_sent.fetch_add(1, Ordering::SeqCst);
210 }
211
212 pub(crate) fn increment_requests_succeeded(&self) {
214 self.requests_succeeded.fetch_add(1, Ordering::SeqCst);
215 }
216
217 pub(crate) fn increment_requests_failed(&self) {
219 self.requests_failed.fetch_add(1, Ordering::SeqCst);
220 }
221
222 pub(crate) fn increment_requests_retried(&self) {
224 self.requests_retried.fetch_add(1, Ordering::SeqCst);
225 }
226
227 pub(crate) fn increment_requests_dropped(&self) {
229 self.requests_dropped.fetch_add(1, Ordering::SeqCst);
230 }
231
232 pub(crate) fn increment_responses_received(&self) {
234 self.responses_received.fetch_add(1, Ordering::SeqCst);
235 }
236
237 pub(crate) fn increment_responses_from_cache(&self) {
239 self.responses_from_cache.fetch_add(1, Ordering::SeqCst);
240 }
241
242 pub(crate) fn record_response_status(&self, status_code: u16) {
244 *self.response_status_counts.entry(status_code).or_insert(0) += 1;
245 }
246
247 pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
249 self.total_bytes_downloaded
250 .fetch_add(bytes, Ordering::SeqCst);
251 }
252
253 pub(crate) fn increment_items_scraped(&self) {
255 self.items_scraped.fetch_add(1, Ordering::SeqCst);
256 }
257
258 pub(crate) fn increment_items_processed(&self) {
260 self.items_processed.fetch_add(1, Ordering::SeqCst);
261 }
262
263 pub(crate) fn increment_items_dropped_by_pipeline(&self) {
265 self.items_dropped_by_pipeline
266 .fetch_add(1, Ordering::SeqCst);
267 }
268
269 #[allow(dead_code)]
271 pub(crate) fn record_request_time(&self, url: &str, duration: Duration) {
272 self.request_times.insert(url.to_string(), duration);
273 }
274
275 pub fn to_json_string(&self) -> Result<String, SpiderError> {
277 Ok(serde_json::to_string(self)?)
278 }
279
280 pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
282 Ok(serde_json::to_string_pretty(self)?)
283 }
284
285 pub fn to_markdown_string(&self) -> String {
287 let snapshot = self.snapshot();
288
289 let status_codes_list: String = snapshot
290 .response_status_counts
291 .iter()
292 .map(|(code, count)| format!("- **{}**: {}", code, count))
293 .collect::<Vec<String>>()
294 .join("\n");
295 let status_codes_output = if status_codes_list.is_empty() {
296 "N/A".to_string()
297 } else {
298 status_codes_list
299 };
300
301 format!(
302 r#"# Crawl Statistics Report
303
304- **Duration**: {}
305- **Average Speed**: {:.2} req/s, {:.2} resp/s, {:.2} item/s
306
307## Requests
308| Metric | Count |
309|------------|-------|
310| Enqueued | {} |
311| Sent | {} |
312| Succeeded | {} |
313| Failed | {} |
314| Retried | {} |
315| Dropped | {} |
316
317## Responses
318| Metric | Count |
319|------------|-------|
320| Received | {} |
321 From Cache | {} |
322| Downloaded | {} |
323
324## Items
325| Metric | Count |
326|------------|--------|
327| Scraped | {} |
328| Processed | {} |
329| Dropped | {} |
330
331## Status Codes
332{}
333"#,
334 snapshot.formatted_duration(),
335 snapshot.requests_per_second(),
336 snapshot.responses_per_second(),
337 snapshot.items_per_second(),
338 snapshot.requests_enqueued,
339 snapshot.requests_sent,
340 snapshot.requests_succeeded,
341 snapshot.requests_failed,
342 snapshot.requests_retried,
343 snapshot.requests_dropped,
344 snapshot.responses_received,
345 snapshot.responses_from_cache,
346 snapshot.formatted_bytes(),
347 snapshot.items_scraped,
348 snapshot.items_processed,
349 snapshot.items_dropped_by_pipeline,
350 status_codes_output
351 )
352 }
353}
354
355impl Default for StatCollector {
356 fn default() -> Self {
357 Self::new()
358 }
359}
360
361impl std::fmt::Display for StatCollector {
362 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
363 let snapshot = self.snapshot();
364
365 writeln!(f, "\nCrawl Statistics")?;
366 writeln!(f, "----------------")?;
367 writeln!(f, " duration : {}", snapshot.formatted_duration())?;
368 writeln!(
369 f,
370 " speed : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}",
371 snapshot.requests_per_second(),
372 snapshot.responses_per_second(),
373 snapshot.items_per_second()
374 )?;
375 writeln!(
376 f,
377 " requests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}",
378 snapshot.requests_enqueued,
379 snapshot.requests_sent,
380 snapshot.requests_succeeded,
381 snapshot.requests_failed,
382 snapshot.requests_retried,
383 snapshot.requests_dropped
384 )?;
385 writeln!(
386 f,
387 " response : received: {}, from_cache: {}, downloaded: {}",
388 snapshot.responses_received,
389 snapshot.responses_from_cache,
390 snapshot.formatted_bytes()
391 )?;
392 writeln!(
393 f,
394 " items : scraped: {}, processed: {}, dropped: {}",
395 snapshot.items_scraped, snapshot.items_processed, snapshot.items_dropped_by_pipeline
396 )?;
397
398 let status_string = if snapshot.response_status_counts.is_empty() {
399 "none".to_string()
400 } else {
401 snapshot
402 .response_status_counts
403 .iter()
404 .map(|(code, count)| format!("{}: {}", code, count))
405 .collect::<Vec<String>>()
406 .join(", ")
407 };
408
409 writeln!(f, " status : {}\n", status_string)
410 }
411}