1use parking_lot::RwLock;
23use spider_util::error::SpiderError;
24use spider_util::item::ScrapedItem;
25use spider_util::metrics::{
26 ExpMovingAverage, MetricsSnapshot, MetricsSnapshotProvider, format_plain_text_metrics,
27};
28use std::{
29 collections::HashMap,
30 sync::{
31 Arc,
32 atomic::{AtomicU64, AtomicUsize, Ordering},
33 },
34 time::{Duration, Instant},
35};
36
/// Immutable point-in-time copy of every collector counter, taken so reports
/// can be rendered without re-reading atomics (and seeing torn values) mid-format.
struct StatsSnapshot {
    // Request lifecycle counters.
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_scheduled_for_retry: usize,
    requests_dropped: usize,
    // Sum of delays (ms) for retries currently waiting to fire.
    retry_delay_in_flight_ms: u64,
    // Response counters.
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    // Item pipeline counters.
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    // Backlog gauges (current depths, not cumulative counts).
    queue_depth: usize,
    parser_backlog: usize,
    pipeline_backlog: usize,
    retry_backlog: usize,
    // HTTP status code -> occurrence count.
    response_status_counts: HashMap<u16, usize>,
    // Wall-clock time since the collector was created.
    elapsed_duration: Duration,
    // Request timing aggregates; None when no samples were recorded.
    average_request_time: Option<Duration>,
    fastest_request_time: Option<Duration>,
    slowest_request_time: Option<Duration>,
    request_time_count: usize,
    // Parsing timing aggregates; None when no samples were recorded.
    average_parsing_time: Option<Duration>,
    fastest_parsing_time: Option<Duration>,
    slowest_parsing_time: Option<Duration>,
    parsing_time_count: usize,

    // Exponential-moving-average rates over the recent window.
    recent_requests_per_second: f64,
    recent_responses_per_second: f64,
    recent_items_per_second: f64,
    // Single-line rendering of the most recently scraped item.
    current_item_preview: String,
}
75
76impl StatsSnapshot {
77 fn formatted_duration(&self) -> String {
78 let total_secs = self.elapsed_duration.as_secs();
79 let hours = total_secs / 3600;
80 let minutes = (total_secs % 3600) / 60;
81 let seconds = self.elapsed_duration.as_secs_f64() % 60.0;
82
83 if hours > 0 {
84 format!("{hours}h {minutes:02}m {seconds:05.2}s")
85 } else if minutes > 0 {
86 format!("{minutes}m {seconds:05.2}s")
87 } else if total_secs > 0 {
88 format!("{:.2}s", self.elapsed_duration.as_secs_f64())
89 } else if self.elapsed_duration.as_millis() > 0 {
90 format!("{}ms", self.elapsed_duration.as_millis())
91 } else {
92 format!("{}us", self.elapsed_duration.as_micros())
93 }
94 }
95
96 fn formatted_request_time(&self, duration: Option<Duration>) -> String {
97 match duration {
98 Some(d) => {
99 if d.as_millis() < 1000 {
100 format!("{} ms", d.as_millis())
101 } else {
102 format!("{:.2} s", d.as_secs_f64())
103 }
104 }
105 None => "N/A".to_string(),
106 }
107 }
108
109 fn requests_per_second(&self) -> f64 {
110 let elapsed = self.elapsed_duration.as_secs_f64();
111 if elapsed > 0.0 {
112 self.requests_sent as f64 / elapsed
113 } else {
114 0.0
115 }
116 }
117
118 fn responses_per_second(&self) -> f64 {
119 let elapsed = self.elapsed_duration.as_secs_f64();
120 if elapsed > 0.0 {
121 self.responses_received as f64 / elapsed
122 } else {
123 0.0
124 }
125 }
126
127 fn items_per_second(&self) -> f64 {
128 let elapsed = self.elapsed_duration.as_secs_f64();
129 if elapsed > 0.0 {
130 self.items_scraped as f64 / elapsed
131 } else {
132 0.0
133 }
134 }
135
136 fn bytes_per_second(&self) -> f64 {
137 let elapsed = self.elapsed_duration.as_secs_f64();
138 if elapsed > 0.0 {
139 self.total_bytes_downloaded as f64 / elapsed
140 } else {
141 0.0
142 }
143 }
144
145 fn formatted_bytes(&self) -> String {
146 const KB: usize = 1024;
147 const MB: usize = 1024 * KB;
148 const GB: usize = 1024 * MB;
149
150 if self.total_bytes_downloaded >= GB {
151 format!("{:.2} GB", self.total_bytes_downloaded as f64 / GB as f64)
152 } else if self.total_bytes_downloaded >= MB {
153 format!("{:.2} MB", self.total_bytes_downloaded as f64 / MB as f64)
154 } else if self.total_bytes_downloaded >= KB {
155 format!("{:.2} KB", self.total_bytes_downloaded as f64 / KB as f64)
156 } else {
157 format!("{} B", self.total_bytes_downloaded)
158 }
159 }
160
161 fn formatted_bytes_per_second(&self) -> String {
162 let bytes_per_second = self.bytes_per_second() as usize;
163 const KB: usize = 1024;
164 const MB: usize = 1024 * KB;
165 const GB: usize = 1024 * MB;
166
167 if bytes_per_second >= GB {
168 format!("{:.2} GB/s", bytes_per_second as f64 / GB as f64)
169 } else if bytes_per_second >= MB {
170 format!("{:.2} MB/s", bytes_per_second as f64 / MB as f64)
171 } else if bytes_per_second >= KB {
172 format!("{:.2} KB/s", bytes_per_second as f64 / KB as f64)
173 } else {
174 format!("{bytes_per_second} B/s")
175 }
176 }
177
178 fn pending_requests(&self) -> usize {
179 self.requests_enqueued
180 .saturating_sub(self.requests_succeeded + self.requests_failed + self.requests_dropped)
181 }
182
183 fn success_ratio(&self) -> f64 {
184 if self.requests_sent == 0 {
185 0.0
186 } else {
187 self.requests_succeeded as f64 / self.requests_sent as f64 * 100.0
188 }
189 }
190
191 fn failure_ratio(&self) -> f64 {
192 if self.requests_sent == 0 {
193 0.0
194 } else {
195 self.requests_failed as f64 / self.requests_sent as f64 * 100.0
196 }
197 }
198
199 fn cache_hit_ratio(&self) -> f64 {
200 if self.responses_received == 0 {
201 0.0
202 } else {
203 self.responses_from_cache as f64 / self.responses_received as f64 * 100.0
204 }
205 }
206}
207
// Adapter impl: exposes the snapshot's fields through spider_util's metrics
// trait so shared formatters (e.g. format_plain_text_metrics) can render it.
// All getters are straight field reads; the formatted_* methods delegate to
// the inherent implementations above.
impl MetricsSnapshotProvider for StatsSnapshot {
    fn get_requests_enqueued(&self) -> usize {
        self.requests_enqueued
    }

    fn get_requests_sent(&self) -> usize {
        self.requests_sent
    }

    fn get_requests_succeeded(&self) -> usize {
        self.requests_succeeded
    }

    fn get_requests_failed(&self) -> usize {
        self.requests_failed
    }

    fn get_requests_retried(&self) -> usize {
        self.requests_retried
    }

    fn get_requests_scheduled_for_retry(&self) -> usize {
        self.requests_scheduled_for_retry
    }

    fn get_requests_dropped(&self) -> usize {
        self.requests_dropped
    }

    fn get_retry_delay_in_flight_ms(&self) -> u64 {
        self.retry_delay_in_flight_ms
    }

    fn get_responses_received(&self) -> usize {
        self.responses_received
    }

    fn get_responses_from_cache(&self) -> usize {
        self.responses_from_cache
    }

    fn get_total_bytes_downloaded(&self) -> usize {
        self.total_bytes_downloaded
    }

    fn get_items_scraped(&self) -> usize {
        self.items_scraped
    }

    fn get_items_processed(&self) -> usize {
        self.items_processed
    }

    fn get_items_dropped_by_pipeline(&self) -> usize {
        self.items_dropped_by_pipeline
    }

    fn get_queue_depth(&self) -> usize {
        self.queue_depth
    }

    fn get_parser_backlog(&self) -> usize {
        self.parser_backlog
    }

    fn get_pipeline_backlog(&self) -> usize {
        self.pipeline_backlog
    }

    fn get_retry_backlog(&self) -> usize {
        self.retry_backlog
    }

    fn get_response_status_counts(&self) -> &HashMap<u16, usize> {
        &self.response_status_counts
    }

    fn get_elapsed_duration(&self) -> Duration {
        self.elapsed_duration
    }

    fn get_average_request_time(&self) -> Option<Duration> {
        self.average_request_time
    }

    fn get_fastest_request_time(&self) -> Option<Duration> {
        self.fastest_request_time
    }

    fn get_slowest_request_time(&self) -> Option<Duration> {
        self.slowest_request_time
    }

    fn get_request_time_count(&self) -> usize {
        self.request_time_count
    }

    fn get_average_parsing_time(&self) -> Option<Duration> {
        self.average_parsing_time
    }

    fn get_fastest_parsing_time(&self) -> Option<Duration> {
        self.fastest_parsing_time
    }

    fn get_slowest_parsing_time(&self) -> Option<Duration> {
        self.slowest_parsing_time
    }

    fn get_parsing_time_count(&self) -> usize {
        self.parsing_time_count
    }

    fn get_recent_requests_per_second(&self) -> f64 {
        self.recent_requests_per_second
    }

    fn get_recent_responses_per_second(&self) -> f64 {
        self.recent_responses_per_second
    }

    fn get_recent_items_per_second(&self) -> f64 {
        self.recent_items_per_second
    }

    fn get_current_item_preview(&self) -> &str {
        &self.current_item_preview
    }

    // Calls the inherent method of the same name; inherent methods take
    // precedence over trait methods during resolution, so this does not
    // recurse. Same for the two delegations below.
    fn formatted_duration(&self) -> String {
        self.formatted_duration()
    }

    fn formatted_request_time(&self, duration: Option<Duration>) -> String {
        self.formatted_request_time(duration)
    }

    fn formatted_bytes(&self) -> String {
        self.formatted_bytes()
    }
}
349
/// Thread-safe runtime statistics for a crawl. Counters are plain atomics so
/// downloader, parser, and pipeline tasks can update them concurrently
/// without locks; only the item preview string sits behind an RwLock.
#[derive(Debug, serde::Serialize)]
pub struct StatCollector {
    // Wall-clock anchor for elapsed-time and rate calculations.
    #[serde(skip)]
    pub start_time: Instant,

    // Request lifecycle counters.
    pub requests_enqueued: AtomicUsize,
    pub requests_sent: AtomicUsize,
    pub requests_succeeded: AtomicUsize,
    pub requests_failed: AtomicUsize,
    pub requests_retried: AtomicUsize,
    pub requests_scheduled_for_retry: AtomicUsize,
    pub requests_dropped: AtomicUsize,
    // Sum of delays (ms) for retries currently scheduled but not yet fired.
    pub retry_delay_in_flight_ms: AtomicU64,

    // Response counters; status counts live in a concurrent map keyed by code.
    pub responses_received: AtomicUsize,
    pub responses_from_cache: AtomicUsize,
    pub response_status_counts: Arc<dashmap::DashMap<u16, usize>>,
    pub total_bytes_downloaded: AtomicUsize,

    // Item pipeline counters and backlog gauges.
    pub items_scraped: AtomicUsize,
    pub items_processed: AtomicUsize,
    pub items_dropped_by_pipeline: AtomicUsize,
    pub queue_depth: AtomicUsize,
    pub parser_backlog: AtomicUsize,
    pub pipeline_backlog: AtomicUsize,
    pub retry_backlog: AtomicUsize,

    // Timing aggregates in nanoseconds. The *_fastest_nanos fields start at
    // u64::MAX as a "no sample yet" sentinel (see build()).
    #[serde(skip)]
    request_time_total_nanos: AtomicU64,
    #[serde(skip)]
    request_time_fastest_nanos: AtomicU64,
    #[serde(skip)]
    request_time_slowest_nanos: AtomicU64,
    #[serde(skip)]
    request_time_count_total: AtomicUsize,
    #[serde(skip)]
    parsing_time_total_nanos: AtomicU64,
    #[serde(skip)]
    parsing_time_fastest_nanos: AtomicU64,
    #[serde(skip)]
    parsing_time_slowest_nanos: AtomicU64,
    #[serde(skip)]
    parsing_time_count_total: AtomicUsize,

    // Exponential moving averages feeding the "recent rate" metrics.
    #[serde(skip)]
    requests_sent_ema: ExpMovingAverage,
    #[serde(skip)]
    responses_received_ema: ExpMovingAverage,
    #[serde(skip)]
    items_scraped_ema: ExpMovingAverage,
    // Single-line preview of the most recently scraped item.
    #[serde(skip)]
    current_item_preview: Arc<RwLock<String>>,
    // Optional whitelist of item fields shown in the live preview.
    #[serde(skip)]
    live_stats_preview_fields: Option<Vec<String>>,
}
415
416impl StatCollector {
417 pub(crate) fn new(live_stats_preview_fields: Option<Vec<String>>) -> Self {
419 Self::build(live_stats_preview_fields)
420 }
421
422 fn build(live_stats_preview_fields: Option<Vec<String>>) -> Self {
423 StatCollector {
424 start_time: Instant::now(),
425 requests_enqueued: AtomicUsize::new(0),
426 requests_sent: AtomicUsize::new(0),
427 requests_succeeded: AtomicUsize::new(0),
428 requests_failed: AtomicUsize::new(0),
429 requests_retried: AtomicUsize::new(0),
430 requests_scheduled_for_retry: AtomicUsize::new(0),
431 requests_dropped: AtomicUsize::new(0),
432 retry_delay_in_flight_ms: AtomicU64::new(0),
433 responses_received: AtomicUsize::new(0),
434 responses_from_cache: AtomicUsize::new(0),
435 response_status_counts: Arc::new(dashmap::DashMap::new()),
436 total_bytes_downloaded: AtomicUsize::new(0),
437 items_scraped: AtomicUsize::new(0),
438 items_processed: AtomicUsize::new(0),
439 items_dropped_by_pipeline: AtomicUsize::new(0),
440 queue_depth: AtomicUsize::new(0),
441 parser_backlog: AtomicUsize::new(0),
442 pipeline_backlog: AtomicUsize::new(0),
443 retry_backlog: AtomicUsize::new(0),
444 request_time_total_nanos: AtomicU64::new(0),
445 request_time_fastest_nanos: AtomicU64::new(u64::MAX),
446 request_time_slowest_nanos: AtomicU64::new(0),
447 request_time_count_total: AtomicUsize::new(0),
448 parsing_time_total_nanos: AtomicU64::new(0),
449 parsing_time_fastest_nanos: AtomicU64::new(u64::MAX),
450 parsing_time_slowest_nanos: AtomicU64::new(0),
451 parsing_time_count_total: AtomicUsize::new(0),
452 requests_sent_ema: ExpMovingAverage::new(0.2),
454 responses_received_ema: ExpMovingAverage::new(0.2),
455 items_scraped_ema: ExpMovingAverage::new(0.2),
456 current_item_preview: Arc::new(RwLock::new("none".to_string())),
457 live_stats_preview_fields,
458 }
459 }
460
461 fn internal_snapshot(&self) -> StatsSnapshot {
464 let mut status_counts: HashMap<u16, usize> = HashMap::new();
465 for entry in self.response_status_counts.iter() {
466 let (key, value) = entry.pair();
467 status_counts.insert(*key, *value);
468 }
469
470 let recent_requests_per_second = self.requests_sent_ema.get_rate();
472 let recent_responses_per_second = self.responses_received_ema.get_rate();
473 let recent_items_per_second = self.items_scraped_ema.get_rate();
474
475 StatsSnapshot {
476 requests_enqueued: self.requests_enqueued.load(Ordering::Acquire),
477 requests_sent: self.requests_sent.load(Ordering::Acquire),
478 requests_succeeded: self.requests_succeeded.load(Ordering::Acquire),
479 requests_failed: self.requests_failed.load(Ordering::Acquire),
480 requests_retried: self.requests_retried.load(Ordering::Acquire),
481 requests_scheduled_for_retry: self.requests_scheduled_for_retry.load(Ordering::Acquire),
482 requests_dropped: self.requests_dropped.load(Ordering::Acquire),
483 retry_delay_in_flight_ms: self.retry_delay_in_flight_ms.load(Ordering::Acquire),
484 responses_received: self.responses_received.load(Ordering::Acquire),
485 responses_from_cache: self.responses_from_cache.load(Ordering::Acquire),
486 total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::Acquire),
487 items_scraped: self.items_scraped.load(Ordering::Acquire),
488 items_processed: self.items_processed.load(Ordering::Acquire),
489 items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::Acquire),
490 queue_depth: self.queue_depth.load(Ordering::Acquire),
491 parser_backlog: self.parser_backlog.load(Ordering::Acquire),
492 pipeline_backlog: self.pipeline_backlog.load(Ordering::Acquire),
493 retry_backlog: self.retry_backlog.load(Ordering::Acquire),
494 response_status_counts: status_counts,
495 elapsed_duration: self.start_time.elapsed(),
496 average_request_time: self.average_request_time(),
497 fastest_request_time: self.fastest_request_time(),
498 slowest_request_time: self.slowest_request_time(),
499 request_time_count: self.request_time_count(),
500 average_parsing_time: self.average_parsing_time(),
501 fastest_parsing_time: self.fastest_parsing_time(),
502 slowest_parsing_time: self.slowest_parsing_time(),
503 parsing_time_count: self.parsing_time_count(),
504
505 recent_requests_per_second,
507 recent_responses_per_second,
508 recent_items_per_second,
509 current_item_preview: self.current_item_preview.read().clone(),
510 }
511 }
512
513 pub fn snapshot(&self) -> MetricsSnapshot {
515 let snapshot = self.internal_snapshot();
516 MetricsSnapshot {
517 requests_enqueued: snapshot.requests_enqueued,
518 requests_sent: snapshot.requests_sent,
519 requests_succeeded: snapshot.requests_succeeded,
520 requests_failed: snapshot.requests_failed,
521 requests_retried: snapshot.requests_retried,
522 requests_scheduled_for_retry: snapshot.requests_scheduled_for_retry,
523 requests_dropped: snapshot.requests_dropped,
524 retry_delay_in_flight_ms: snapshot.retry_delay_in_flight_ms,
525 responses_received: snapshot.responses_received,
526 responses_from_cache: snapshot.responses_from_cache,
527 total_bytes_downloaded: snapshot.total_bytes_downloaded,
528 items_scraped: snapshot.items_scraped,
529 items_processed: snapshot.items_processed,
530 items_dropped_by_pipeline: snapshot.items_dropped_by_pipeline,
531 queue_depth: snapshot.queue_depth,
532 parser_backlog: snapshot.parser_backlog,
533 pipeline_backlog: snapshot.pipeline_backlog,
534 retry_backlog: snapshot.retry_backlog,
535 response_status_counts: snapshot.response_status_counts,
536 elapsed_duration: snapshot.elapsed_duration,
537 average_request_time: snapshot.average_request_time,
538 fastest_request_time: snapshot.fastest_request_time,
539 slowest_request_time: snapshot.slowest_request_time,
540 request_time_count: snapshot.request_time_count,
541 average_parsing_time: snapshot.average_parsing_time,
542 fastest_parsing_time: snapshot.fastest_parsing_time,
543 slowest_parsing_time: snapshot.slowest_parsing_time,
544 parsing_time_count: snapshot.parsing_time_count,
545 recent_requests_per_second: snapshot.recent_requests_per_second,
546 recent_responses_per_second: snapshot.recent_responses_per_second,
547 recent_items_per_second: snapshot.recent_items_per_second,
548 current_item_preview: snapshot.current_item_preview,
549 }
550 }
551
552 pub(crate) fn increment_requests_enqueued(&self) {
554 self.requests_enqueued.fetch_add(1, Ordering::AcqRel);
555 }
556
557 pub(crate) fn increment_requests_sent(&self) {
559 self.requests_sent.fetch_add(1, Ordering::AcqRel);
560 self.requests_sent_ema.update(1);
562 }
563
564 pub(crate) fn increment_requests_succeeded(&self) {
566 self.requests_succeeded.fetch_add(1, Ordering::AcqRel);
567 }
568
569 pub(crate) fn increment_requests_failed(&self) {
571 self.requests_failed.fetch_add(1, Ordering::AcqRel);
572 }
573
574 pub(crate) fn increment_requests_retried(&self) {
576 self.requests_retried.fetch_add(1, Ordering::AcqRel);
577 }
578
579 pub(crate) fn increment_requests_dropped(&self) {
581 self.requests_dropped.fetch_add(1, Ordering::AcqRel);
582 }
583
584 pub(crate) fn increment_requests_scheduled_for_retry(&self) {
586 self.requests_scheduled_for_retry
587 .fetch_add(1, Ordering::AcqRel);
588 self.retry_backlog.fetch_add(1, Ordering::AcqRel);
589 }
590
591 pub(crate) fn complete_scheduled_retry(&self) {
593 let mut current = self.retry_backlog.load(Ordering::Acquire);
594 loop {
595 let next = current.saturating_sub(1);
596 match self.retry_backlog.compare_exchange_weak(
597 current,
598 next,
599 Ordering::AcqRel,
600 Ordering::Acquire,
601 ) {
602 Ok(_) => break,
603 Err(actual) => current = actual,
604 }
605 }
606 }
607
608 pub(crate) fn add_retry_delay_in_flight(&self, delay: Duration) {
610 let millis = delay.as_millis().min(u128::from(u64::MAX)) as u64;
611 self.retry_delay_in_flight_ms
612 .fetch_add(millis, Ordering::AcqRel);
613 }
614
615 pub(crate) fn remove_retry_delay_in_flight(&self, delay: Duration) {
617 let millis = delay.as_millis().min(u128::from(u64::MAX)) as u64;
618 let mut current = self.retry_delay_in_flight_ms.load(Ordering::Acquire);
619 loop {
620 let next = current.saturating_sub(millis);
621 match self.retry_delay_in_flight_ms.compare_exchange_weak(
622 current,
623 next,
624 Ordering::AcqRel,
625 Ordering::Acquire,
626 ) {
627 Ok(_) => break,
628 Err(actual) => current = actual,
629 }
630 }
631 }
632
633 pub(crate) fn increment_responses_received(&self) {
635 self.responses_received.fetch_add(1, Ordering::AcqRel);
636 self.responses_received_ema.update(1);
638 }
639
640 pub(crate) fn increment_responses_from_cache(&self) {
642 self.responses_from_cache.fetch_add(1, Ordering::AcqRel);
643 }
644
645 pub(crate) fn record_response_status(&self, status_code: u16) {
647 *self.response_status_counts.entry(status_code).or_insert(0) += 1;
648 }
649
650 pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
652 self.total_bytes_downloaded
653 .fetch_add(bytes, Ordering::AcqRel);
654 }
655
656 pub(crate) fn add_items_scraped(&self, count: usize) {
658 if count == 0 {
659 return;
660 }
661 self.items_scraped.fetch_add(count, Ordering::AcqRel);
662 self.items_scraped_ema.update(count);
663 }
664
665 pub(crate) fn record_current_item_preview<I: ScrapedItem>(&self, item: &I) {
667 let json = item.to_json_value();
668 let preview = build_item_preview(&json, self.live_stats_preview_fields.as_deref())
669 .unwrap_or_else(|| {
670 serde_json::to_string(&json).unwrap_or_else(|_| format!("{:?}", item))
671 })
672 .replace(['\n', '\r'], " ");
673 let preview = truncate_preview(&preview, 160);
674 *self.current_item_preview.write() = preview;
675 }
676
677 pub(crate) fn increment_items_processed(&self) {
679 self.items_processed.fetch_add(1, Ordering::AcqRel);
680 }
681
682 pub(crate) fn increment_items_dropped_by_pipeline(&self) {
684 self.items_dropped_by_pipeline
685 .fetch_add(1, Ordering::AcqRel);
686 }
687
688 pub(crate) fn update_runtime_backlog(
690 &self,
691 queue_depth: usize,
692 parser_backlog: usize,
693 pipeline_backlog: usize,
694 ) {
695 self.queue_depth.store(queue_depth, Ordering::Release);
696 self.parser_backlog.store(parser_backlog, Ordering::Release);
697 self.pipeline_backlog
698 .store(pipeline_backlog, Ordering::Release);
699 }
700
701 pub fn record_request_time(&self, _url: &str, duration: Duration) {
703 let nanos = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
704 self.request_time_total_nanos
705 .fetch_add(nanos, Ordering::AcqRel);
706 self.request_time_count_total.fetch_add(1, Ordering::AcqRel);
707 update_min(&self.request_time_fastest_nanos, nanos);
708 update_max(&self.request_time_slowest_nanos, nanos);
709 }
710
711 pub fn average_request_time(&self) -> Option<Duration> {
713 average_duration(
714 &self.request_time_total_nanos,
715 self.request_time_count_total.load(Ordering::Acquire),
716 )
717 }
718
719 pub fn fastest_request_time(&self) -> Option<Duration> {
721 duration_from_extreme(
722 &self.request_time_fastest_nanos,
723 self.request_time_count_total.load(Ordering::Acquire),
724 true,
725 )
726 }
727
728 pub fn slowest_request_time(&self) -> Option<Duration> {
730 duration_from_extreme(
731 &self.request_time_slowest_nanos,
732 self.request_time_count_total.load(Ordering::Acquire),
733 false,
734 )
735 }
736
737 pub fn request_time_count(&self) -> usize {
739 self.request_time_count_total.load(Ordering::Acquire)
740 }
741
742 pub fn get_request_time(&self, url: &str) -> Option<Duration> {
744 let _ = url;
745 None
746 }
747
748 pub fn get_all_request_times(&self) -> Vec<(String, Duration)> {
750 Vec::new()
751 }
752
753 pub fn record_parsing_time(&self, duration: Duration) {
755 let nanos = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
756 self.parsing_time_total_nanos
757 .fetch_add(nanos, Ordering::AcqRel);
758 self.parsing_time_count_total.fetch_add(1, Ordering::AcqRel);
759 update_min(&self.parsing_time_fastest_nanos, nanos);
760 update_max(&self.parsing_time_slowest_nanos, nanos);
761 }
762
763 pub fn average_parsing_time(&self) -> Option<Duration> {
765 average_duration(
766 &self.parsing_time_total_nanos,
767 self.parsing_time_count_total.load(Ordering::Acquire),
768 )
769 }
770
771 pub fn fastest_parsing_time(&self) -> Option<Duration> {
773 duration_from_extreme(
774 &self.parsing_time_fastest_nanos,
775 self.parsing_time_count_total.load(Ordering::Acquire),
776 true,
777 )
778 }
779
780 pub fn slowest_parsing_time(&self) -> Option<Duration> {
782 duration_from_extreme(
783 &self.parsing_time_slowest_nanos,
784 self.parsing_time_count_total.load(Ordering::Acquire),
785 false,
786 )
787 }
788
789 pub fn parsing_time_count(&self) -> usize {
791 self.parsing_time_count_total.load(Ordering::Acquire)
792 }
793
794 pub fn clear_request_times(&self) {
796 self.request_time_total_nanos.store(0, Ordering::Release);
797 self.request_time_fastest_nanos
798 .store(u64::MAX, Ordering::Release);
799 self.request_time_slowest_nanos.store(0, Ordering::Release);
800 self.request_time_count_total.store(0, Ordering::Release);
801 }
802
803 pub fn clear_parsing_times(&self) {
805 self.parsing_time_total_nanos.store(0, Ordering::Release);
806 self.parsing_time_fastest_nanos
807 .store(u64::MAX, Ordering::Release);
808 self.parsing_time_slowest_nanos.store(0, Ordering::Release);
809 self.parsing_time_count_total.store(0, Ordering::Release);
810 }
811
812 pub fn to_json_string(&self) -> Result<String, SpiderError> {
814 Ok(serde_json::to_string(&self.snapshot())?)
815 }
816
817 pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
819 Ok(serde_json::to_string_pretty(&self.snapshot())?)
820 }
821
822 pub fn to_markdown_string(&self) -> String {
824 let snapshot = self.internal_snapshot();
825
826 let status_codes_list: String = snapshot
827 .response_status_counts
828 .iter()
829 .map(|(code, count)| format!("- **{}**: {}", code, count))
830 .collect::<Vec<String>>()
831 .join("\n");
832 let status_codes_output = if status_codes_list.is_empty() {
833 "N/A".to_string()
834 } else {
835 status_codes_list
836 };
837
838 format!(
839 r#"# Crawl Statistics Report
840
841- **Duration**: {}
842- **Current Rate** (last 10s): {:.2} req/s, {:.2} resp/s, {:.2} item/s
843- **Overall Rate** (total): {:.2} req/s, {:.2} resp/s, {:.2} item/s
844- **Bytes Per Second**: {}
845- **Request Ratios**: success {:.2}%, failure {:.2}%
846- **Cache Hit Ratio**: {:.2}%
847
848## Requests
849| Metric | Count |
850|------------|-------|
851| Enqueued | {} |
852| Sent | {} |
853| Pending | {} |
854| Succeeded | {} |
855| Failed | {} |
856| Retried | {} |
857| Retry Scheduled | {} |
858| Dropped | {} |
859| Retry Delay In Flight | {} ms |
860
861## Responses
862| Metric | Count |
863|------------|-------|
864| Received | {} |
865| From Cache | {} |
866| Downloaded | {} |
867
868## Items
869| Metric | Count |
870|------------|--------|
871| Scraped | {} |
872| Processed | {} |
873| Dropped | {} |
874
875## Request Times
876| Metric | Value |
877|------------------|------------|
878| Average Time | {} |
879| Fastest Request | {} |
880| Slowest Request | {} |
881| Total Recorded | {} |
882
883## Parsing Times
884| Metric | Value |
885|------------------|------------|
886| Average Time | {} |
887| Fastest Parse | {} |
888| Slowest Parse | {} |
889| Total Recorded | {} |
890
891## Status Codes
892{}
893"#,
894 snapshot.formatted_duration(),
895 snapshot.requests_per_second(),
896 snapshot.responses_per_second(),
897 snapshot.items_per_second(),
898 {
900 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
901 if total_seconds > 0.0 {
902 snapshot.requests_sent as f64 / total_seconds
903 } else {
904 0.0
905 }
906 },
907 {
908 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
909 if total_seconds > 0.0 {
910 snapshot.responses_received as f64 / total_seconds
911 } else {
912 0.0
913 }
914 },
915 {
916 let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
917 if total_seconds > 0.0 {
918 snapshot.items_scraped as f64 / total_seconds
919 } else {
920 0.0
921 }
922 },
923 snapshot.formatted_bytes_per_second(),
924 snapshot.success_ratio(),
925 snapshot.failure_ratio(),
926 snapshot.cache_hit_ratio(),
927 snapshot.requests_enqueued,
928 snapshot.requests_sent,
929 snapshot.pending_requests(),
930 snapshot.requests_succeeded,
931 snapshot.requests_failed,
932 snapshot.requests_retried,
933 snapshot.requests_scheduled_for_retry,
934 snapshot.requests_dropped,
935 snapshot.retry_delay_in_flight_ms,
936 snapshot.responses_received,
937 snapshot.responses_from_cache,
938 snapshot.formatted_bytes(),
939 snapshot.items_scraped,
940 snapshot.items_processed,
941 snapshot.items_dropped_by_pipeline,
942 snapshot.formatted_request_time(snapshot.average_request_time),
943 snapshot.formatted_request_time(snapshot.fastest_request_time),
944 snapshot.formatted_request_time(snapshot.slowest_request_time),
945 snapshot.request_time_count,
946 snapshot.formatted_request_time(snapshot.average_parsing_time),
947 snapshot.formatted_request_time(snapshot.fastest_parsing_time),
948 snapshot.formatted_request_time(snapshot.slowest_parsing_time),
949 snapshot.parsing_time_count,
950 status_codes_output
951 )
952 }
953
954 pub fn to_live_report_string(&self) -> String {
956 let snapshot = self.internal_snapshot();
957 format_plain_text_metrics(&snapshot)
958 }
959}
960
/// Caps `input` at `max_chars` characters (not bytes), appending "..." only
/// when something was actually cut off.
fn truncate_preview(input: &str, max_chars: usize) -> String {
    // char_indices gives the byte offset of the first character past the
    // limit, which is a valid slicing boundary for the prefix.
    match input.char_indices().nth(max_chars) {
        Some((cut_at, _)) => format!("{}...", &input[..cut_at]),
        None => input.to_string(),
    }
}
970
971fn build_item_preview(json: &serde_json::Value, fields: Option<&[String]>) -> Option<String> {
972 let fields = fields?;
973
974 if fields.len() == 1 {
975 let (_, path) = parse_preview_field(&fields[0]);
976 return get_value_by_path(json, path).map(format_preview_value);
977 }
978
979 let mut preview = serde_json::Map::new();
980
981 for field in fields {
982 let (label, path) = parse_preview_field(field);
983 if let Some(value) = get_value_by_path(json, path) {
984 preview.insert(label.to_string(), value.clone());
985 }
986 }
987
988 if preview.is_empty() {
989 None
990 } else {
991 serde_json::to_string(&serde_json::Value::Object(preview)).ok()
992 }
993}
994
995fn format_preview_value(value: &serde_json::Value) -> String {
996 match value {
997 serde_json::Value::Null => "null".to_string(),
998 serde_json::Value::Bool(boolean) => boolean.to_string(),
999 serde_json::Value::Number(number) => number.to_string(),
1000 serde_json::Value::String(text) => text.clone(),
1001 serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
1002 serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
1003 }
1004 }
1005}
1006
/// Splits a preview-field spec `"label=path"` into (label, path). When there
/// is no '=' — or either side of it is empty — the whole spec doubles as both.
fn parse_preview_field(field: &str) -> (&str, &str) {
    if let Some((label, path)) = field.split_once('=') {
        if !label.is_empty() && !path.is_empty() {
            return (label, path);
        }
    }
    (field, field)
}
1013
1014fn get_value_by_path<'a>(
1015 value: &'a serde_json::Value,
1016 path: &str,
1017) -> Option<&'a serde_json::Value> {
1018 let mut current = value;
1019 for segment in path.split('.') {
1020 current = current.get(segment)?;
1021 }
1022 Some(current)
1023}
1024
impl Default for StatCollector {
    // A collector with no preview-field filtering configured.
    fn default() -> Self {
        Self::new(None)
    }
}
1030
1031impl std::fmt::Display for StatCollector {
1032 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1033 write!(f, "\n{}\n", self.to_live_report_string())
1034 }
1035}
1036
/// Mean duration from a running nanosecond total and a sample count;
/// None when no samples exist (avoids divide-by-zero).
fn average_duration(total_nanos: &AtomicU64, count: usize) -> Option<Duration> {
    (count > 0).then(|| Duration::from_nanos(total_nanos.load(Ordering::Acquire) / count as u64))
}
1046
/// Converts a min/max nanosecond tracker into a Duration. None when no
/// samples were recorded, or when a min tracker is still at its u64::MAX
/// "never updated" sentinel.
fn duration_from_extreme(extreme: &AtomicU64, count: usize, is_min: bool) -> Option<Duration> {
    if count == 0 {
        return None;
    }

    match extreme.load(Ordering::Acquire) {
        u64::MAX if is_min => None,
        nanos => Some(Duration::from_nanos(nanos)),
    }
}
1059
/// Lowers `target` to `candidate` if `candidate` is smaller.
///
/// `AtomicU64::fetch_min` performs the compare-and-swap retry loop
/// internally, replacing the hand-rolled `compare_exchange_weak` loop with
/// a single standard-library call of identical semantics.
fn update_min(target: &AtomicU64, candidate: u64) {
    target.fetch_min(candidate, Ordering::AcqRel);
}
1070
/// Raises `target` to `candidate` if `candidate` is larger.
///
/// `AtomicU64::fetch_max` performs the compare-and-swap retry loop
/// internally, replacing the hand-rolled `compare_exchange_weak` loop with
/// a single standard-library call of identical semantics.
fn update_max(target: &AtomicU64, candidate: u64) {
    target.fetch_max(candidate, Ordering::AcqRel);
}