1use parking_lot::RwLock;
23use spider_util::error::SpiderError;
24use spider_util::item::ScrapedItem;
25use spider_util::metrics::{ExpMovingAverage, MetricsSnapshotProvider, format_plain_text_metrics};
26use std::{
27 collections::HashMap,
28 sync::{
29 Arc,
30 atomic::{AtomicU64, AtomicUsize, Ordering},
31 },
32 time::{Duration, Instant},
33};
34
/// Point-in-time copy of the crawler counters, built by `StatCollector::snapshot`.
///
/// Report formatting works on these plain values rather than re-reading atomics,
/// so every figure in one report comes from the same capture.
struct StatsSnapshot {
    // Request lifecycle counters.
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_scheduled_for_retry: usize,
    requests_dropped: usize,
    /// Outstanding retry delay, in milliseconds.
    retry_delay_in_flight_ms: u64,

    // Response counters.
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,

    // Item counters.
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,

    /// HTTP status code -> number of responses with that code.
    response_status_counts: HashMap<u16, usize>,
    /// Time since the collector was created.
    elapsed_duration: Duration,

    // Request timing aggregates (`None` until at least one sample exists).
    average_request_time: Option<Duration>,
    fastest_request_time: Option<Duration>,
    slowest_request_time: Option<Duration>,
    request_time_count: usize,

    // Parsing timing aggregates (`None` until at least one sample exists).
    average_parsing_time: Option<Duration>,
    fastest_parsing_time: Option<Duration>,
    slowest_parsing_time: Option<Duration>,
    parsing_time_count: usize,

    // Smoothed recent rates (moving averages maintained by the collector).
    recent_requests_per_second: f64,
    recent_responses_per_second: f64,
    recent_items_per_second: f64,
    /// Short single-line preview of the most recently recorded item.
    current_item_preview: String,
}

impl StatsSnapshot {
    /// Formats the elapsed crawl time using the coarsest fitting unit.
    ///
    /// Examples: `"1h 02m 03.50s"`, `"1m 05.25s"`, `"1.50s"`, `"250ms"`, `"42us"`.
    fn formatted_duration(&self) -> String {
        let elapsed = self.elapsed_duration;

        // Round to hundredths of a second once and derive h/m/s from that integer.
        // Rounding the seconds part separately (as a float) could print e.g.
        // "1m 60.00s" for 119.999s instead of carrying into the minutes.
        let total_centis = (elapsed.as_millis() + 5) / 10;
        let hours = total_centis / 360_000;
        let minutes = (total_centis / 6_000) % 60;
        let centis_in_minute = total_centis % 6_000;
        let (secs, hundredths) = (centis_in_minute / 100, centis_in_minute % 100);

        if hours > 0 {
            format!("{hours}h {minutes:02}m {secs:02}.{hundredths:02}s")
        } else if minutes > 0 {
            format!("{minutes}m {secs:02}.{hundredths:02}s")
        } else if elapsed.as_secs() > 0 {
            format!("{:.2}s", elapsed.as_secs_f64())
        } else if elapsed.as_millis() > 0 {
            format!("{}ms", elapsed.as_millis())
        } else {
            format!("{}us", elapsed.as_micros())
        }
    }

    /// Formats an optional timing as `"N ms"` below one second, `"N.NN s"` otherwise,
    /// or `"N/A"` when absent.
    fn formatted_request_time(&self, duration: Option<Duration>) -> String {
        match duration {
            Some(d) if d.as_millis() < 1000 => format!("{} ms", d.as_millis()),
            Some(d) => format!("{:.2} s", d.as_secs_f64()),
            None => "N/A".to_string(),
        }
    }

    /// Average rate of `count` events per second over the whole elapsed time.
    ///
    /// Returns `0.0` when no time has elapsed, to avoid dividing by zero.
    fn per_second(&self, count: usize) -> f64 {
        let elapsed = self.elapsed_duration.as_secs_f64();
        if elapsed > 0.0 {
            count as f64 / elapsed
        } else {
            0.0
        }
    }

    /// Whole-run average of requests sent per second.
    fn requests_per_second(&self) -> f64 {
        self.per_second(self.requests_sent)
    }

    /// Whole-run average of responses received per second.
    fn responses_per_second(&self) -> f64 {
        self.per_second(self.responses_received)
    }

    /// Whole-run average of items scraped per second.
    fn items_per_second(&self) -> f64 {
        self.per_second(self.items_scraped)
    }

    /// Whole-run average download throughput in bytes per second.
    fn bytes_per_second(&self) -> f64 {
        self.per_second(self.total_bytes_downloaded)
    }

    /// Renders `bytes` with a 1024-based unit (`B`, `KB`, `MB`, `GB`) followed by `suffix`.
    fn format_byte_size(bytes: usize, suffix: &str) -> String {
        const KB: usize = 1024;
        const MB: usize = 1024 * KB;
        const GB: usize = 1024 * MB;

        let scaled = |unit: usize, name: &str| {
            format!("{:.2} {name}{suffix}", bytes as f64 / unit as f64)
        };

        if bytes >= GB {
            scaled(GB, "GB")
        } else if bytes >= MB {
            scaled(MB, "MB")
        } else if bytes >= KB {
            scaled(KB, "KB")
        } else {
            format!("{bytes} B{suffix}")
        }
    }

    /// Total downloaded volume, e.g. `"1.50 KB"`.
    fn formatted_bytes(&self) -> String {
        Self::format_byte_size(self.total_bytes_downloaded, "")
    }

    /// Whole-run download throughput, e.g. `"384 B/s"`; the rate is truncated to
    /// whole bytes before choosing a unit.
    fn formatted_bytes_per_second(&self) -> String {
        Self::format_byte_size(self.bytes_per_second() as usize, "/s")
    }

    /// Enqueued requests that have not yet succeeded, failed, or been dropped.
    ///
    /// Saturates at zero (and never overflows) if the terminal counters exceed
    /// the enqueued count.
    fn pending_requests(&self) -> usize {
        let finished = self
            .requests_succeeded
            .saturating_add(self.requests_failed)
            .saturating_add(self.requests_dropped);
        self.requests_enqueued.saturating_sub(finished)
    }

    /// `part / whole` as a percentage, or `0.0` when `whole` is zero.
    fn percentage(part: usize, whole: usize) -> f64 {
        if whole == 0 {
            0.0
        } else {
            part as f64 / whole as f64 * 100.0
        }
    }

    /// Share of sent requests that succeeded, in percent.
    fn success_ratio(&self) -> f64 {
        Self::percentage(self.requests_succeeded, self.requests_sent)
    }

    /// Share of sent requests that failed, in percent.
    fn failure_ratio(&self) -> f64 {
        Self::percentage(self.requests_failed, self.requests_sent)
    }

    /// Share of received responses that were served from cache, in percent.
    fn cache_hit_ratio(&self) -> f64 {
        Self::percentage(self.responses_from_cache, self.responses_received)
    }
}
201
202impl MetricsSnapshotProvider for StatsSnapshot {
203 fn get_requests_enqueued(&self) -> usize {
204 self.requests_enqueued
205 }
206
207 fn get_requests_sent(&self) -> usize {
208 self.requests_sent
209 }
210
211 fn get_requests_succeeded(&self) -> usize {
212 self.requests_succeeded
213 }
214
215 fn get_requests_failed(&self) -> usize {
216 self.requests_failed
217 }
218
219 fn get_requests_retried(&self) -> usize {
220 self.requests_retried
221 }
222
223 fn get_requests_scheduled_for_retry(&self) -> usize {
224 self.requests_scheduled_for_retry
225 }
226
227 fn get_requests_dropped(&self) -> usize {
228 self.requests_dropped
229 }
230
231 fn get_retry_delay_in_flight_ms(&self) -> u64 {
232 self.retry_delay_in_flight_ms
233 }
234
235 fn get_responses_received(&self) -> usize {
236 self.responses_received
237 }
238
239 fn get_responses_from_cache(&self) -> usize {
240 self.responses_from_cache
241 }
242
243 fn get_total_bytes_downloaded(&self) -> usize {
244 self.total_bytes_downloaded
245 }
246
247 fn get_items_scraped(&self) -> usize {
248 self.items_scraped
249 }
250
251 fn get_items_processed(&self) -> usize {
252 self.items_processed
253 }
254
255 fn get_items_dropped_by_pipeline(&self) -> usize {
256 self.items_dropped_by_pipeline
257 }
258
259 fn get_response_status_counts(&self) -> &HashMap<u16, usize> {
260 &self.response_status_counts
261 }
262
263 fn get_elapsed_duration(&self) -> Duration {
264 self.elapsed_duration
265 }
266
267 fn get_average_request_time(&self) -> Option<Duration> {
268 self.average_request_time
269 }
270
271 fn get_fastest_request_time(&self) -> Option<Duration> {
272 self.fastest_request_time
273 }
274
275 fn get_slowest_request_time(&self) -> Option<Duration> {
276 self.slowest_request_time
277 }
278
279 fn get_request_time_count(&self) -> usize {
280 self.request_time_count
281 }
282
283 fn get_average_parsing_time(&self) -> Option<Duration> {
284 self.average_parsing_time
285 }
286
287 fn get_fastest_parsing_time(&self) -> Option<Duration> {
288 self.fastest_parsing_time
289 }
290
291 fn get_slowest_parsing_time(&self) -> Option<Duration> {
292 self.slowest_parsing_time
293 }
294
295 fn get_parsing_time_count(&self) -> usize {
296 self.parsing_time_count
297 }
298
299 fn get_recent_requests_per_second(&self) -> f64 {
300 self.recent_requests_per_second
301 }
302
303 fn get_recent_responses_per_second(&self) -> f64 {
304 self.recent_responses_per_second
305 }
306
307 fn get_recent_items_per_second(&self) -> f64 {
308 self.recent_items_per_second
309 }
310
311 fn get_current_item_preview(&self) -> &str {
312 &self.current_item_preview
313 }
314
315 fn formatted_duration(&self) -> String {
316 self.formatted_duration()
317 }
318
319 fn formatted_request_time(&self, duration: Option<Duration>) -> String {
320 self.formatted_request_time(duration)
321 }
322
323 fn formatted_bytes(&self) -> String {
324 self.formatted_bytes()
325 }
326}
327
/// Crawl statistics, updated through `&self` using atomics, a `DashMap`, and an `RwLock`.
///
/// Deriving `serde::Serialize` emits the public counters in field-declaration order;
/// the `#[serde(skip)]` fields (start time, timing aggregates, moving averages, item
/// preview, preview configuration) are not serialized. The markdown and live reports
/// are built from a snapshot of all fields instead.
#[derive(Debug, serde::Serialize)]
pub struct StatCollector {
    /// Creation time of the collector; elapsed time is measured from here. Not serialized.
    #[serde(skip)]
    pub start_time: Instant,

    // Request lifecycle counters.
    pub requests_enqueued: AtomicUsize,
    pub requests_sent: AtomicUsize,
    pub requests_succeeded: AtomicUsize,
    pub requests_failed: AtomicUsize,
    pub requests_retried: AtomicUsize,
    pub requests_scheduled_for_retry: AtomicUsize,
    pub requests_dropped: AtomicUsize,
    /// Sum of delays passed to `add_retry_delay_in_flight`, minus those passed to
    /// `remove_retry_delay_in_flight` (saturating at zero), in milliseconds.
    pub retry_delay_in_flight_ms: AtomicU64,

    // Response counters.
    pub responses_received: AtomicUsize,
    pub responses_from_cache: AtomicUsize,
    /// HTTP status code -> number of responses recorded with that code.
    pub response_status_counts: Arc<dashmap::DashMap<u16, usize>>,
    pub total_bytes_downloaded: AtomicUsize,

    // Item counters.
    pub items_scraped: AtomicUsize,
    pub items_processed: AtomicUsize,
    pub items_dropped_by_pipeline: AtomicUsize,

    // Request timing aggregates, in nanoseconds. The "fastest" tracker starts at
    // `u64::MAX`, which the readers treat as "no sample yet".
    #[serde(skip)]
    request_time_total_nanos: AtomicU64,
    #[serde(skip)]
    request_time_fastest_nanos: AtomicU64,
    #[serde(skip)]
    request_time_slowest_nanos: AtomicU64,
    #[serde(skip)]
    request_time_count_total: AtomicUsize,

    // Parsing timing aggregates, in nanoseconds; same sentinel convention as above.
    #[serde(skip)]
    parsing_time_total_nanos: AtomicU64,
    #[serde(skip)]
    parsing_time_fastest_nanos: AtomicU64,
    #[serde(skip)]
    parsing_time_slowest_nanos: AtomicU64,
    #[serde(skip)]
    parsing_time_count_total: AtomicUsize,

    // Moving averages feeding the "recent rate" figures; constructed with 0.2
    // (presumably the smoothing factor — TODO confirm `ExpMovingAverage::new` semantics).
    #[serde(skip)]
    requests_sent_ema: ExpMovingAverage,
    #[serde(skip)]
    responses_received_ema: ExpMovingAverage,
    #[serde(skip)]
    items_scraped_ema: ExpMovingAverage,
    /// Truncated single-line preview of the most recently recorded item.
    #[serde(skip)]
    current_item_preview: Arc<RwLock<String>>,
    /// Optional field selectors (`path` or `label=path`) used to build the item preview.
    #[serde(skip)]
    live_stats_preview_fields: Option<Vec<String>>,
}
389
390impl StatCollector {
391 pub(crate) fn new(live_stats_preview_fields: Option<Vec<String>>) -> Self {
393 Self::build(live_stats_preview_fields)
394 }
395
396 fn build(live_stats_preview_fields: Option<Vec<String>>) -> Self {
397 StatCollector {
398 start_time: Instant::now(),
399 requests_enqueued: AtomicUsize::new(0),
400 requests_sent: AtomicUsize::new(0),
401 requests_succeeded: AtomicUsize::new(0),
402 requests_failed: AtomicUsize::new(0),
403 requests_retried: AtomicUsize::new(0),
404 requests_scheduled_for_retry: AtomicUsize::new(0),
405 requests_dropped: AtomicUsize::new(0),
406 retry_delay_in_flight_ms: AtomicU64::new(0),
407 responses_received: AtomicUsize::new(0),
408 responses_from_cache: AtomicUsize::new(0),
409 response_status_counts: Arc::new(dashmap::DashMap::new()),
410 total_bytes_downloaded: AtomicUsize::new(0),
411 items_scraped: AtomicUsize::new(0),
412 items_processed: AtomicUsize::new(0),
413 items_dropped_by_pipeline: AtomicUsize::new(0),
414 request_time_total_nanos: AtomicU64::new(0),
415 request_time_fastest_nanos: AtomicU64::new(u64::MAX),
416 request_time_slowest_nanos: AtomicU64::new(0),
417 request_time_count_total: AtomicUsize::new(0),
418 parsing_time_total_nanos: AtomicU64::new(0),
419 parsing_time_fastest_nanos: AtomicU64::new(u64::MAX),
420 parsing_time_slowest_nanos: AtomicU64::new(0),
421 parsing_time_count_total: AtomicUsize::new(0),
422 requests_sent_ema: ExpMovingAverage::new(0.2),
424 responses_received_ema: ExpMovingAverage::new(0.2),
425 items_scraped_ema: ExpMovingAverage::new(0.2),
426 current_item_preview: Arc::new(RwLock::new("none".to_string())),
427 live_stats_preview_fields,
428 }
429 }
430
431 fn snapshot(&self) -> StatsSnapshot {
434 let mut status_counts: HashMap<u16, usize> = HashMap::new();
435 for entry in self.response_status_counts.iter() {
436 let (key, value) = entry.pair();
437 status_counts.insert(*key, *value);
438 }
439
440 let recent_requests_per_second = self.requests_sent_ema.get_rate();
442 let recent_responses_per_second = self.responses_received_ema.get_rate();
443 let recent_items_per_second = self.items_scraped_ema.get_rate();
444
445 StatsSnapshot {
446 requests_enqueued: self.requests_enqueued.load(Ordering::Acquire),
447 requests_sent: self.requests_sent.load(Ordering::Acquire),
448 requests_succeeded: self.requests_succeeded.load(Ordering::Acquire),
449 requests_failed: self.requests_failed.load(Ordering::Acquire),
450 requests_retried: self.requests_retried.load(Ordering::Acquire),
451 requests_scheduled_for_retry: self.requests_scheduled_for_retry.load(Ordering::Acquire),
452 requests_dropped: self.requests_dropped.load(Ordering::Acquire),
453 retry_delay_in_flight_ms: self.retry_delay_in_flight_ms.load(Ordering::Acquire),
454 responses_received: self.responses_received.load(Ordering::Acquire),
455 responses_from_cache: self.responses_from_cache.load(Ordering::Acquire),
456 total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::Acquire),
457 items_scraped: self.items_scraped.load(Ordering::Acquire),
458 items_processed: self.items_processed.load(Ordering::Acquire),
459 items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::Acquire),
460 response_status_counts: status_counts,
461 elapsed_duration: self.start_time.elapsed(),
462 average_request_time: self.average_request_time(),
463 fastest_request_time: self.fastest_request_time(),
464 slowest_request_time: self.slowest_request_time(),
465 request_time_count: self.request_time_count(),
466 average_parsing_time: self.average_parsing_time(),
467 fastest_parsing_time: self.fastest_parsing_time(),
468 slowest_parsing_time: self.slowest_parsing_time(),
469 parsing_time_count: self.parsing_time_count(),
470
471 recent_requests_per_second,
473 recent_responses_per_second,
474 recent_items_per_second,
475 current_item_preview: self.current_item_preview.read().clone(),
476 }
477 }
478
479 pub(crate) fn increment_requests_enqueued(&self) {
481 self.requests_enqueued.fetch_add(1, Ordering::AcqRel);
482 }
483
484 pub(crate) fn increment_requests_sent(&self) {
486 self.requests_sent.fetch_add(1, Ordering::AcqRel);
487 self.requests_sent_ema.update(1);
489 }
490
491 pub(crate) fn increment_requests_succeeded(&self) {
493 self.requests_succeeded.fetch_add(1, Ordering::AcqRel);
494 }
495
496 pub(crate) fn increment_requests_failed(&self) {
498 self.requests_failed.fetch_add(1, Ordering::AcqRel);
499 }
500
501 pub(crate) fn increment_requests_retried(&self) {
503 self.requests_retried.fetch_add(1, Ordering::AcqRel);
504 }
505
506 pub(crate) fn increment_requests_dropped(&self) {
508 self.requests_dropped.fetch_add(1, Ordering::AcqRel);
509 }
510
511 pub(crate) fn increment_requests_scheduled_for_retry(&self) {
513 self.requests_scheduled_for_retry
514 .fetch_add(1, Ordering::AcqRel);
515 }
516
517 pub(crate) fn add_retry_delay_in_flight(&self, delay: Duration) {
519 let millis = delay.as_millis().min(u128::from(u64::MAX)) as u64;
520 self.retry_delay_in_flight_ms
521 .fetch_add(millis, Ordering::AcqRel);
522 }
523
524 pub(crate) fn remove_retry_delay_in_flight(&self, delay: Duration) {
526 let millis = delay.as_millis().min(u128::from(u64::MAX)) as u64;
527 let mut current = self.retry_delay_in_flight_ms.load(Ordering::Acquire);
528 loop {
529 let next = current.saturating_sub(millis);
530 match self.retry_delay_in_flight_ms.compare_exchange_weak(
531 current,
532 next,
533 Ordering::AcqRel,
534 Ordering::Acquire,
535 ) {
536 Ok(_) => break,
537 Err(actual) => current = actual,
538 }
539 }
540 }
541
542 pub(crate) fn increment_responses_received(&self) {
544 self.responses_received.fetch_add(1, Ordering::AcqRel);
545 self.responses_received_ema.update(1);
547 }
548
549 pub(crate) fn increment_responses_from_cache(&self) {
551 self.responses_from_cache.fetch_add(1, Ordering::AcqRel);
552 }
553
554 pub(crate) fn record_response_status(&self, status_code: u16) {
556 *self.response_status_counts.entry(status_code).or_insert(0) += 1;
557 }
558
559 pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
561 self.total_bytes_downloaded
562 .fetch_add(bytes, Ordering::AcqRel);
563 }
564
565 pub(crate) fn add_items_scraped(&self, count: usize) {
567 if count == 0 {
568 return;
569 }
570 self.items_scraped.fetch_add(count, Ordering::AcqRel);
571 self.items_scraped_ema.update(count);
572 }
573
574 pub(crate) fn record_current_item_preview<I: ScrapedItem>(&self, item: &I) {
576 let json = item.to_json_value();
577 let preview = build_item_preview(&json, self.live_stats_preview_fields.as_deref())
578 .unwrap_or_else(|| {
579 serde_json::to_string(&json).unwrap_or_else(|_| format!("{:?}", item))
580 })
581 .replace(['\n', '\r'], " ");
582 let preview = truncate_preview(&preview, 160);
583 *self.current_item_preview.write() = preview;
584 }
585
586 pub(crate) fn increment_items_processed(&self) {
588 self.items_processed.fetch_add(1, Ordering::AcqRel);
589 }
590
591 pub(crate) fn increment_items_dropped_by_pipeline(&self) {
593 self.items_dropped_by_pipeline
594 .fetch_add(1, Ordering::AcqRel);
595 }
596
597 pub fn record_request_time(&self, _url: &str, duration: Duration) {
599 let nanos = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
600 self.request_time_total_nanos
601 .fetch_add(nanos, Ordering::AcqRel);
602 self.request_time_count_total.fetch_add(1, Ordering::AcqRel);
603 update_min(&self.request_time_fastest_nanos, nanos);
604 update_max(&self.request_time_slowest_nanos, nanos);
605 }
606
607 pub fn average_request_time(&self) -> Option<Duration> {
609 average_duration(
610 &self.request_time_total_nanos,
611 self.request_time_count_total.load(Ordering::Acquire),
612 )
613 }
614
615 pub fn fastest_request_time(&self) -> Option<Duration> {
617 duration_from_extreme(
618 &self.request_time_fastest_nanos,
619 self.request_time_count_total.load(Ordering::Acquire),
620 true,
621 )
622 }
623
624 pub fn slowest_request_time(&self) -> Option<Duration> {
626 duration_from_extreme(
627 &self.request_time_slowest_nanos,
628 self.request_time_count_total.load(Ordering::Acquire),
629 false,
630 )
631 }
632
633 pub fn request_time_count(&self) -> usize {
635 self.request_time_count_total.load(Ordering::Acquire)
636 }
637
638 pub fn get_request_time(&self, url: &str) -> Option<Duration> {
640 let _ = url;
641 None
642 }
643
644 pub fn get_all_request_times(&self) -> Vec<(String, Duration)> {
646 Vec::new()
647 }
648
649 pub fn record_parsing_time(&self, duration: Duration) {
651 let nanos = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
652 self.parsing_time_total_nanos
653 .fetch_add(nanos, Ordering::AcqRel);
654 self.parsing_time_count_total.fetch_add(1, Ordering::AcqRel);
655 update_min(&self.parsing_time_fastest_nanos, nanos);
656 update_max(&self.parsing_time_slowest_nanos, nanos);
657 }
658
659 pub fn average_parsing_time(&self) -> Option<Duration> {
661 average_duration(
662 &self.parsing_time_total_nanos,
663 self.parsing_time_count_total.load(Ordering::Acquire),
664 )
665 }
666
667 pub fn fastest_parsing_time(&self) -> Option<Duration> {
669 duration_from_extreme(
670 &self.parsing_time_fastest_nanos,
671 self.parsing_time_count_total.load(Ordering::Acquire),
672 true,
673 )
674 }
675
676 pub fn slowest_parsing_time(&self) -> Option<Duration> {
678 duration_from_extreme(
679 &self.parsing_time_slowest_nanos,
680 self.parsing_time_count_total.load(Ordering::Acquire),
681 false,
682 )
683 }
684
685 pub fn parsing_time_count(&self) -> usize {
687 self.parsing_time_count_total.load(Ordering::Acquire)
688 }
689
690 pub fn clear_request_times(&self) {
692 self.request_time_total_nanos.store(0, Ordering::Release);
693 self.request_time_fastest_nanos
694 .store(u64::MAX, Ordering::Release);
695 self.request_time_slowest_nanos.store(0, Ordering::Release);
696 self.request_time_count_total.store(0, Ordering::Release);
697 }
698
699 pub fn clear_parsing_times(&self) {
701 self.parsing_time_total_nanos.store(0, Ordering::Release);
702 self.parsing_time_fastest_nanos
703 .store(u64::MAX, Ordering::Release);
704 self.parsing_time_slowest_nanos.store(0, Ordering::Release);
705 self.parsing_time_count_total.store(0, Ordering::Release);
706 }
707
708 pub fn to_json_string(&self) -> Result<String, SpiderError> {
710 Ok(serde_json::to_string(self)?)
711 }
712
713 pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
715 Ok(serde_json::to_string_pretty(self)?)
716 }
717
718 pub fn to_markdown_string(&self) -> String {
720 let snapshot = self.snapshot();
721
722 let status_codes_list: String = snapshot
723 .response_status_counts
724 .iter()
725 .map(|(code, count)| format!("- **{}**: {}", code, count))
726 .collect::<Vec<String>>()
727 .join("\n");
728 let status_codes_output = if status_codes_list.is_empty() {
729 "N/A".to_string()
730 } else {
731 status_codes_list
732 };
733
734 format!(
735 r#"# Crawl Statistics Report
736
737- **Duration**: {}
738- **Current Rate** (last 10s): {:.2} req/s, {:.2} resp/s, {:.2} item/s
739- **Overall Rate** (total): {:.2} req/s, {:.2} resp/s, {:.2} item/s
740- **Bytes Per Second**: {}
741- **Request Ratios**: success {:.2}%, failure {:.2}%
742- **Cache Hit Ratio**: {:.2}%
743
744## Requests
745| Metric | Count |
746|------------|-------|
747| Enqueued | {} |
748| Sent | {} |
749| Pending | {} |
750| Succeeded | {} |
751| Failed | {} |
752| Retried | {} |
753| Retry Scheduled | {} |
754| Dropped | {} |
755| Retry Delay In Flight | {} ms |
756
757## Responses
758| Metric | Count |
759|------------|-------|
760| Received | {} |
761| From Cache | {} |
762| Downloaded | {} |
763
764## Items
765| Metric | Count |
766|------------|--------|
767| Scraped | {} |
768| Processed | {} |
769| Dropped | {} |
770
771## Request Times
772| Metric | Value |
773|------------------|------------|
774| Average Time | {} |
775| Fastest Request | {} |
776| Slowest Request | {} |
777| Total Recorded | {} |
778
779## Parsing Times
780| Metric | Value |
781|------------------|------------|
782| Average Time | {} |
783| Fastest Parse | {} |
784| Slowest Parse | {} |
785| Total Recorded | {} |
786
787## Status Codes
788{}
789"#,
790 snapshot.formatted_duration(),
791 snapshot.requests_per_second(),
792 snapshot.responses_per_second(),
793 snapshot.items_per_second(),
794 {
796 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
797 if total_seconds > 0.0 {
798 snapshot.requests_sent as f64 / total_seconds
799 } else {
800 0.0
801 }
802 },
803 {
804 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
805 if total_seconds > 0.0 {
806 snapshot.responses_received as f64 / total_seconds
807 } else {
808 0.0
809 }
810 },
811 {
812 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
813 if total_seconds > 0.0 {
814 snapshot.items_scraped as f64 / total_seconds
815 } else {
816 0.0
817 }
818 },
819 snapshot.formatted_bytes_per_second(),
820 snapshot.success_ratio(),
821 snapshot.failure_ratio(),
822 snapshot.cache_hit_ratio(),
823 snapshot.requests_enqueued,
824 snapshot.requests_sent,
825 snapshot.pending_requests(),
826 snapshot.requests_succeeded,
827 snapshot.requests_failed,
828 snapshot.requests_retried,
829 snapshot.requests_scheduled_for_retry,
830 snapshot.requests_dropped,
831 snapshot.retry_delay_in_flight_ms,
832 snapshot.responses_received,
833 snapshot.responses_from_cache,
834 snapshot.formatted_bytes(),
835 snapshot.items_scraped,
836 snapshot.items_processed,
837 snapshot.items_dropped_by_pipeline,
838 snapshot.formatted_request_time(snapshot.average_request_time),
839 snapshot.formatted_request_time(snapshot.fastest_request_time),
840 snapshot.formatted_request_time(snapshot.slowest_request_time),
841 snapshot.request_time_count,
842 snapshot.formatted_request_time(snapshot.average_parsing_time),
843 snapshot.formatted_request_time(snapshot.fastest_parsing_time),
844 snapshot.formatted_request_time(snapshot.slowest_parsing_time),
845 snapshot.parsing_time_count,
846 status_codes_output
847 )
848 }
849
850 pub fn to_live_report_string(&self) -> String {
852 let snapshot = self.snapshot();
853 format_plain_text_metrics(&snapshot)
854 }
855}
856
/// Keeps at most `max_chars` characters (not bytes) of `input`, appending `"..."`
/// when anything was cut off.
fn truncate_preview(input: &str, max_chars: usize) -> String {
    // The byte offset of the character at position `max_chars` exists only when
    // the input is longer than the limit; slicing there is always on a char boundary.
    match input.char_indices().nth(max_chars) {
        Some((cut_at, _)) => format!("{}...", &input[..cut_at]),
        None => input.to_string(),
    }
}
866
867fn build_item_preview(json: &serde_json::Value, fields: Option<&[String]>) -> Option<String> {
868 let fields = fields?;
869
870 if fields.len() == 1 {
871 let (_, path) = parse_preview_field(&fields[0]);
872 return get_value_by_path(json, path).map(format_preview_value);
873 }
874
875 let mut preview = serde_json::Map::new();
876
877 for field in fields {
878 let (label, path) = parse_preview_field(field);
879 if let Some(value) = get_value_by_path(json, path) {
880 preview.insert(label.to_string(), value.clone());
881 }
882 }
883
884 if preview.is_empty() {
885 None
886 } else {
887 serde_json::to_string(&serde_json::Value::Object(preview)).ok()
888 }
889}
890
891fn format_preview_value(value: &serde_json::Value) -> String {
892 match value {
893 serde_json::Value::Null => "null".to_string(),
894 serde_json::Value::Bool(boolean) => boolean.to_string(),
895 serde_json::Value::Number(number) => number.to_string(),
896 serde_json::Value::String(text) => text.clone(),
897 serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
898 serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
899 }
900 }
901}
902
/// Splits a preview selector into `(label, path)`.
///
/// `"label=path"` (both sides non-empty, split at the first `=`) yields the two
/// halves; any other input is used verbatim as both label and path.
fn parse_preview_field(field: &str) -> (&str, &str) {
    field
        .split_once('=')
        .filter(|(label, path)| !label.is_empty() && !path.is_empty())
        .unwrap_or((field, field))
}
909
910fn get_value_by_path<'a>(
911 value: &'a serde_json::Value,
912 path: &str,
913) -> Option<&'a serde_json::Value> {
914 let mut current = value;
915 for segment in path.split('.') {
916 current = current.get(segment)?;
917 }
918 Some(current)
919}
920
921impl Default for StatCollector {
922 fn default() -> Self {
923 Self::new(None)
924 }
925}
926
927impl std::fmt::Display for StatCollector {
928 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
929 write!(f, "\n{}\n", self.to_live_report_string())
930 }
931}
932
/// Mean duration from a nanosecond total and a sample count (integer division).
///
/// Returns `None` when `count` is zero.
fn average_duration(total_nanos: &AtomicU64, count: usize) -> Option<Duration> {
    (count != 0).then(|| {
        let total = total_nanos.load(Ordering::Acquire);
        Duration::from_nanos(total / count as u64)
    })
}
942
/// Reads a fastest/slowest tracker as a `Duration`.
///
/// Returns `None` when no samples were counted, or when a minimum tracker
/// (`is_min`) still holds its `u64::MAX` "unset" sentinel.
fn duration_from_extreme(extreme: &AtomicU64, count: usize, is_min: bool) -> Option<Duration> {
    if count == 0 {
        return None;
    }

    match extreme.load(Ordering::Acquire) {
        u64::MAX if is_min => None,
        nanos => Some(Duration::from_nanos(nanos)),
    }
}
955
/// Atomically lowers `target` to `candidate` when `candidate` is smaller.
///
/// Trackers initialised to `u64::MAX` therefore take the first sample they see.
fn update_min(target: &AtomicU64, candidate: u64) {
    // `fetch_min` performs the compare-and-swap loop inside the standard library.
    target.fetch_min(candidate, Ordering::AcqRel);
}
966
/// Atomically raises `target` to `candidate` when `candidate` is larger.
fn update_max(target: &AtomicU64, candidate: u64) {
    // `fetch_max` performs the compare-and-swap loop inside the standard library.
    target.fetch_max(candidate, Ordering::AcqRel);
}