1use spider_util::error::SpiderError;
44use std::{
45 collections::HashMap,
46 sync::{
47 Arc,
48 atomic::{AtomicUsize, Ordering},
49 },
50 time::{Duration, Instant},
51};
52#[derive(Debug)]
54pub(crate) struct ExpMovingAverage {
55 alpha: f64, rate: Arc<parking_lot::RwLock<f64>>,
57 last_update: Arc<parking_lot::RwLock<Instant>>,
58}
59
60impl ExpMovingAverage {
61 fn new(alpha: f64) -> Self {
62 ExpMovingAverage {
63 alpha,
64 rate: Arc::new(parking_lot::RwLock::new(0.0)),
65 last_update: Arc::new(parking_lot::RwLock::new(Instant::now())),
66 }
67 }
68
69 fn update(&self, count: usize) {
70 let now = Instant::now();
71 let mut last_update = self.last_update.write();
72 let time_delta = now.duration_since(*last_update).as_secs_f64();
73 *last_update = now;
74
75 if time_delta > 0.0 {
76 let current_rate = count as f64 / time_delta;
77 let mut rate = self.rate.write();
78 *rate = self.alpha * current_rate + (1.0 - self.alpha) * (*rate);
80 }
81 }
82
83 fn get_rate(&self) -> f64 {
84 *self.rate.read()
85 }
86}
87
/// Immutable point-in-time copy of `StatCollector`'s counters, taken for
/// rendering reports without repeatedly touching the live atomics.
struct StatsSnapshot {
    // Request lifecycle counters.
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_dropped: usize,
    // Response counters and per-status-code tallies.
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    // Item pipeline counters.
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    response_status_counts: HashMap<u16, usize>,
    // Timing figures derived from the collector's per-URL request times.
    elapsed_duration: Duration,
    average_request_time: Option<Duration>,
    fastest_request_time: Option<Duration>,
    slowest_request_time: Option<Duration>,
    request_time_count: usize,

    // "Recent" throughput figures read from the exponential moving averages.
    recent_requests_per_second: f64,
    recent_responses_per_second: f64,
    recent_items_per_second: f64,
}
115
116impl StatsSnapshot {
117 fn formatted_duration(&self) -> String {
118 format!("{:?}", self.elapsed_duration)
119 }
120
121 fn formatted_request_time(&self, duration: Option<Duration>) -> String {
122 match duration {
123 Some(d) => {
124 if d.as_millis() < 1000 {
125 format!("{} ms", d.as_millis())
126 } else {
127 format!("{:.2} s", d.as_secs_f64())
128 }
129 }
130 None => "N/A".to_string(),
131 }
132 }
133
134 fn requests_per_second(&self) -> f64 {
135 self.recent_requests_per_second
136 }
137
138 fn responses_per_second(&self) -> f64 {
139 self.recent_responses_per_second
140 }
141
142 fn items_per_second(&self) -> f64 {
143 self.recent_items_per_second
144 }
145
146 fn formatted_bytes(&self) -> String {
147 const KB: usize = 1024;
148 const MB: usize = 1024 * KB;
149 const GB: usize = 1024 * MB;
150
151 if self.total_bytes_downloaded >= GB {
152 format!("{:.2} GB", self.total_bytes_downloaded as f64 / GB as f64)
153 } else if self.total_bytes_downloaded >= MB {
154 format!("{:.2} MB", self.total_bytes_downloaded as f64 / MB as f64)
155 } else if self.total_bytes_downloaded >= KB {
156 format!("{:.2} KB", self.total_bytes_downloaded as f64 / KB as f64)
157 } else {
158 format!("{} B", self.total_bytes_downloaded)
159 }
160 }
161}
162
163#[derive(Debug, serde::Serialize)]
165pub struct StatCollector {
166 #[serde(skip)]
168 pub start_time: Instant,
169
170 pub requests_enqueued: AtomicUsize,
172 pub requests_sent: AtomicUsize,
173 pub requests_succeeded: AtomicUsize,
174 pub requests_failed: AtomicUsize,
175 pub requests_retried: AtomicUsize,
176 pub requests_dropped: AtomicUsize,
177
178 pub responses_received: AtomicUsize,
180 pub responses_from_cache: AtomicUsize,
181 pub response_status_counts: Arc<dashmap::DashMap<u16, usize>>, pub total_bytes_downloaded: AtomicUsize,
183
184 pub items_scraped: AtomicUsize,
188 pub items_processed: AtomicUsize,
189 pub items_dropped_by_pipeline: AtomicUsize,
190
191 pub request_times: Arc<dashmap::DashMap<String, Duration>>,
193
194 #[serde(skip)]
196 requests_sent_ema: ExpMovingAverage,
197 #[serde(skip)]
198 responses_received_ema: ExpMovingAverage,
199 #[serde(skip)]
200 items_scraped_ema: ExpMovingAverage,
201
202 #[serde(skip)]
204 prev_requests_sent: AtomicUsize,
205 #[serde(skip)]
206 prev_responses_received: AtomicUsize,
207 #[serde(skip)]
208 prev_items_scraped: AtomicUsize,
209}
210
211impl StatCollector {
212 pub(crate) fn new() -> Self {
214 StatCollector {
215 start_time: Instant::now(),
216 requests_enqueued: AtomicUsize::new(0),
217 requests_sent: AtomicUsize::new(0),
218 requests_succeeded: AtomicUsize::new(0),
219 requests_failed: AtomicUsize::new(0),
220 requests_retried: AtomicUsize::new(0),
221 requests_dropped: AtomicUsize::new(0),
222 responses_received: AtomicUsize::new(0),
223 responses_from_cache: AtomicUsize::new(0),
224 response_status_counts: Arc::new(dashmap::DashMap::new()),
225 total_bytes_downloaded: AtomicUsize::new(0),
226 items_scraped: AtomicUsize::new(0),
227 items_processed: AtomicUsize::new(0),
228 items_dropped_by_pipeline: AtomicUsize::new(0),
229 request_times: Arc::new(dashmap::DashMap::new()),
230 requests_sent_ema: ExpMovingAverage::new(0.2),
232 responses_received_ema: ExpMovingAverage::new(0.2),
233 items_scraped_ema: ExpMovingAverage::new(0.2),
234
235 prev_requests_sent: AtomicUsize::new(0),
237 prev_responses_received: AtomicUsize::new(0),
238 prev_items_scraped: AtomicUsize::new(0),
239 }
240 }
241
242 fn snapshot(&self) -> StatsSnapshot {
245 let mut status_counts: HashMap<u16, usize> = HashMap::new();
246 for entry in self.response_status_counts.iter() {
247 let (key, value) = entry.pair();
248 status_counts.insert(*key, *value);
249 }
250
251 let recent_requests_per_second = self.requests_sent_ema.get_rate();
253 let recent_responses_per_second = self.responses_received_ema.get_rate();
254 let recent_items_per_second = self.items_scraped_ema.get_rate();
255
256 StatsSnapshot {
257 requests_enqueued: self.requests_enqueued.load(Ordering::SeqCst),
258 requests_sent: self.requests_sent.load(Ordering::SeqCst),
259 requests_succeeded: self.requests_succeeded.load(Ordering::SeqCst),
260 requests_failed: self.requests_failed.load(Ordering::SeqCst),
261 requests_retried: self.requests_retried.load(Ordering::SeqCst),
262 requests_dropped: self.requests_dropped.load(Ordering::SeqCst),
263 responses_received: self.responses_received.load(Ordering::SeqCst),
264 responses_from_cache: self.responses_from_cache.load(Ordering::SeqCst),
265 total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::SeqCst),
266 items_scraped: self.items_scraped.load(Ordering::SeqCst),
267 items_processed: self.items_processed.load(Ordering::SeqCst),
268 items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::SeqCst),
269 response_status_counts: status_counts,
270 elapsed_duration: self.start_time.elapsed(),
271 average_request_time: self.average_request_time(),
272 fastest_request_time: self.fastest_request_time(),
273 slowest_request_time: self.slowest_request_time(),
274 request_time_count: self.request_time_count(),
275
276 recent_requests_per_second,
278 recent_responses_per_second,
279 recent_items_per_second,
280 }
281 }
282
283 pub(crate) fn increment_requests_enqueued(&self) {
285 self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
286 }
287
288 pub(crate) fn increment_requests_sent(&self) {
290 let new_count = self.requests_sent.fetch_add(1, Ordering::SeqCst) + 1;
291 let prev_count = self.prev_requests_sent.load(Ordering::SeqCst);
293 let delta = new_count - prev_count;
294 self.requests_sent_ema.update(delta);
295 self.prev_requests_sent.store(new_count, Ordering::SeqCst);
297 }
298
299 pub(crate) fn increment_requests_succeeded(&self) {
301 self.requests_succeeded.fetch_add(1, Ordering::SeqCst);
302 }
303
304 pub(crate) fn increment_requests_failed(&self) {
306 self.requests_failed.fetch_add(1, Ordering::SeqCst);
307 }
308
309 pub(crate) fn increment_requests_retried(&self) {
311 self.requests_retried.fetch_add(1, Ordering::SeqCst);
312 }
313
314 pub(crate) fn increment_requests_dropped(&self) {
316 self.requests_dropped.fetch_add(1, Ordering::SeqCst);
317 }
318
319 pub(crate) fn increment_responses_received(&self) {
321 let new_count = self.responses_received.fetch_add(1, Ordering::SeqCst) + 1;
322 let prev_count = self.prev_responses_received.load(Ordering::SeqCst);
324 let delta = new_count - prev_count;
325 self.responses_received_ema.update(delta);
326 self.prev_responses_received.store(new_count, Ordering::SeqCst);
328 }
329
330 pub(crate) fn increment_responses_from_cache(&self) {
332 self.responses_from_cache.fetch_add(1, Ordering::SeqCst);
333 }
334
335 pub(crate) fn record_response_status(&self, status_code: u16) {
337 *self.response_status_counts.entry(status_code).or_insert(0) += 1;
338 }
339
340 #[cfg(not(feature = "stream"))]
342 pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
343 self.total_bytes_downloaded
344 .fetch_add(bytes, Ordering::SeqCst);
345 }
346
347 pub(crate) fn increment_items_scraped(&self) {
349 let new_count = self.items_scraped.fetch_add(1, Ordering::SeqCst) + 1;
350 let prev_count = self.prev_items_scraped.load(Ordering::SeqCst);
352 let delta = new_count - prev_count;
353 self.items_scraped_ema.update(delta);
354 self.prev_items_scraped.store(new_count, Ordering::SeqCst);
356 }
357
358 pub(crate) fn increment_items_processed(&self) {
360 self.items_processed.fetch_add(1, Ordering::SeqCst);
361 }
362
363 pub(crate) fn increment_items_dropped_by_pipeline(&self) {
365 self.items_dropped_by_pipeline
366 .fetch_add(1, Ordering::SeqCst);
367 }
368
369 pub fn record_request_time(&self, url: &str, duration: Duration) {
371 self.request_times.insert(url.to_string(), duration);
372 }
373
374 pub fn average_request_time(&self) -> Option<Duration> {
376 let times: Vec<Duration> = self
377 .request_times
378 .iter()
379 .map(|entry| *entry.value())
380 .collect();
381 if times.is_empty() {
382 None
383 } else {
384 let total_nanos: u128 = times.iter().map(|d| d.as_nanos()).sum();
385 let avg_nanos = total_nanos / times.len() as u128;
386 Some(Duration::from_nanos(avg_nanos as u64))
387 }
388 }
389
390 pub fn fastest_request_time(&self) -> Option<Duration> {
392 self.request_times.iter().map(|entry| *entry.value()).min()
393 }
394
395 pub fn slowest_request_time(&self) -> Option<Duration> {
397 self.request_times.iter().map(|entry| *entry.value()).max()
398 }
399
400 pub fn request_time_count(&self) -> usize {
402 self.request_times.len()
403 }
404
405 pub fn get_request_time(&self, url: &str) -> Option<Duration> {
407 self.request_times
408 .get(url)
409 .map(|duration| *duration.value())
410 }
411
412 pub fn get_all_request_times(&self) -> Vec<(String, Duration)> {
414 self.request_times
415 .iter()
416 .map(|entry| (entry.key().clone(), *entry.value()))
417 .collect()
418 }
419
420 pub fn clear_request_times(&self) {
422 self.request_times.clear();
423 }
424
425 pub fn to_json_string(&self) -> Result<String, SpiderError> {
427 Ok(serde_json::to_string(self)?)
428 }
429
430 pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
432 Ok(serde_json::to_string_pretty(self)?)
433 }
434
435 pub fn to_markdown_string(&self) -> String {
437 let snapshot = self.snapshot();
438
439 let status_codes_list: String = snapshot
440 .response_status_counts
441 .iter()
442 .map(|(code, count)| format!("- **{}**: {}", code, count))
443 .collect::<Vec<String>>()
444 .join("\n");
445 let status_codes_output = if status_codes_list.is_empty() {
446 "N/A".to_string()
447 } else {
448 status_codes_list
449 };
450
451 format!(
452 r#"# Crawl Statistics Report
453
454- **Duration**: {}
455- **Current Rate** (last 10s): {:.2} req/s, {:.2} resp/s, {:.2} item/s
456- **Overall Rate** (total): {:.2} req/s, {:.2} resp/s, {:.2} item/s
457
458## Requests
459| Metric | Count |
460|------------|-------|
461| Enqueued | {} |
462| Sent | {} |
463| Succeeded | {} |
464| Failed | {} |
465| Retried | {} |
466| Dropped | {} |
467
468## Responses
469| Metric | Count |
470|------------|-------|
471| Received | {} |
472 From Cache | {} |
473| Downloaded | {} |
474
475## Items
476| Metric | Count |
477|------------|--------|
478| Scraped | {} |
479| Processed | {} |
480| Dropped | {} |
481
482## Request Times
483| Metric | Value |
484|------------------|------------|
485| Average Time | {} |
486| Fastest Request | {} |
487| Slowest Request | {} |
488| Total Recorded | {} |
489
490## Status Codes
491{}
492"#,
493 snapshot.formatted_duration(),
494 snapshot.requests_per_second(),
495 snapshot.responses_per_second(),
496 snapshot.items_per_second(),
497 {
499 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
500 if total_seconds > 0.0 { snapshot.requests_sent as f64 / total_seconds } else { 0.0 }
501 },
502 {
503 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
504 if total_seconds > 0.0 { snapshot.responses_received as f64 / total_seconds } else { 0.0 }
505 },
506 {
507 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
508 if total_seconds > 0.0 { snapshot.items_scraped as f64 / total_seconds } else { 0.0 }
509 },
510 snapshot.requests_enqueued,
511 snapshot.requests_sent,
512 snapshot.requests_succeeded,
513 snapshot.requests_failed,
514 snapshot.requests_retried,
515 snapshot.requests_dropped,
516 snapshot.responses_received,
517 snapshot.responses_from_cache,
518 snapshot.formatted_bytes(),
519 snapshot.items_scraped,
520 snapshot.items_processed,
521 snapshot.items_dropped_by_pipeline,
522 snapshot.formatted_request_time(snapshot.average_request_time),
523 snapshot.formatted_request_time(snapshot.fastest_request_time),
524 snapshot.formatted_request_time(snapshot.slowest_request_time),
525 snapshot.request_time_count,
526 status_codes_output
527 )
528 }
529}
530
531impl Default for StatCollector {
532 fn default() -> Self {
533 Self::new()
534 }
535}
536
537impl std::fmt::Display for StatCollector {
538 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
539 let snapshot = self.snapshot();
540
541 writeln!(f, "\nCrawl Statistics")?;
542 writeln!(f, "----------------")?;
543 writeln!(f, " duration : {}", snapshot.formatted_duration())?;
544 writeln!(
545 f,
546 " speed : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}",
547 snapshot.requests_per_second(),
548 snapshot.responses_per_second(),
549 snapshot.items_per_second()
550 )?;
551 writeln!(
552 f,
553 " requests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}",
554 snapshot.requests_enqueued,
555 snapshot.requests_sent,
556 snapshot.requests_succeeded,
557 snapshot.requests_failed,
558 snapshot.requests_retried,
559 snapshot.requests_dropped
560 )?;
561 writeln!(
562 f,
563 " response : received: {}, from_cache: {}, downloaded: {}",
564 snapshot.responses_received,
565 snapshot.responses_from_cache,
566 snapshot.formatted_bytes()
567 )?;
568 writeln!(
569 f,
570 " items : scraped: {}, processed: {}, dropped: {}",
571 snapshot.items_scraped, snapshot.items_processed, snapshot.items_dropped_by_pipeline
572 )?;
573 writeln!(
574 f,
575 " times : avg: {}, fastest: {}, slowest: {}, total: {}",
576 snapshot.formatted_request_time(snapshot.average_request_time),
577 snapshot.formatted_request_time(snapshot.fastest_request_time),
578 snapshot.formatted_request_time(snapshot.slowest_request_time),
579 snapshot.request_time_count
580 )?;
581
582 let status_string = if snapshot.response_status_counts.is_empty() {
583 "none".to_string()
584 } else {
585 snapshot
586 .response_status_counts
587 .iter()
588 .map(|(code, count)| format!("{}: {}", code, count))
589 .collect::<Vec<String>>()
590 .join(", ")
591 };
592
593 writeln!(f, " status : {}\n", status_string)
594 }
595}