1use spider_util::error::SpiderError;
44use spider_util::metrics::ExpMovingAverage;
45use std::{
46 collections::HashMap,
47 sync::{
48 Arc,
49 atomic::{AtomicUsize, Ordering},
50 },
51 time::{Duration, Instant},
52};
53
/// Immutable point-in-time copy of every crawl statistic, taken from a
/// `StatCollector` so reports can be formatted without re-reading the
/// live atomic counters mid-render.
struct StatsSnapshot {
    // -- request lifecycle counters --
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_dropped: usize,
    // -- response counters --
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    // -- item pipeline counters --
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    // HTTP status code -> number of responses with that code.
    response_status_counts: HashMap<u16, usize>,
    // Wall-clock time since the collector was created.
    elapsed_duration: Duration,
    // Request timing aggregates; `None` when no samples were recorded.
    average_request_time: Option<Duration>,
    fastest_request_time: Option<Duration>,
    slowest_request_time: Option<Duration>,
    request_time_count: usize,
    // Parsing timing aggregates; `None` when no samples were recorded.
    average_parsing_time: Option<Duration>,
    fastest_parsing_time: Option<Duration>,
    slowest_parsing_time: Option<Duration>,
    parsing_time_count: usize,

    // Exponentially-weighted "recent" throughput rates, as reported by
    // the collector's moving averages at snapshot time.
    recent_requests_per_second: f64,
    recent_responses_per_second: f64,
    recent_items_per_second: f64,
}
85
86impl StatsSnapshot {
87 fn formatted_duration(&self) -> String {
88 format!("{:?}", self.elapsed_duration)
89 }
90
91 fn formatted_request_time(&self, duration: Option<Duration>) -> String {
92 match duration {
93 Some(d) => {
94 if d.as_millis() < 1000 {
95 format!("{} ms", d.as_millis())
96 } else {
97 format!("{:.2} s", d.as_secs_f64())
98 }
99 }
100 None => "N/A".to_string(),
101 }
102 }
103
104 fn requests_per_second(&self) -> f64 {
105 let elapsed = self.elapsed_duration.as_secs_f64();
106 if elapsed > 0.0 {
107 self.requests_sent as f64 / elapsed
108 } else {
109 0.0
110 }
111 }
112
113 fn responses_per_second(&self) -> f64 {
114 let elapsed = self.elapsed_duration.as_secs_f64();
115 if elapsed > 0.0 {
116 self.responses_received as f64 / elapsed
117 } else {
118 0.0
119 }
120 }
121
122 fn items_per_second(&self) -> f64 {
123 let elapsed = self.elapsed_duration.as_secs_f64();
124 if elapsed > 0.0 {
125 self.items_scraped as f64 / elapsed
126 } else {
127 0.0
128 }
129 }
130
131 fn formatted_bytes(&self) -> String {
132 const KB: usize = 1024;
133 const MB: usize = 1024 * KB;
134 const GB: usize = 1024 * MB;
135
136 if self.total_bytes_downloaded >= GB {
137 format!("{:.2} GB", self.total_bytes_downloaded as f64 / GB as f64)
138 } else if self.total_bytes_downloaded >= MB {
139 format!("{:.2} MB", self.total_bytes_downloaded as f64 / MB as f64)
140 } else if self.total_bytes_downloaded >= KB {
141 format!("{:.2} KB", self.total_bytes_downloaded as f64 / KB as f64)
142 } else {
143 format!("{} B", self.total_bytes_downloaded)
144 }
145 }
146}
147
/// Thread-safe accumulator for crawl statistics.
///
/// Counters are lock-free atomics and keyed data lives in `DashMap`s,
/// so the collector can be shared across worker tasks behind an `Arc`
/// and updated without external locking. Serializes to JSON via serde
/// (the `Instant` and EMA fields are skipped — they are not
/// serializable / meaningful outside the process).
#[derive(Debug, serde::Serialize)]
pub struct StatCollector {
    // Wall-clock anchor for computing elapsed duration and overall rates.
    #[serde(skip)]
    pub start_time: Instant,

    // -- request lifecycle counters --
    pub requests_enqueued: AtomicUsize,
    pub requests_sent: AtomicUsize,
    pub requests_succeeded: AtomicUsize,
    pub requests_failed: AtomicUsize,
    pub requests_retried: AtomicUsize,
    pub requests_dropped: AtomicUsize,

    // -- response counters --
    pub responses_received: AtomicUsize,
    pub responses_from_cache: AtomicUsize,
    // HTTP status code -> count of responses observed with that code.
    pub response_status_counts: Arc<dashmap::DashMap<u16, usize>>,
    pub total_bytes_downloaded: AtomicUsize,

    // -- item pipeline counters --
    pub items_scraped: AtomicUsize,
    pub items_processed: AtomicUsize,
    pub items_dropped_by_pipeline: AtomicUsize,

    // URL -> duration of the request for that URL.
    pub request_times: Arc<dashmap::DashMap<String, Duration>>,
    // Synthetic id ("parse_<nanos>") -> duration of one parsing pass.
    pub parsing_times: Arc<dashmap::DashMap<String, Duration>>,

    // Exponential moving averages feeding the "recent rate" figures.
    #[serde(skip)]
    requests_sent_ema: ExpMovingAverage,
    #[serde(skip)]
    responses_received_ema: ExpMovingAverage,
    #[serde(skip)]
    items_scraped_ema: ExpMovingAverage,
}
188
189impl StatCollector {
190 pub(crate) fn new() -> Self {
192 StatCollector {
193 start_time: Instant::now(),
194 requests_enqueued: AtomicUsize::new(0),
195 requests_sent: AtomicUsize::new(0),
196 requests_succeeded: AtomicUsize::new(0),
197 requests_failed: AtomicUsize::new(0),
198 requests_retried: AtomicUsize::new(0),
199 requests_dropped: AtomicUsize::new(0),
200 responses_received: AtomicUsize::new(0),
201 responses_from_cache: AtomicUsize::new(0),
202 response_status_counts: Arc::new(dashmap::DashMap::new()),
203 total_bytes_downloaded: AtomicUsize::new(0),
204 items_scraped: AtomicUsize::new(0),
205 items_processed: AtomicUsize::new(0),
206 items_dropped_by_pipeline: AtomicUsize::new(0),
207 request_times: Arc::new(dashmap::DashMap::new()),
208 parsing_times: Arc::new(dashmap::DashMap::new()),
209 requests_sent_ema: ExpMovingAverage::new(0.2),
211 responses_received_ema: ExpMovingAverage::new(0.2),
212 items_scraped_ema: ExpMovingAverage::new(0.2),
213 }
214 }
215
216 fn snapshot(&self) -> StatsSnapshot {
219 let mut status_counts: HashMap<u16, usize> = HashMap::new();
220 for entry in self.response_status_counts.iter() {
221 let (key, value) = entry.pair();
222 status_counts.insert(*key, *value);
223 }
224
225 let recent_requests_per_second = self.requests_sent_ema.get_rate();
227 let recent_responses_per_second = self.responses_received_ema.get_rate();
228 let recent_items_per_second = self.items_scraped_ema.get_rate();
229
230 StatsSnapshot {
231 requests_enqueued: self.requests_enqueued.load(Ordering::SeqCst),
232 requests_sent: self.requests_sent.load(Ordering::SeqCst),
233 requests_succeeded: self.requests_succeeded.load(Ordering::SeqCst),
234 requests_failed: self.requests_failed.load(Ordering::SeqCst),
235 requests_retried: self.requests_retried.load(Ordering::SeqCst),
236 requests_dropped: self.requests_dropped.load(Ordering::SeqCst),
237 responses_received: self.responses_received.load(Ordering::SeqCst),
238 responses_from_cache: self.responses_from_cache.load(Ordering::SeqCst),
239 total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::SeqCst),
240 items_scraped: self.items_scraped.load(Ordering::SeqCst),
241 items_processed: self.items_processed.load(Ordering::SeqCst),
242 items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::SeqCst),
243 response_status_counts: status_counts,
244 elapsed_duration: self.start_time.elapsed(),
245 average_request_time: self.average_request_time(),
246 fastest_request_time: self.fastest_request_time(),
247 slowest_request_time: self.slowest_request_time(),
248 request_time_count: self.request_time_count(),
249 average_parsing_time: self.average_parsing_time(),
250 fastest_parsing_time: self.fastest_parsing_time(),
251 slowest_parsing_time: self.slowest_parsing_time(),
252 parsing_time_count: self.parsing_time_count(),
253
254 recent_requests_per_second,
256 recent_responses_per_second,
257 recent_items_per_second,
258 }
259 }
260
261 pub(crate) fn increment_requests_enqueued(&self) {
263 self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
264 }
265
266 pub(crate) fn increment_requests_sent(&self) {
268 self.requests_sent.fetch_add(1, Ordering::SeqCst);
269 self.requests_sent_ema.update(1);
271 }
272
273 pub(crate) fn increment_requests_succeeded(&self) {
275 self.requests_succeeded.fetch_add(1, Ordering::SeqCst);
276 }
277
278 pub(crate) fn increment_requests_failed(&self) {
280 self.requests_failed.fetch_add(1, Ordering::SeqCst);
281 }
282
283 pub(crate) fn increment_requests_retried(&self) {
285 self.requests_retried.fetch_add(1, Ordering::SeqCst);
286 }
287
288 pub(crate) fn increment_requests_dropped(&self) {
290 self.requests_dropped.fetch_add(1, Ordering::SeqCst);
291 }
292
293 pub(crate) fn increment_responses_received(&self) {
295 self.responses_received.fetch_add(1, Ordering::SeqCst);
296 self.responses_received_ema.update(1);
298 }
299
300 pub(crate) fn increment_responses_from_cache(&self) {
302 self.responses_from_cache.fetch_add(1, Ordering::SeqCst);
303 }
304
305 pub(crate) fn record_response_status(&self, status_code: u16) {
307 *self.response_status_counts.entry(status_code).or_insert(0) += 1;
308 }
309
310 pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
312 self.total_bytes_downloaded
313 .fetch_add(bytes, Ordering::SeqCst);
314 }
315
316 pub(crate) fn increment_items_scraped(&self) {
318 self.items_scraped.fetch_add(1, Ordering::SeqCst);
319 self.items_scraped_ema.update(1);
321 }
322
323 pub(crate) fn increment_items_processed(&self) {
325 self.items_processed.fetch_add(1, Ordering::SeqCst);
326 }
327
328 pub(crate) fn increment_items_dropped_by_pipeline(&self) {
330 self.items_dropped_by_pipeline
331 .fetch_add(1, Ordering::SeqCst);
332 }
333
334 pub fn record_request_time(&self, url: &str, duration: Duration) {
336 self.request_times.insert(url.to_string(), duration);
337 }
338
339 pub fn average_request_time(&self) -> Option<Duration> {
341 let times: Vec<Duration> = self
342 .request_times
343 .iter()
344 .map(|entry| *entry.value())
345 .collect();
346 if times.is_empty() {
347 None
348 } else {
349 let total_nanos: u128 = times.iter().map(|d| d.as_nanos()).sum();
350 let avg_nanos = total_nanos / times.len() as u128;
351 Some(Duration::from_nanos(avg_nanos as u64))
352 }
353 }
354
355 pub fn fastest_request_time(&self) -> Option<Duration> {
357 self.request_times.iter().map(|entry| *entry.value()).min()
358 }
359
360 pub fn slowest_request_time(&self) -> Option<Duration> {
362 self.request_times.iter().map(|entry| *entry.value()).max()
363 }
364
365 pub fn request_time_count(&self) -> usize {
367 self.request_times.len()
368 }
369
370 pub fn get_request_time(&self, url: &str) -> Option<Duration> {
372 self.request_times
373 .get(url)
374 .map(|duration| *duration.value())
375 }
376
377 pub fn get_all_request_times(&self) -> Vec<(String, Duration)> {
379 self.request_times
380 .iter()
381 .map(|entry| (entry.key().clone(), *entry.value()))
382 .collect()
383 }
384
385 pub fn record_parsing_time(&self, duration: Duration) {
387 let id = format!("parse_{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos());
388 self.parsing_times.insert(id, duration);
389 }
390
391 pub fn average_parsing_time(&self) -> Option<Duration> {
393 let times: Vec<Duration> = self
394 .parsing_times
395 .iter()
396 .map(|entry| *entry.value())
397 .collect();
398 if times.is_empty() {
399 None
400 } else {
401 let total_nanos: u128 = times.iter().map(|d| d.as_nanos()).sum();
402 let avg_nanos = total_nanos / times.len() as u128;
403 Some(Duration::from_nanos(avg_nanos as u64))
404 }
405 }
406
407 pub fn fastest_parsing_time(&self) -> Option<Duration> {
409 self.parsing_times.iter().map(|entry| *entry.value()).min()
410 }
411
412 pub fn slowest_parsing_time(&self) -> Option<Duration> {
414 self.parsing_times.iter().map(|entry| *entry.value()).max()
415 }
416
417 pub fn parsing_time_count(&self) -> usize {
419 self.parsing_times.len()
420 }
421
422 pub fn clear_request_times(&self) {
424 self.request_times.clear();
425 }
426
427 pub fn to_json_string(&self) -> Result<String, SpiderError> {
429 Ok(serde_json::to_string(self)?)
430 }
431
432 pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
434 Ok(serde_json::to_string_pretty(self)?)
435 }
436
437 pub fn to_markdown_string(&self) -> String {
439 let snapshot = self.snapshot();
440
441 let status_codes_list: String = snapshot
442 .response_status_counts
443 .iter()
444 .map(|(code, count)| format!("- **{}**: {}", code, count))
445 .collect::<Vec<String>>()
446 .join("\n");
447 let status_codes_output = if status_codes_list.is_empty() {
448 "N/A".to_string()
449 } else {
450 status_codes_list
451 };
452
453 format!(
454 r#"# Crawl Statistics Report
455
456- **Duration**: {}
457- **Current Rate** (last 10s): {:.2} req/s, {:.2} resp/s, {:.2} item/s
458- **Overall Rate** (total): {:.2} req/s, {:.2} resp/s, {:.2} item/s
459
460## Requests
461| Metric | Count |
462|------------|-------|
463| Enqueued | {} |
464| Sent | {} |
465| Succeeded | {} |
466| Failed | {} |
467| Retried | {} |
468| Dropped | {} |
469
470## Responses
471| Metric | Count |
472|------------|-------|
473| Received | {} |
474 From Cache | {} |
475| Downloaded | {} |
476
477## Items
478| Metric | Count |
479|------------|--------|
480| Scraped | {} |
481| Processed | {} |
482| Dropped | {} |
483
484## Request Times
485| Metric | Value |
486|------------------|------------|
487| Average Time | {} |
488| Fastest Request | {} |
489| Slowest Request | {} |
490| Total Recorded | {} |
491
492## Parsing Times
493| Metric | Value |
494|------------------|------------|
495| Average Time | {} |
496| Fastest Parse | {} |
497| Slowest Parse | {} |
498| Total Recorded | {} |
499
500## Status Codes
501{}
502"#,
503 snapshot.formatted_duration(),
504 snapshot.requests_per_second(),
505 snapshot.responses_per_second(),
506 snapshot.items_per_second(),
507 {
509 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
510 if total_seconds > 0.0 { snapshot.requests_sent as f64 / total_seconds } else { 0.0 }
511 },
512 {
513 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
514 if total_seconds > 0.0 { snapshot.responses_received as f64 / total_seconds } else { 0.0 }
515 },
516 {
517 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
518 if total_seconds > 0.0 { snapshot.items_scraped as f64 / total_seconds } else { 0.0 }
519 },
520 snapshot.requests_enqueued,
521 snapshot.requests_sent,
522 snapshot.requests_succeeded,
523 snapshot.requests_failed,
524 snapshot.requests_retried,
525 snapshot.requests_dropped,
526 snapshot.responses_received,
527 snapshot.responses_from_cache,
528 snapshot.formatted_bytes(),
529 snapshot.items_scraped,
530 snapshot.items_processed,
531 snapshot.items_dropped_by_pipeline,
532 snapshot.formatted_request_time(snapshot.average_request_time),
533 snapshot.formatted_request_time(snapshot.fastest_request_time),
534 snapshot.formatted_request_time(snapshot.slowest_request_time),
535 snapshot.request_time_count,
536 snapshot.formatted_request_time(snapshot.average_parsing_time),
537 snapshot.formatted_request_time(snapshot.fastest_parsing_time),
538 snapshot.formatted_request_time(snapshot.slowest_parsing_time),
539 snapshot.parsing_time_count,
540 status_codes_output
541 )
542 }
543}
544
545impl Default for StatCollector {
546 fn default() -> Self {
547 Self::new()
548 }
549}
550
551impl std::fmt::Display for StatCollector {
552 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
553 let snapshot = self.snapshot();
554
555 writeln!(f, "\nCrawl Statistics")?;
556 writeln!(f, "----------------")?;
557 writeln!(f, " duration : {}", snapshot.formatted_duration())?;
558 writeln!(
559 f,
560 " speed : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}",
561 snapshot.recent_requests_per_second,
562 snapshot.recent_responses_per_second,
563 snapshot.recent_items_per_second
564 )?;
565 writeln!(
566 f,
567 " requests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}",
568 snapshot.requests_enqueued,
569 snapshot.requests_sent,
570 snapshot.requests_succeeded,
571 snapshot.requests_failed,
572 snapshot.requests_retried,
573 snapshot.requests_dropped
574 )?;
575 writeln!(
576 f,
577 " response : received: {}, from_cache: {}, downloaded: {}",
578 snapshot.responses_received,
579 snapshot.responses_from_cache,
580 snapshot.formatted_bytes()
581 )?;
582 writeln!(
583 f,
584 " items : scraped: {}, processed: {}, dropped: {}",
585 snapshot.items_scraped, snapshot.items_processed, snapshot.items_dropped_by_pipeline
586 )?;
587 writeln!(
588 f,
589 " req time : avg: {}, fastest: {}, slowest: {}, total: {}",
590 snapshot.formatted_request_time(snapshot.average_request_time),
591 snapshot.formatted_request_time(snapshot.fastest_request_time),
592 snapshot.formatted_request_time(snapshot.slowest_request_time),
593 snapshot.request_time_count
594 )?;
595 writeln!(
596 f,
597 " parsing : avg: {}, fastest: {}, slowest: {}, total: {}",
598 snapshot.formatted_request_time(snapshot.average_parsing_time),
599 snapshot.formatted_request_time(snapshot.fastest_parsing_time),
600 snapshot.formatted_request_time(snapshot.slowest_parsing_time),
601 snapshot.parsing_time_count
602 )?;
603
604 let status_string = if snapshot.response_status_counts.is_empty() {
605 "none".to_string()
606 } else {
607 snapshot
608 .response_status_counts
609 .iter()
610 .map(|(code, count)| format!("{}: {}", code, count))
611 .collect::<Vec<String>>()
612 .join(", ")
613 };
614
615 writeln!(f, " status : {}\n", status_string)
616 }
617}