1use moka::sync::Cache;
44use spider_util::error::SpiderError;
45use spider_util::metrics::ExpMovingAverage;
46use std::{
47 collections::HashMap,
48 sync::{
49 Arc,
50 atomic::{AtomicUsize, Ordering},
51 },
52 time::{Duration, Instant},
53};
54
/// Immutable point-in-time copy of every crawl metric, produced by
/// `StatCollector::snapshot()` so report rendering reads stable values
/// instead of racing live atomic counters.
struct StatsSnapshot {
    // Request lifecycle counters.
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_dropped: usize,
    // Response counters.
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    // Item pipeline counters.
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    // HTTP status code -> occurrence count, copied out of the shared DashMap.
    response_status_counts: HashMap<u16, usize>,
    // Wall-clock time since the collector was constructed.
    elapsed_duration: Duration,
    // Request timing stats; `None` when no samples have been recorded.
    average_request_time: Option<Duration>,
    fastest_request_time: Option<Duration>,
    slowest_request_time: Option<Duration>,
    request_time_count: usize,
    // Parsing timing stats; `None` when no samples have been recorded.
    average_parsing_time: Option<Duration>,
    fastest_parsing_time: Option<Duration>,
    slowest_parsing_time: Option<Duration>,
    parsing_time_count: usize,

    // Exponentially-smoothed "recent" throughput rates (from the EMAs),
    // as opposed to the whole-crawl averages derived from elapsed_duration.
    recent_requests_per_second: f64,
    recent_responses_per_second: f64,
    recent_items_per_second: f64,
}
86
87impl StatsSnapshot {
88 fn formatted_duration(&self) -> String {
89 format!("{:?}", self.elapsed_duration)
90 }
91
92 fn formatted_request_time(&self, duration: Option<Duration>) -> String {
93 match duration {
94 Some(d) => {
95 if d.as_millis() < 1000 {
96 format!("{} ms", d.as_millis())
97 } else {
98 format!("{:.2} s", d.as_secs_f64())
99 }
100 }
101 None => "N/A".to_string(),
102 }
103 }
104
105 fn requests_per_second(&self) -> f64 {
106 let elapsed = self.elapsed_duration.as_secs_f64();
107 if elapsed > 0.0 {
108 self.requests_sent as f64 / elapsed
109 } else {
110 0.0
111 }
112 }
113
114 fn responses_per_second(&self) -> f64 {
115 let elapsed = self.elapsed_duration.as_secs_f64();
116 if elapsed > 0.0 {
117 self.responses_received as f64 / elapsed
118 } else {
119 0.0
120 }
121 }
122
123 fn items_per_second(&self) -> f64 {
124 let elapsed = self.elapsed_duration.as_secs_f64();
125 if elapsed > 0.0 {
126 self.items_scraped as f64 / elapsed
127 } else {
128 0.0
129 }
130 }
131
132 fn formatted_bytes(&self) -> String {
133 const KB: usize = 1024;
134 const MB: usize = 1024 * KB;
135 const GB: usize = 1024 * MB;
136
137 if self.total_bytes_downloaded >= GB {
138 format!("{:.2} GB", self.total_bytes_downloaded as f64 / GB as f64)
139 } else if self.total_bytes_downloaded >= MB {
140 format!("{:.2} MB", self.total_bytes_downloaded as f64 / MB as f64)
141 } else if self.total_bytes_downloaded >= KB {
142 format!("{:.2} KB", self.total_bytes_downloaded as f64 / KB as f64)
143 } else {
144 format!("{} B", self.total_bytes_downloaded)
145 }
146 }
147}
148
149#[derive(Debug, serde::Serialize)]
151pub struct StatCollector {
152 #[serde(skip)]
154 pub start_time: Instant,
155
156 pub requests_enqueued: AtomicUsize,
158 pub requests_sent: AtomicUsize,
159 pub requests_succeeded: AtomicUsize,
160 pub requests_failed: AtomicUsize,
161 pub requests_retried: AtomicUsize,
162 pub requests_dropped: AtomicUsize,
163
164 pub responses_received: AtomicUsize,
166 pub responses_from_cache: AtomicUsize,
167 pub response_status_counts: Arc<dashmap::DashMap<u16, usize>>, pub total_bytes_downloaded: AtomicUsize,
169
170 pub items_scraped: AtomicUsize,
174 pub items_processed: AtomicUsize,
175 pub items_dropped_by_pipeline: AtomicUsize,
176
177 #[serde(skip)]
180 pub request_times: Cache<String, Duration>,
181 #[serde(skip)]
182 pub parsing_times: Cache<String, Duration>,
183
184 #[serde(skip)]
186 requests_sent_ema: ExpMovingAverage,
187 #[serde(skip)]
188 responses_received_ema: ExpMovingAverage,
189 #[serde(skip)]
190 items_scraped_ema: ExpMovingAverage,
191}
192
193impl StatCollector {
194 pub(crate) fn new() -> Self {
196 StatCollector {
197 start_time: Instant::now(),
198 requests_enqueued: AtomicUsize::new(0),
199 requests_sent: AtomicUsize::new(0),
200 requests_succeeded: AtomicUsize::new(0),
201 requests_failed: AtomicUsize::new(0),
202 requests_retried: AtomicUsize::new(0),
203 requests_dropped: AtomicUsize::new(0),
204 responses_received: AtomicUsize::new(0),
205 responses_from_cache: AtomicUsize::new(0),
206 response_status_counts: Arc::new(dashmap::DashMap::new()),
207 total_bytes_downloaded: AtomicUsize::new(0),
208 items_scraped: AtomicUsize::new(0),
209 items_processed: AtomicUsize::new(0),
210 items_dropped_by_pipeline: AtomicUsize::new(0),
211 request_times: Cache::builder()
214 .max_capacity(10_000)
215 .time_to_idle(Duration::from_secs(300)) .build(),
217 parsing_times: Cache::builder()
218 .max_capacity(1_000)
219 .time_to_idle(Duration::from_secs(60)) .build(),
221 requests_sent_ema: ExpMovingAverage::new(0.2),
223 responses_received_ema: ExpMovingAverage::new(0.2),
224 items_scraped_ema: ExpMovingAverage::new(0.2),
225 }
226 }
227
228 fn snapshot(&self) -> StatsSnapshot {
231 let mut status_counts: HashMap<u16, usize> = HashMap::new();
232 for entry in self.response_status_counts.iter() {
233 let (key, value) = entry.pair();
234 status_counts.insert(*key, *value);
235 }
236
237 let recent_requests_per_second = self.requests_sent_ema.get_rate();
239 let recent_responses_per_second = self.responses_received_ema.get_rate();
240 let recent_items_per_second = self.items_scraped_ema.get_rate();
241
242 StatsSnapshot {
243 requests_enqueued: self.requests_enqueued.load(Ordering::Acquire),
244 requests_sent: self.requests_sent.load(Ordering::Acquire),
245 requests_succeeded: self.requests_succeeded.load(Ordering::Acquire),
246 requests_failed: self.requests_failed.load(Ordering::Acquire),
247 requests_retried: self.requests_retried.load(Ordering::Acquire),
248 requests_dropped: self.requests_dropped.load(Ordering::Acquire),
249 responses_received: self.responses_received.load(Ordering::Acquire),
250 responses_from_cache: self.responses_from_cache.load(Ordering::Acquire),
251 total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::Acquire),
252 items_scraped: self.items_scraped.load(Ordering::Acquire),
253 items_processed: self.items_processed.load(Ordering::Acquire),
254 items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::Acquire),
255 response_status_counts: status_counts,
256 elapsed_duration: self.start_time.elapsed(),
257 average_request_time: self.average_request_time(),
258 fastest_request_time: self.fastest_request_time(),
259 slowest_request_time: self.slowest_request_time(),
260 request_time_count: self.request_time_count(),
261 average_parsing_time: self.average_parsing_time(),
262 fastest_parsing_time: self.fastest_parsing_time(),
263 slowest_parsing_time: self.slowest_parsing_time(),
264 parsing_time_count: self.parsing_time_count(),
265
266 recent_requests_per_second,
268 recent_responses_per_second,
269 recent_items_per_second,
270 }
271 }
272
273 pub(crate) fn increment_requests_enqueued(&self) {
275 self.requests_enqueued.fetch_add(1, Ordering::AcqRel);
276 }
277
278 pub(crate) fn increment_requests_sent(&self) {
280 self.requests_sent.fetch_add(1, Ordering::AcqRel);
281 self.requests_sent_ema.update(1);
283 }
284
285 pub(crate) fn increment_requests_succeeded(&self) {
287 self.requests_succeeded.fetch_add(1, Ordering::AcqRel);
288 }
289
290 pub(crate) fn increment_requests_failed(&self) {
292 self.requests_failed.fetch_add(1, Ordering::AcqRel);
293 }
294
295 pub(crate) fn increment_requests_retried(&self) {
297 self.requests_retried.fetch_add(1, Ordering::AcqRel);
298 }
299
300 pub(crate) fn increment_requests_dropped(&self) {
302 self.requests_dropped.fetch_add(1, Ordering::AcqRel);
303 }
304
305 pub(crate) fn increment_responses_received(&self) {
307 self.responses_received.fetch_add(1, Ordering::AcqRel);
308 self.responses_received_ema.update(1);
310 }
311
312 pub(crate) fn increment_responses_from_cache(&self) {
314 self.responses_from_cache.fetch_add(1, Ordering::AcqRel);
315 }
316
317 pub(crate) fn record_response_status(&self, status_code: u16) {
319 *self.response_status_counts.entry(status_code).or_insert(0) += 1;
320 }
321
322 pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
324 self.total_bytes_downloaded
325 .fetch_add(bytes, Ordering::AcqRel);
326 }
327
328 pub(crate) fn add_items_scraped(&self, count: usize) {
330 if count == 0 {
331 return;
332 }
333 self.items_scraped.fetch_add(count, Ordering::AcqRel);
334 self.items_scraped_ema.update(count);
335 }
336
337 pub(crate) fn increment_items_processed(&self) {
339 self.items_processed.fetch_add(1, Ordering::AcqRel);
340 }
341
342 pub(crate) fn increment_items_dropped_by_pipeline(&self) {
344 self.items_dropped_by_pipeline
345 .fetch_add(1, Ordering::AcqRel);
346 }
347
348 pub fn record_request_time(&self, url: &str, duration: Duration) {
350 self.request_times.insert(url.to_string(), duration);
351 }
352
353 pub fn average_request_time(&self) -> Option<Duration> {
355 let times: Vec<Duration> = self
356 .request_times
357 .iter()
358 .map(|(_key, value)| value)
359 .collect();
360 if times.is_empty() {
361 None
362 } else {
363 let total_nanos: u128 = times.iter().map(|d| d.as_nanos()).sum();
364 let avg_nanos = total_nanos / times.len() as u128;
365 Some(Duration::from_nanos(avg_nanos as u64))
366 }
367 }
368
369 pub fn fastest_request_time(&self) -> Option<Duration> {
371 self.request_times.iter().map(|(_key, value)| value).min()
372 }
373
374 pub fn slowest_request_time(&self) -> Option<Duration> {
376 self.request_times.iter().map(|(_key, value)| value).max()
377 }
378
379 pub fn request_time_count(&self) -> usize {
381 self.request_times.entry_count() as usize
382 }
383
384 pub fn get_request_time(&self, url: &str) -> Option<Duration> {
386 self.request_times.get(url)
387 }
388
389 pub fn get_all_request_times(&self) -> Vec<(String, Duration)> {
391 self.request_times
392 .iter()
393 .map(|(key, value)| (key.to_string(), value))
394 .collect()
395 }
396
397 pub fn record_parsing_time(&self, duration: Duration) {
399 let id = format!(
400 "parse_{}",
401 match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
402 Ok(duration) => duration.as_nanos(),
403 Err(err) => err.duration().as_nanos(),
404 }
405 );
406 self.parsing_times.insert(id, duration);
407 }
408
409 pub fn average_parsing_time(&self) -> Option<Duration> {
411 let times: Vec<Duration> = self
412 .parsing_times
413 .iter()
414 .map(|(_key, value)| value)
415 .collect();
416 if times.is_empty() {
417 None
418 } else {
419 let total_nanos: u128 = times.iter().map(|d| d.as_nanos()).sum();
420 let avg_nanos = total_nanos / times.len() as u128;
421 Some(Duration::from_nanos(avg_nanos as u64))
422 }
423 }
424
425 pub fn fastest_parsing_time(&self) -> Option<Duration> {
427 self.parsing_times.iter().map(|(_key, value)| value).min()
428 }
429
430 pub fn slowest_parsing_time(&self) -> Option<Duration> {
432 self.parsing_times.iter().map(|(_key, value)| value).max()
433 }
434
435 pub fn parsing_time_count(&self) -> usize {
437 self.parsing_times.entry_count() as usize
438 }
439
440 pub fn clear_request_times(&self) {
442 self.request_times.invalidate_all();
443 }
444
445 pub fn clear_parsing_times(&self) {
447 self.parsing_times.invalidate_all();
448 }
449
450 pub fn to_json_string(&self) -> Result<String, SpiderError> {
452 Ok(serde_json::to_string(self)?)
453 }
454
455 pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
457 Ok(serde_json::to_string_pretty(self)?)
458 }
459
460 pub fn to_markdown_string(&self) -> String {
462 let snapshot = self.snapshot();
463
464 let status_codes_list: String = snapshot
465 .response_status_counts
466 .iter()
467 .map(|(code, count)| format!("- **{}**: {}", code, count))
468 .collect::<Vec<String>>()
469 .join("\n");
470 let status_codes_output = if status_codes_list.is_empty() {
471 "N/A".to_string()
472 } else {
473 status_codes_list
474 };
475
476 format!(
477 r#"# Crawl Statistics Report
478
479- **Duration**: {}
480- **Current Rate** (last 10s): {:.2} req/s, {:.2} resp/s, {:.2} item/s
481- **Overall Rate** (total): {:.2} req/s, {:.2} resp/s, {:.2} item/s
482
483## Requests
484| Metric | Count |
485|------------|-------|
486| Enqueued | {} |
487| Sent | {} |
488| Succeeded | {} |
489| Failed | {} |
490| Retried | {} |
491| Dropped | {} |
492
493## Responses
494| Metric | Count |
495|------------|-------|
496| Received | {} |
497 From Cache | {} |
498| Downloaded | {} |
499
500## Items
501| Metric | Count |
502|------------|--------|
503| Scraped | {} |
504| Processed | {} |
505| Dropped | {} |
506
507## Request Times
508| Metric | Value |
509|------------------|------------|
510| Average Time | {} |
511| Fastest Request | {} |
512| Slowest Request | {} |
513| Total Recorded | {} |
514
515## Parsing Times
516| Metric | Value |
517|------------------|------------|
518| Average Time | {} |
519| Fastest Parse | {} |
520| Slowest Parse | {} |
521| Total Recorded | {} |
522
523## Status Codes
524{}
525"#,
526 snapshot.formatted_duration(),
527 snapshot.requests_per_second(),
528 snapshot.responses_per_second(),
529 snapshot.items_per_second(),
530 {
532 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
533 if total_seconds > 0.0 {
534 snapshot.requests_sent as f64 / total_seconds
535 } else {
536 0.0
537 }
538 },
539 {
540 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
541 if total_seconds > 0.0 {
542 snapshot.responses_received as f64 / total_seconds
543 } else {
544 0.0
545 }
546 },
547 {
548 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
549 if total_seconds > 0.0 {
550 snapshot.items_scraped as f64 / total_seconds
551 } else {
552 0.0
553 }
554 },
555 snapshot.requests_enqueued,
556 snapshot.requests_sent,
557 snapshot.requests_succeeded,
558 snapshot.requests_failed,
559 snapshot.requests_retried,
560 snapshot.requests_dropped,
561 snapshot.responses_received,
562 snapshot.responses_from_cache,
563 snapshot.formatted_bytes(),
564 snapshot.items_scraped,
565 snapshot.items_processed,
566 snapshot.items_dropped_by_pipeline,
567 snapshot.formatted_request_time(snapshot.average_request_time),
568 snapshot.formatted_request_time(snapshot.fastest_request_time),
569 snapshot.formatted_request_time(snapshot.slowest_request_time),
570 snapshot.request_time_count,
571 snapshot.formatted_request_time(snapshot.average_parsing_time),
572 snapshot.formatted_request_time(snapshot.fastest_parsing_time),
573 snapshot.formatted_request_time(snapshot.slowest_parsing_time),
574 snapshot.parsing_time_count,
575 status_codes_output
576 )
577 }
578
579 pub fn to_live_report_string(&self) -> String {
581 let snapshot = self.snapshot();
582 let status_string = if snapshot.response_status_counts.is_empty() {
583 "none".to_string()
584 } else {
585 snapshot
586 .response_status_counts
587 .iter()
588 .map(|(code, count)| format!("{}: {}", code, count))
589 .collect::<Vec<String>>()
590 .join(", ")
591 };
592
593 format!(
594 "Crawl Statistics\n\
595 ----------------\n\
596 duration : {}\n\
597 speed : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}\n\
598 requests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}\n\
599 response : received: {}, from_cache: {}, downloaded: {}\n\
600 items : scraped: {}, processed: {}, dropped: {}\n\
601 req time : avg: {}, fastest: {}, slowest: {}, total: {}\n\
602 parsing : avg: {}, fastest: {}, slowest: {}, total: {}\n\
603 status : {}",
604 snapshot.formatted_duration(),
605 snapshot.recent_requests_per_second,
606 snapshot.recent_responses_per_second,
607 snapshot.recent_items_per_second,
608 snapshot.requests_enqueued,
609 snapshot.requests_sent,
610 snapshot.requests_succeeded,
611 snapshot.requests_failed,
612 snapshot.requests_retried,
613 snapshot.requests_dropped,
614 snapshot.responses_received,
615 snapshot.responses_from_cache,
616 snapshot.formatted_bytes(),
617 snapshot.items_scraped,
618 snapshot.items_processed,
619 snapshot.items_dropped_by_pipeline,
620 snapshot.formatted_request_time(snapshot.average_request_time),
621 snapshot.formatted_request_time(snapshot.fastest_request_time),
622 snapshot.formatted_request_time(snapshot.slowest_request_time),
623 snapshot.request_time_count,
624 snapshot.formatted_request_time(snapshot.average_parsing_time),
625 snapshot.formatted_request_time(snapshot.fastest_parsing_time),
626 snapshot.formatted_request_time(snapshot.slowest_parsing_time),
627 snapshot.parsing_time_count,
628 status_string
629 )
630 }
631}
632
impl Default for StatCollector {
    /// Equivalent to `StatCollector::new()`: zeroed counters with the crawl
    /// clock started at construction time.
    fn default() -> Self {
        Self::new()
    }
}
638
639impl std::fmt::Display for StatCollector {
640 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
641 write!(f, "\n{}\n", self.to_live_report_string())
642 }
643}