1use spider_util::error::SpiderError;
44use spider_util::metrics::ExpMovingAverage;
45use std::{
46 collections::HashMap,
47 sync::{
48 Arc,
49 atomic::{AtomicUsize, Ordering},
50 },
51 time::{Duration, Instant},
52};
53
/// Point-in-time copy of the collector's counters and timing aggregates.
///
/// Holding plain values (rather than atomics) lets the formatting helpers read
/// the same numbers repeatedly while a single report is being rendered.
struct StatsSnapshot {
    // Request counters.
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_dropped: usize,

    // Response counters.
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,

    // Item counters.
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,

    /// Number of responses recorded per response status code.
    response_status_counts: HashMap<u16, usize>,
    /// Time elapsed since the collector's start time when the snapshot was taken.
    elapsed_duration: Duration,

    // Aggregates over the recorded request times (`None` when nothing was recorded).
    average_request_time: Option<Duration>,
    fastest_request_time: Option<Duration>,
    slowest_request_time: Option<Duration>,
    request_time_count: usize,

    // Aggregates over the recorded parsing times (`None` when nothing was recorded).
    average_parsing_time: Option<Duration>,
    fastest_parsing_time: Option<Duration>,
    slowest_parsing_time: Option<Duration>,
    parsing_time_count: usize,

    // Rates reported by the collector's moving averages at snapshot time.
    recent_requests_per_second: f64,
    recent_responses_per_second: f64,
    recent_items_per_second: f64,
}

impl StatsSnapshot {
    /// Returns the elapsed duration rendered with `Duration`'s `Debug` formatting.
    fn formatted_duration(&self) -> String {
        format!("{:?}", self.elapsed_duration)
    }

    /// Renders an optional duration for a report.
    ///
    /// * `None` becomes `"N/A"`.
    /// * One second or more is shown in seconds with two decimals (`"1.50 s"`).
    /// * One millisecond up to one second is shown in whole milliseconds (`"250 ms"`).
    /// * A non-zero duration below one millisecond is shown in fractional
    ///   milliseconds (`"0.250 ms"`) so fast operations do not collapse to `"0 ms"`.
    /// * A zero duration is shown as `"0 ms"`.
    fn formatted_request_time(&self, duration: Option<Duration>) -> String {
        match duration {
            None => "N/A".to_string(),
            Some(d) if d.as_millis() >= 1000 => format!("{:.2} s", d.as_secs_f64()),
            // Whole-millisecond formatting would truncate these to "0 ms".
            Some(d) if d.as_millis() == 0 && d.as_nanos() > 0 => {
                format!("{:.3} ms", d.as_secs_f64() * 1000.0)
            }
            Some(d) => format!("{} ms", d.as_millis()),
        }
    }

    /// Divides `count` by the elapsed seconds, yielding `0.0` when no time has elapsed.
    fn rate_per_second(&self, count: usize) -> f64 {
        let elapsed = self.elapsed_duration.as_secs_f64();
        if elapsed > 0.0 {
            count as f64 / elapsed
        } else {
            0.0
        }
    }

    /// Average number of requests sent per second over the whole elapsed duration.
    fn requests_per_second(&self) -> f64 {
        self.rate_per_second(self.requests_sent)
    }

    /// Average number of responses received per second over the whole elapsed duration.
    fn responses_per_second(&self) -> f64 {
        self.rate_per_second(self.responses_received)
    }

    /// Average number of items scraped per second over the whole elapsed duration.
    fn items_per_second(&self) -> f64 {
        self.rate_per_second(self.items_scraped)
    }

    /// Renders the downloaded byte total using 1024-based units.
    ///
    /// The largest unit that does not exceed the total is used, with two
    /// decimals; totals below 1024 are printed as whole bytes.
    fn formatted_bytes(&self) -> String {
        const KB: usize = 1024;
        const MB: usize = 1024 * KB;
        const GB: usize = 1024 * MB;
        // Ordered from largest to smallest so the first match is the best fit.
        const UNITS: [(usize, &str); 3] = [(GB, "GB"), (MB, "MB"), (KB, "KB")];

        let bytes = self.total_bytes_downloaded;
        UNITS
            .iter()
            .find(|(size, _)| bytes >= *size)
            .map(|(size, label)| format!("{:.2} {}", bytes as f64 / *size as f64, label))
            .unwrap_or_else(|| format!("{} B", bytes))
    }
}
147
/// Accumulates crawl metrics using atomic counters and concurrent maps.
///
/// The struct derives `serde::Serialize`; fields marked `#[serde(skip)]` are
/// left out of the serialized form.
#[derive(Debug, serde::Serialize)]
pub struct StatCollector {
    /// Instant captured when the collector was constructed; used to compute elapsed time.
    #[serde(skip)]
    pub start_time: Instant,

    /// Count of requests enqueued.
    pub requests_enqueued: AtomicUsize,
    /// Count of requests sent.
    pub requests_sent: AtomicUsize,
    /// Count of requests that succeeded.
    pub requests_succeeded: AtomicUsize,
    /// Count of requests that failed.
    pub requests_failed: AtomicUsize,
    /// Count of requests that were retried.
    pub requests_retried: AtomicUsize,
    /// Count of requests that were dropped.
    pub requests_dropped: AtomicUsize,

    /// Count of responses received.
    pub responses_received: AtomicUsize,
    /// Count of responses that came from cache.
    pub responses_from_cache: AtomicUsize,
    /// Per-status-code response counts.
    pub response_status_counts: Arc<dashmap::DashMap<u16, usize>>,
    /// Running total of bytes downloaded.
    pub total_bytes_downloaded: AtomicUsize,

    /// Count of items scraped.
    pub items_scraped: AtomicUsize,
    /// Count of items processed.
    pub items_processed: AtomicUsize,
    /// Count of items dropped by the pipeline.
    pub items_dropped_by_pipeline: AtomicUsize,

    /// Recorded request durations, keyed by URL (one entry per distinct URL).
    pub request_times: Arc<dashmap::DashMap<String, Duration>>,
    /// Recorded parsing durations, keyed by a generated identifier.
    pub parsing_times: Arc<dashmap::DashMap<String, Duration>>,

    /// Moving average updated each time a request is sent.
    #[serde(skip)]
    requests_sent_ema: ExpMovingAverage,
    /// Moving average updated each time a response is received.
    #[serde(skip)]
    responses_received_ema: ExpMovingAverage,
    /// Moving average updated each time an item is scraped.
    #[serde(skip)]
    items_scraped_ema: ExpMovingAverage,
}
188
189impl StatCollector {
190 pub(crate) fn new() -> Self {
192 StatCollector {
193 start_time: Instant::now(),
194 requests_enqueued: AtomicUsize::new(0),
195 requests_sent: AtomicUsize::new(0),
196 requests_succeeded: AtomicUsize::new(0),
197 requests_failed: AtomicUsize::new(0),
198 requests_retried: AtomicUsize::new(0),
199 requests_dropped: AtomicUsize::new(0),
200 responses_received: AtomicUsize::new(0),
201 responses_from_cache: AtomicUsize::new(0),
202 response_status_counts: Arc::new(dashmap::DashMap::new()),
203 total_bytes_downloaded: AtomicUsize::new(0),
204 items_scraped: AtomicUsize::new(0),
205 items_processed: AtomicUsize::new(0),
206 items_dropped_by_pipeline: AtomicUsize::new(0),
207 request_times: Arc::new(dashmap::DashMap::new()),
208 parsing_times: Arc::new(dashmap::DashMap::new()),
209 requests_sent_ema: ExpMovingAverage::new(0.2),
211 responses_received_ema: ExpMovingAverage::new(0.2),
212 items_scraped_ema: ExpMovingAverage::new(0.2),
213 }
214 }
215
216 fn snapshot(&self) -> StatsSnapshot {
219 let mut status_counts: HashMap<u16, usize> = HashMap::new();
220 for entry in self.response_status_counts.iter() {
221 let (key, value) = entry.pair();
222 status_counts.insert(*key, *value);
223 }
224
225 let recent_requests_per_second = self.requests_sent_ema.get_rate();
227 let recent_responses_per_second = self.responses_received_ema.get_rate();
228 let recent_items_per_second = self.items_scraped_ema.get_rate();
229
230 StatsSnapshot {
231 requests_enqueued: self.requests_enqueued.load(Ordering::SeqCst),
232 requests_sent: self.requests_sent.load(Ordering::SeqCst),
233 requests_succeeded: self.requests_succeeded.load(Ordering::SeqCst),
234 requests_failed: self.requests_failed.load(Ordering::SeqCst),
235 requests_retried: self.requests_retried.load(Ordering::SeqCst),
236 requests_dropped: self.requests_dropped.load(Ordering::SeqCst),
237 responses_received: self.responses_received.load(Ordering::SeqCst),
238 responses_from_cache: self.responses_from_cache.load(Ordering::SeqCst),
239 total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::SeqCst),
240 items_scraped: self.items_scraped.load(Ordering::SeqCst),
241 items_processed: self.items_processed.load(Ordering::SeqCst),
242 items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::SeqCst),
243 response_status_counts: status_counts,
244 elapsed_duration: self.start_time.elapsed(),
245 average_request_time: self.average_request_time(),
246 fastest_request_time: self.fastest_request_time(),
247 slowest_request_time: self.slowest_request_time(),
248 request_time_count: self.request_time_count(),
249 average_parsing_time: self.average_parsing_time(),
250 fastest_parsing_time: self.fastest_parsing_time(),
251 slowest_parsing_time: self.slowest_parsing_time(),
252 parsing_time_count: self.parsing_time_count(),
253
254 recent_requests_per_second,
256 recent_responses_per_second,
257 recent_items_per_second,
258 }
259 }
260
261 pub(crate) fn increment_requests_enqueued(&self) {
263 self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
264 }
265
266 pub(crate) fn increment_requests_sent(&self) {
268 self.requests_sent.fetch_add(1, Ordering::SeqCst);
269 self.requests_sent_ema.update(1);
271 }
272
273 pub(crate) fn increment_requests_succeeded(&self) {
275 self.requests_succeeded.fetch_add(1, Ordering::SeqCst);
276 }
277
278 pub(crate) fn increment_requests_failed(&self) {
280 self.requests_failed.fetch_add(1, Ordering::SeqCst);
281 }
282
283 pub(crate) fn increment_requests_retried(&self) {
285 self.requests_retried.fetch_add(1, Ordering::SeqCst);
286 }
287
288 pub(crate) fn increment_requests_dropped(&self) {
290 self.requests_dropped.fetch_add(1, Ordering::SeqCst);
291 }
292
293 pub(crate) fn increment_responses_received(&self) {
295 self.responses_received.fetch_add(1, Ordering::SeqCst);
296 self.responses_received_ema.update(1);
298 }
299
300 pub(crate) fn increment_responses_from_cache(&self) {
302 self.responses_from_cache.fetch_add(1, Ordering::SeqCst);
303 }
304
305 pub(crate) fn record_response_status(&self, status_code: u16) {
307 *self.response_status_counts.entry(status_code).or_insert(0) += 1;
308 }
309
310 pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
312 self.total_bytes_downloaded
313 .fetch_add(bytes, Ordering::SeqCst);
314 }
315
316 pub(crate) fn increment_items_scraped(&self) {
318 self.items_scraped.fetch_add(1, Ordering::SeqCst);
319 self.items_scraped_ema.update(1);
321 }
322
323 pub(crate) fn increment_items_processed(&self) {
325 self.items_processed.fetch_add(1, Ordering::SeqCst);
326 }
327
328 pub(crate) fn increment_items_dropped_by_pipeline(&self) {
330 self.items_dropped_by_pipeline
331 .fetch_add(1, Ordering::SeqCst);
332 }
333
334 pub fn record_request_time(&self, url: &str, duration: Duration) {
336 self.request_times.insert(url.to_string(), duration);
337 }
338
339 pub fn average_request_time(&self) -> Option<Duration> {
341 let times: Vec<Duration> = self
342 .request_times
343 .iter()
344 .map(|entry| *entry.value())
345 .collect();
346 if times.is_empty() {
347 None
348 } else {
349 let total_nanos: u128 = times.iter().map(|d| d.as_nanos()).sum();
350 let avg_nanos = total_nanos / times.len() as u128;
351 Some(Duration::from_nanos(avg_nanos as u64))
352 }
353 }
354
355 pub fn fastest_request_time(&self) -> Option<Duration> {
357 self.request_times.iter().map(|entry| *entry.value()).min()
358 }
359
360 pub fn slowest_request_time(&self) -> Option<Duration> {
362 self.request_times.iter().map(|entry| *entry.value()).max()
363 }
364
365 pub fn request_time_count(&self) -> usize {
367 self.request_times.len()
368 }
369
370 pub fn get_request_time(&self, url: &str) -> Option<Duration> {
372 self.request_times
373 .get(url)
374 .map(|duration| *duration.value())
375 }
376
377 pub fn get_all_request_times(&self) -> Vec<(String, Duration)> {
379 self.request_times
380 .iter()
381 .map(|entry| (entry.key().clone(), *entry.value()))
382 .collect()
383 }
384
385 pub fn record_parsing_time(&self, duration: Duration) {
387 let id = format!(
388 "parse_{}",
389 std::time::SystemTime::now()
390 .duration_since(std::time::UNIX_EPOCH)
391 .unwrap()
392 .as_nanos()
393 );
394 self.parsing_times.insert(id, duration);
395 }
396
397 pub fn average_parsing_time(&self) -> Option<Duration> {
399 let times: Vec<Duration> = self
400 .parsing_times
401 .iter()
402 .map(|entry| *entry.value())
403 .collect();
404 if times.is_empty() {
405 None
406 } else {
407 let total_nanos: u128 = times.iter().map(|d| d.as_nanos()).sum();
408 let avg_nanos = total_nanos / times.len() as u128;
409 Some(Duration::from_nanos(avg_nanos as u64))
410 }
411 }
412
413 pub fn fastest_parsing_time(&self) -> Option<Duration> {
415 self.parsing_times.iter().map(|entry| *entry.value()).min()
416 }
417
418 pub fn slowest_parsing_time(&self) -> Option<Duration> {
420 self.parsing_times.iter().map(|entry| *entry.value()).max()
421 }
422
423 pub fn parsing_time_count(&self) -> usize {
425 self.parsing_times.len()
426 }
427
428 pub fn clear_request_times(&self) {
430 self.request_times.clear();
431 }
432
433 pub fn to_json_string(&self) -> Result<String, SpiderError> {
435 Ok(serde_json::to_string(self)?)
436 }
437
438 pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
440 Ok(serde_json::to_string_pretty(self)?)
441 }
442
443 pub fn to_markdown_string(&self) -> String {
445 let snapshot = self.snapshot();
446
447 let status_codes_list: String = snapshot
448 .response_status_counts
449 .iter()
450 .map(|(code, count)| format!("- **{}**: {}", code, count))
451 .collect::<Vec<String>>()
452 .join("\n");
453 let status_codes_output = if status_codes_list.is_empty() {
454 "N/A".to_string()
455 } else {
456 status_codes_list
457 };
458
459 format!(
460 r#"# Crawl Statistics Report
461
462- **Duration**: {}
463- **Current Rate** (last 10s): {:.2} req/s, {:.2} resp/s, {:.2} item/s
464- **Overall Rate** (total): {:.2} req/s, {:.2} resp/s, {:.2} item/s
465
466## Requests
467| Metric | Count |
468|------------|-------|
469| Enqueued | {} |
470| Sent | {} |
471| Succeeded | {} |
472| Failed | {} |
473| Retried | {} |
474| Dropped | {} |
475
476## Responses
477| Metric | Count |
478|------------|-------|
479| Received | {} |
480 From Cache | {} |
481| Downloaded | {} |
482
483## Items
484| Metric | Count |
485|------------|--------|
486| Scraped | {} |
487| Processed | {} |
488| Dropped | {} |
489
490## Request Times
491| Metric | Value |
492|------------------|------------|
493| Average Time | {} |
494| Fastest Request | {} |
495| Slowest Request | {} |
496| Total Recorded | {} |
497
498## Parsing Times
499| Metric | Value |
500|------------------|------------|
501| Average Time | {} |
502| Fastest Parse | {} |
503| Slowest Parse | {} |
504| Total Recorded | {} |
505
506## Status Codes
507{}
508"#,
509 snapshot.formatted_duration(),
510 snapshot.requests_per_second(),
511 snapshot.responses_per_second(),
512 snapshot.items_per_second(),
513 {
515 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
516 if total_seconds > 0.0 {
517 snapshot.requests_sent as f64 / total_seconds
518 } else {
519 0.0
520 }
521 },
522 {
523 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
524 if total_seconds > 0.0 {
525 snapshot.responses_received as f64 / total_seconds
526 } else {
527 0.0
528 }
529 },
530 {
531 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
532 if total_seconds > 0.0 {
533 snapshot.items_scraped as f64 / total_seconds
534 } else {
535 0.0
536 }
537 },
538 snapshot.requests_enqueued,
539 snapshot.requests_sent,
540 snapshot.requests_succeeded,
541 snapshot.requests_failed,
542 snapshot.requests_retried,
543 snapshot.requests_dropped,
544 snapshot.responses_received,
545 snapshot.responses_from_cache,
546 snapshot.formatted_bytes(),
547 snapshot.items_scraped,
548 snapshot.items_processed,
549 snapshot.items_dropped_by_pipeline,
550 snapshot.formatted_request_time(snapshot.average_request_time),
551 snapshot.formatted_request_time(snapshot.fastest_request_time),
552 snapshot.formatted_request_time(snapshot.slowest_request_time),
553 snapshot.request_time_count,
554 snapshot.formatted_request_time(snapshot.average_parsing_time),
555 snapshot.formatted_request_time(snapshot.fastest_parsing_time),
556 snapshot.formatted_request_time(snapshot.slowest_parsing_time),
557 snapshot.parsing_time_count,
558 status_codes_output
559 )
560 }
561}
562
563impl Default for StatCollector {
564 fn default() -> Self {
565 Self::new()
566 }
567}
568
569impl std::fmt::Display for StatCollector {
570 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
571 let snapshot = self.snapshot();
572
573 writeln!(f, "\nCrawl Statistics")?;
574 writeln!(f, "----------------")?;
575 writeln!(f, "duration : {}", snapshot.formatted_duration())?;
576 writeln!(
577 f,
578 "speed : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}",
579 snapshot.recent_requests_per_second,
580 snapshot.recent_responses_per_second,
581 snapshot.recent_items_per_second
582 )?;
583 writeln!(
584 f,
585 "requests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}",
586 snapshot.requests_enqueued,
587 snapshot.requests_sent,
588 snapshot.requests_succeeded,
589 snapshot.requests_failed,
590 snapshot.requests_retried,
591 snapshot.requests_dropped
592 )?;
593 writeln!(
594 f,
595 "response : received: {}, from_cache: {}, downloaded: {}",
596 snapshot.responses_received,
597 snapshot.responses_from_cache,
598 snapshot.formatted_bytes()
599 )?;
600 writeln!(
601 f,
602 "items : scraped: {}, processed: {}, dropped: {}",
603 snapshot.items_scraped, snapshot.items_processed, snapshot.items_dropped_by_pipeline
604 )?;
605 writeln!(
606 f,
607 "req time : avg: {}, fastest: {}, slowest: {}, total: {}",
608 snapshot.formatted_request_time(snapshot.average_request_time),
609 snapshot.formatted_request_time(snapshot.fastest_request_time),
610 snapshot.formatted_request_time(snapshot.slowest_request_time),
611 snapshot.request_time_count
612 )?;
613 writeln!(
614 f,
615 "parsing : avg: {}, fastest: {}, slowest: {}, total: {}",
616 snapshot.formatted_request_time(snapshot.average_parsing_time),
617 snapshot.formatted_request_time(snapshot.fastest_parsing_time),
618 snapshot.formatted_request_time(snapshot.slowest_parsing_time),
619 snapshot.parsing_time_count
620 )?;
621
622 let status_string = if snapshot.response_status_counts.is_empty() {
623 "none".to_string()
624 } else {
625 snapshot
626 .response_status_counts
627 .iter()
628 .map(|(code, count)| format!("{}: {}", code, count))
629 .collect::<Vec<String>>()
630 .join(", ")
631 };
632
633 writeln!(f, "status : {}\n", status_string)
634 }
635}