// spider_core/stats.rs
//! Runtime statistics and reporting helpers.
//!
//! [`StatCollector`] records request counts, response status codes, cache hits,
//! timings, bandwidth, and item throughput while a crawl is running.
//!
//! ## Example
//!
//! ```rust,ignore
//! use spider_core::StatCollector;
//!
//! let stats = StatCollector::new();
//!
//! // During crawling, metrics are automatically updated
//! stats.increment_requests_sent();
//! stats.increment_items_scraped();
//!
//! // Export statistics in various formats
//! println!("{}", stats.to_json_string_pretty().unwrap());
//! println!("{}", stats.to_markdown_string());
//! ```

use parking_lot::RwLock;
use spider_util::error::SpiderError;
use spider_util::item::ScrapedItem;
use spider_util::metrics::{ExpMovingAverage, MetricsSnapshotProvider, format_plain_text_metrics};
use std::{
    collections::HashMap,
    sync::{
        Arc,
        atomic::{AtomicU64, AtomicUsize, Ordering},
    },
    time::{Duration, Instant},
};

// A snapshot of the current statistics, used for reporting.
// This avoids code duplication in the various export/display methods.
//
// All counters are copied out of the live `StatCollector` atomics at a single
// point in time, so the fields here are plain values that can be read freely.
struct StatsSnapshot {
    // Request lifecycle counters.
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_scheduled_for_retry: usize,
    requests_dropped: usize,
    // Sum of retry delays (in ms) scheduled but not yet elapsed.
    retry_delay_in_flight_ms: u64,
    // Response counters.
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    // Item pipeline counters.
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    // Tally per HTTP status code (e.g. 200, 404, 500).
    response_status_counts: HashMap<u16, usize>,
    // Wall-clock time since the collector was created.
    elapsed_duration: Duration,
    // Request timing aggregates; `None` when no requests have been recorded.
    average_request_time: Option<Duration>,
    fastest_request_time: Option<Duration>,
    slowest_request_time: Option<Duration>,
    request_time_count: usize,
    // Parsing timing aggregates; `None` when no parses have been recorded.
    average_parsing_time: Option<Duration>,
    fastest_parsing_time: Option<Duration>,
    slowest_parsing_time: Option<Duration>,
    parsing_time_count: usize,

    // Recent rates from sliding windows
    // (exponential moving averages maintained by the collector).
    recent_requests_per_second: f64,
    recent_responses_per_second: f64,
    recent_items_per_second: f64,
    // Single-line preview of the most recently scraped item.
    current_item_preview: String,
}

70impl StatsSnapshot {
71    fn formatted_duration(&self) -> String {
72        let total_secs = self.elapsed_duration.as_secs();
73        let hours = total_secs / 3600;
74        let minutes = (total_secs % 3600) / 60;
75        let seconds = self.elapsed_duration.as_secs_f64() % 60.0;
76
77        if hours > 0 {
78            format!("{hours}h {minutes:02}m {seconds:05.2}s")
79        } else if minutes > 0 {
80            format!("{minutes}m {seconds:05.2}s")
81        } else if total_secs > 0 {
82            format!("{:.2}s", self.elapsed_duration.as_secs_f64())
83        } else if self.elapsed_duration.as_millis() > 0 {
84            format!("{}ms", self.elapsed_duration.as_millis())
85        } else {
86            format!("{}us", self.elapsed_duration.as_micros())
87        }
88    }
89
90    fn formatted_request_time(&self, duration: Option<Duration>) -> String {
91        match duration {
92            Some(d) => {
93                if d.as_millis() < 1000 {
94                    format!("{} ms", d.as_millis())
95                } else {
96                    format!("{:.2} s", d.as_secs_f64())
97                }
98            }
99            None => "N/A".to_string(),
100        }
101    }
102
103    fn requests_per_second(&self) -> f64 {
104        let elapsed = self.elapsed_duration.as_secs_f64();
105        if elapsed > 0.0 {
106            self.requests_sent as f64 / elapsed
107        } else {
108            0.0
109        }
110    }
111
112    fn responses_per_second(&self) -> f64 {
113        let elapsed = self.elapsed_duration.as_secs_f64();
114        if elapsed > 0.0 {
115            self.responses_received as f64 / elapsed
116        } else {
117            0.0
118        }
119    }
120
121    fn items_per_second(&self) -> f64 {
122        let elapsed = self.elapsed_duration.as_secs_f64();
123        if elapsed > 0.0 {
124            self.items_scraped as f64 / elapsed
125        } else {
126            0.0
127        }
128    }
129
130    fn bytes_per_second(&self) -> f64 {
131        let elapsed = self.elapsed_duration.as_secs_f64();
132        if elapsed > 0.0 {
133            self.total_bytes_downloaded as f64 / elapsed
134        } else {
135            0.0
136        }
137    }
138
139    fn formatted_bytes(&self) -> String {
140        const KB: usize = 1024;
141        const MB: usize = 1024 * KB;
142        const GB: usize = 1024 * MB;
143
144        if self.total_bytes_downloaded >= GB {
145            format!("{:.2} GB", self.total_bytes_downloaded as f64 / GB as f64)
146        } else if self.total_bytes_downloaded >= MB {
147            format!("{:.2} MB", self.total_bytes_downloaded as f64 / MB as f64)
148        } else if self.total_bytes_downloaded >= KB {
149            format!("{:.2} KB", self.total_bytes_downloaded as f64 / KB as f64)
150        } else {
151            format!("{} B", self.total_bytes_downloaded)
152        }
153    }
154
155    fn formatted_bytes_per_second(&self) -> String {
156        let bytes_per_second = self.bytes_per_second() as usize;
157        const KB: usize = 1024;
158        const MB: usize = 1024 * KB;
159        const GB: usize = 1024 * MB;
160
161        if bytes_per_second >= GB {
162            format!("{:.2} GB/s", bytes_per_second as f64 / GB as f64)
163        } else if bytes_per_second >= MB {
164            format!("{:.2} MB/s", bytes_per_second as f64 / MB as f64)
165        } else if bytes_per_second >= KB {
166            format!("{:.2} KB/s", bytes_per_second as f64 / KB as f64)
167        } else {
168            format!("{bytes_per_second} B/s")
169        }
170    }
171
172    fn pending_requests(&self) -> usize {
173        self.requests_enqueued
174            .saturating_sub(self.requests_succeeded + self.requests_failed + self.requests_dropped)
175    }
176
177    fn success_ratio(&self) -> f64 {
178        if self.requests_sent == 0 {
179            0.0
180        } else {
181            self.requests_succeeded as f64 / self.requests_sent as f64 * 100.0
182        }
183    }
184
185    fn failure_ratio(&self) -> f64 {
186        if self.requests_sent == 0 {
187            0.0
188        } else {
189            self.requests_failed as f64 / self.requests_sent as f64 * 100.0
190        }
191    }
192
193    fn cache_hit_ratio(&self) -> f64 {
194        if self.responses_received == 0 {
195            0.0
196        } else {
197            self.responses_from_cache as f64 / self.responses_received as f64 * 100.0
198        }
199    }
200}
201
// Exposes the snapshot through the shared metrics trait so the generic
// plain-text renderer in `spider_util::metrics` can consume it. Every method
// is a trivial accessor over the corresponding field.
impl MetricsSnapshotProvider for StatsSnapshot {
    fn get_requests_enqueued(&self) -> usize {
        self.requests_enqueued
    }

    fn get_requests_sent(&self) -> usize {
        self.requests_sent
    }

    fn get_requests_succeeded(&self) -> usize {
        self.requests_succeeded
    }

    fn get_requests_failed(&self) -> usize {
        self.requests_failed
    }

    fn get_requests_retried(&self) -> usize {
        self.requests_retried
    }

    fn get_requests_scheduled_for_retry(&self) -> usize {
        self.requests_scheduled_for_retry
    }

    fn get_requests_dropped(&self) -> usize {
        self.requests_dropped
    }

    fn get_retry_delay_in_flight_ms(&self) -> u64 {
        self.retry_delay_in_flight_ms
    }

    fn get_responses_received(&self) -> usize {
        self.responses_received
    }

    fn get_responses_from_cache(&self) -> usize {
        self.responses_from_cache
    }

    fn get_total_bytes_downloaded(&self) -> usize {
        self.total_bytes_downloaded
    }

    fn get_items_scraped(&self) -> usize {
        self.items_scraped
    }

    fn get_items_processed(&self) -> usize {
        self.items_processed
    }

    fn get_items_dropped_by_pipeline(&self) -> usize {
        self.items_dropped_by_pipeline
    }

    fn get_response_status_counts(&self) -> &HashMap<u16, usize> {
        &self.response_status_counts
    }

    fn get_elapsed_duration(&self) -> Duration {
        self.elapsed_duration
    }

    fn get_average_request_time(&self) -> Option<Duration> {
        self.average_request_time
    }

    fn get_fastest_request_time(&self) -> Option<Duration> {
        self.fastest_request_time
    }

    fn get_slowest_request_time(&self) -> Option<Duration> {
        self.slowest_request_time
    }

    fn get_request_time_count(&self) -> usize {
        self.request_time_count
    }

    fn get_average_parsing_time(&self) -> Option<Duration> {
        self.average_parsing_time
    }

    fn get_fastest_parsing_time(&self) -> Option<Duration> {
        self.fastest_parsing_time
    }

    fn get_slowest_parsing_time(&self) -> Option<Duration> {
        self.slowest_parsing_time
    }

    fn get_parsing_time_count(&self) -> usize {
        self.parsing_time_count
    }

    fn get_recent_requests_per_second(&self) -> f64 {
        self.recent_requests_per_second
    }

    fn get_recent_responses_per_second(&self) -> f64 {
        self.recent_responses_per_second
    }

    fn get_recent_items_per_second(&self) -> f64 {
        self.recent_items_per_second
    }

    fn get_current_item_preview(&self) -> &str {
        &self.current_item_preview
    }

    // NOTE: the three delegations below call the inherent methods of the same
    // name, not the trait methods — inherent methods take precedence during
    // method resolution, so this is plain delegation, not recursion.
    fn formatted_duration(&self) -> String {
        self.formatted_duration()
    }

    fn formatted_request_time(&self, duration: Option<Duration>) -> String {
        self.formatted_request_time(duration)
    }

    fn formatted_bytes(&self) -> String {
        self.formatted_bytes()
    }
}

/// Collects and stores various statistics about the crawler's operation.
///
/// All counters are atomics (or concurrent maps), so a shared reference can be
/// updated from many tasks without external locking. Serialization covers the
/// plain counters; internal aggregates and helpers are `#[serde(skip)]`ped.
#[derive(Debug, serde::Serialize)]
pub struct StatCollector {
    // Crawl-related metrics
    // Skipped: `Instant` has no serde representation; used to derive elapsed time.
    #[serde(skip)]
    pub start_time: Instant,

    // Request-related metrics
    pub requests_enqueued: AtomicUsize,
    pub requests_sent: AtomicUsize,
    pub requests_succeeded: AtomicUsize,
    pub requests_failed: AtomicUsize,
    pub requests_retried: AtomicUsize,
    pub requests_scheduled_for_retry: AtomicUsize,
    // Total milliseconds of retry delay currently scheduled but not yet elapsed.
    pub requests_dropped: AtomicUsize,
    pub retry_delay_in_flight_ms: AtomicU64,

    // Response-related metrics
    pub responses_received: AtomicUsize,
    pub responses_from_cache: AtomicUsize,
    pub response_status_counts: Arc<dashmap::DashMap<u16, usize>>, // e.g., 200, 404, 500
    pub total_bytes_downloaded: AtomicUsize,

    // Add more advanced response time metrics if needed (e.g., histograms)

    // Item-related metrics
    pub items_scraped: AtomicUsize,
    pub items_processed: AtomicUsize,
    pub items_dropped_by_pipeline: AtomicUsize,

    // Timing metrics - kept as running aggregates (total/min/max/count) in
    // atomics, so no per-sample storage is retained and memory use is O(1).
    #[serde(skip)]
    request_time_total_nanos: AtomicU64,
    #[serde(skip)]
    request_time_fastest_nanos: AtomicU64,
    #[serde(skip)]
    request_time_slowest_nanos: AtomicU64,
    #[serde(skip)]
    request_time_count_total: AtomicUsize,
    #[serde(skip)]
    parsing_time_total_nanos: AtomicU64,
    #[serde(skip)]
    parsing_time_fastest_nanos: AtomicU64,
    #[serde(skip)]
    parsing_time_slowest_nanos: AtomicU64,
    #[serde(skip)]
    parsing_time_count_total: AtomicUsize,

    // Exponential moving average metrics for accurate speed calculations
    #[serde(skip)]
    requests_sent_ema: ExpMovingAverage,
    #[serde(skip)]
    responses_received_ema: ExpMovingAverage,
    #[serde(skip)]
    items_scraped_ema: ExpMovingAverage,
    // One-line preview of the last scraped item, shown in live stats output.
    #[serde(skip)]
    current_item_preview: Arc<RwLock<String>>,
    // Optional `label=path` selectors restricting which fields the preview shows.
    #[serde(skip)]
    live_stats_preview_fields: Option<Vec<String>>,
}

390impl StatCollector {
391    /// Creates a new `StatCollector` with all counters initialized to zero.
392    pub(crate) fn new(live_stats_preview_fields: Option<Vec<String>>) -> Self {
393        Self::build(live_stats_preview_fields)
394    }
395
396    fn build(live_stats_preview_fields: Option<Vec<String>>) -> Self {
397        StatCollector {
398            start_time: Instant::now(),
399            requests_enqueued: AtomicUsize::new(0),
400            requests_sent: AtomicUsize::new(0),
401            requests_succeeded: AtomicUsize::new(0),
402            requests_failed: AtomicUsize::new(0),
403            requests_retried: AtomicUsize::new(0),
404            requests_scheduled_for_retry: AtomicUsize::new(0),
405            requests_dropped: AtomicUsize::new(0),
406            retry_delay_in_flight_ms: AtomicU64::new(0),
407            responses_received: AtomicUsize::new(0),
408            responses_from_cache: AtomicUsize::new(0),
409            response_status_counts: Arc::new(dashmap::DashMap::new()),
410            total_bytes_downloaded: AtomicUsize::new(0),
411            items_scraped: AtomicUsize::new(0),
412            items_processed: AtomicUsize::new(0),
413            items_dropped_by_pipeline: AtomicUsize::new(0),
414            request_time_total_nanos: AtomicU64::new(0),
415            request_time_fastest_nanos: AtomicU64::new(u64::MAX),
416            request_time_slowest_nanos: AtomicU64::new(0),
417            request_time_count_total: AtomicUsize::new(0),
418            parsing_time_total_nanos: AtomicU64::new(0),
419            parsing_time_fastest_nanos: AtomicU64::new(u64::MAX),
420            parsing_time_slowest_nanos: AtomicU64::new(0),
421            parsing_time_count_total: AtomicUsize::new(0),
422            // Initialize exponential moving averages for recent speed calculations (alpha = 0.2 for good balance)
423            requests_sent_ema: ExpMovingAverage::new(0.2),
424            responses_received_ema: ExpMovingAverage::new(0.2),
425            items_scraped_ema: ExpMovingAverage::new(0.2),
426            current_item_preview: Arc::new(RwLock::new("none".to_string())),
427            live_stats_preview_fields,
428        }
429    }
430
431    /// Creates a snapshot of the current statistics.
432    /// This is the single source of truth for all presentation logic.
433    fn snapshot(&self) -> StatsSnapshot {
434        let mut status_counts: HashMap<u16, usize> = HashMap::new();
435        for entry in self.response_status_counts.iter() {
436            let (key, value) = entry.pair();
437            status_counts.insert(*key, *value);
438        }
439
440        // Get recent rates from exponential moving averages
441        let recent_requests_per_second = self.requests_sent_ema.get_rate();
442        let recent_responses_per_second = self.responses_received_ema.get_rate();
443        let recent_items_per_second = self.items_scraped_ema.get_rate();
444
445        StatsSnapshot {
446            requests_enqueued: self.requests_enqueued.load(Ordering::Acquire),
447            requests_sent: self.requests_sent.load(Ordering::Acquire),
448            requests_succeeded: self.requests_succeeded.load(Ordering::Acquire),
449            requests_failed: self.requests_failed.load(Ordering::Acquire),
450            requests_retried: self.requests_retried.load(Ordering::Acquire),
451            requests_scheduled_for_retry: self.requests_scheduled_for_retry.load(Ordering::Acquire),
452            requests_dropped: self.requests_dropped.load(Ordering::Acquire),
453            retry_delay_in_flight_ms: self.retry_delay_in_flight_ms.load(Ordering::Acquire),
454            responses_received: self.responses_received.load(Ordering::Acquire),
455            responses_from_cache: self.responses_from_cache.load(Ordering::Acquire),
456            total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::Acquire),
457            items_scraped: self.items_scraped.load(Ordering::Acquire),
458            items_processed: self.items_processed.load(Ordering::Acquire),
459            items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::Acquire),
460            response_status_counts: status_counts,
461            elapsed_duration: self.start_time.elapsed(),
462            average_request_time: self.average_request_time(),
463            fastest_request_time: self.fastest_request_time(),
464            slowest_request_time: self.slowest_request_time(),
465            request_time_count: self.request_time_count(),
466            average_parsing_time: self.average_parsing_time(),
467            fastest_parsing_time: self.fastest_parsing_time(),
468            slowest_parsing_time: self.slowest_parsing_time(),
469            parsing_time_count: self.parsing_time_count(),
470
471            // Recent rates from sliding windows
472            recent_requests_per_second,
473            recent_responses_per_second,
474            recent_items_per_second,
475            current_item_preview: self.current_item_preview.read().clone(),
476        }
477    }
478
479    /// Increments the count of enqueued requests.
480    pub(crate) fn increment_requests_enqueued(&self) {
481        self.requests_enqueued.fetch_add(1, Ordering::AcqRel);
482    }
483
484    /// Increments the count of sent requests.
485    pub(crate) fn increment_requests_sent(&self) {
486        self.requests_sent.fetch_add(1, Ordering::AcqRel);
487        // Update the EMA with a count of 1 for this event
488        self.requests_sent_ema.update(1);
489    }
490
491    /// Increments the count of successful requests.
492    pub(crate) fn increment_requests_succeeded(&self) {
493        self.requests_succeeded.fetch_add(1, Ordering::AcqRel);
494    }
495
496    /// Increments the count of failed requests.
497    pub(crate) fn increment_requests_failed(&self) {
498        self.requests_failed.fetch_add(1, Ordering::AcqRel);
499    }
500
501    /// Increments the count of retried requests.
502    pub(crate) fn increment_requests_retried(&self) {
503        self.requests_retried.fetch_add(1, Ordering::AcqRel);
504    }
505
506    /// Increments the count of dropped requests.
507    pub(crate) fn increment_requests_dropped(&self) {
508        self.requests_dropped.fetch_add(1, Ordering::AcqRel);
509    }
510
511    /// Increments the count of retries scheduled outside the downloader permit path.
512    pub(crate) fn increment_requests_scheduled_for_retry(&self) {
513        self.requests_scheduled_for_retry
514            .fetch_add(1, Ordering::AcqRel);
515    }
516
517    /// Adds the currently scheduled retry delay in milliseconds.
518    pub(crate) fn add_retry_delay_in_flight(&self, delay: Duration) {
519        let millis = delay.as_millis().min(u128::from(u64::MAX)) as u64;
520        self.retry_delay_in_flight_ms
521            .fetch_add(millis, Ordering::AcqRel);
522    }
523
524    /// Removes completed retry delay from the in-flight total.
525    pub(crate) fn remove_retry_delay_in_flight(&self, delay: Duration) {
526        let millis = delay.as_millis().min(u128::from(u64::MAX)) as u64;
527        let mut current = self.retry_delay_in_flight_ms.load(Ordering::Acquire);
528        loop {
529            let next = current.saturating_sub(millis);
530            match self.retry_delay_in_flight_ms.compare_exchange_weak(
531                current,
532                next,
533                Ordering::AcqRel,
534                Ordering::Acquire,
535            ) {
536                Ok(_) => break,
537                Err(actual) => current = actual,
538            }
539        }
540    }
541
542    /// Increments the count of received responses.
543    pub(crate) fn increment_responses_received(&self) {
544        self.responses_received.fetch_add(1, Ordering::AcqRel);
545        // Update the EMA with a count of 1 for this event
546        self.responses_received_ema.update(1);
547    }
548
549    /// Increments the count of responses served from cache.
550    pub(crate) fn increment_responses_from_cache(&self) {
551        self.responses_from_cache.fetch_add(1, Ordering::AcqRel);
552    }
553
554    /// Records a response status code.
555    pub(crate) fn record_response_status(&self, status_code: u16) {
556        *self.response_status_counts.entry(status_code).or_insert(0) += 1;
557    }
558
559    /// Adds to the total bytes downloaded.
560    pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
561        self.total_bytes_downloaded
562            .fetch_add(bytes, Ordering::AcqRel);
563    }
564
565    /// Adds multiple scraped items to the counter.
566    pub(crate) fn add_items_scraped(&self, count: usize) {
567        if count == 0 {
568            return;
569        }
570        self.items_scraped.fetch_add(count, Ordering::AcqRel);
571        self.items_scraped_ema.update(count);
572    }
573
574    /// Stores a compact single-line preview of the most recently scraped item.
575    pub(crate) fn record_current_item_preview<I: ScrapedItem>(&self, item: &I) {
576        let json = item.to_json_value();
577        let preview = build_item_preview(&json, self.live_stats_preview_fields.as_deref())
578            .unwrap_or_else(|| {
579                serde_json::to_string(&json).unwrap_or_else(|_| format!("{:?}", item))
580            })
581            .replace(['\n', '\r'], " ");
582        let preview = truncate_preview(&preview, 160);
583        *self.current_item_preview.write() = preview;
584    }
585
586    /// Increments the count of processed items.
587    pub(crate) fn increment_items_processed(&self) {
588        self.items_processed.fetch_add(1, Ordering::AcqRel);
589    }
590
591    /// Increments the count of items dropped by pipelines.
592    pub(crate) fn increment_items_dropped_by_pipeline(&self) {
593        self.items_dropped_by_pipeline
594            .fetch_add(1, Ordering::AcqRel);
595    }
596
597    /// Records the time taken for a request.
598    pub fn record_request_time(&self, _url: &str, duration: Duration) {
599        let nanos = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
600        self.request_time_total_nanos
601            .fetch_add(nanos, Ordering::AcqRel);
602        self.request_time_count_total.fetch_add(1, Ordering::AcqRel);
603        update_min(&self.request_time_fastest_nanos, nanos);
604        update_max(&self.request_time_slowest_nanos, nanos);
605    }
606
607    /// Calculates the average request time across all recorded requests.
608    pub fn average_request_time(&self) -> Option<Duration> {
609        average_duration(
610            &self.request_time_total_nanos,
611            self.request_time_count_total.load(Ordering::Acquire),
612        )
613    }
614
615    /// Gets the fastest request time among all recorded requests.
616    pub fn fastest_request_time(&self) -> Option<Duration> {
617        duration_from_extreme(
618            &self.request_time_fastest_nanos,
619            self.request_time_count_total.load(Ordering::Acquire),
620            true,
621        )
622    }
623
624    /// Gets the slowest request time among all recorded requests.
625    pub fn slowest_request_time(&self) -> Option<Duration> {
626        duration_from_extreme(
627            &self.request_time_slowest_nanos,
628            self.request_time_count_total.load(Ordering::Acquire),
629            false,
630        )
631    }
632
633    /// Gets the total number of recorded request times.
634    pub fn request_time_count(&self) -> usize {
635        self.request_time_count_total.load(Ordering::Acquire)
636    }
637
638    /// Gets the request time for a specific URL.
639    pub fn get_request_time(&self, url: &str) -> Option<Duration> {
640        let _ = url;
641        None
642    }
643
644    /// Gets all recorded request times as a vector of (URL, Duration) pairs.
645    pub fn get_all_request_times(&self) -> Vec<(String, Duration)> {
646        Vec::new()
647    }
648
649    /// Records the time taken for parsing a response.
650    pub fn record_parsing_time(&self, duration: Duration) {
651        let nanos = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
652        self.parsing_time_total_nanos
653            .fetch_add(nanos, Ordering::AcqRel);
654        self.parsing_time_count_total.fetch_add(1, Ordering::AcqRel);
655        update_min(&self.parsing_time_fastest_nanos, nanos);
656        update_max(&self.parsing_time_slowest_nanos, nanos);
657    }
658
659    /// Calculates the average parsing time across all recorded parses.
660    pub fn average_parsing_time(&self) -> Option<Duration> {
661        average_duration(
662            &self.parsing_time_total_nanos,
663            self.parsing_time_count_total.load(Ordering::Acquire),
664        )
665    }
666
667    /// Gets the fastest parsing time among all recorded parses.
668    pub fn fastest_parsing_time(&self) -> Option<Duration> {
669        duration_from_extreme(
670            &self.parsing_time_fastest_nanos,
671            self.parsing_time_count_total.load(Ordering::Acquire),
672            true,
673        )
674    }
675
676    /// Gets the slowest parsing time among all recorded parses.
677    pub fn slowest_parsing_time(&self) -> Option<Duration> {
678        duration_from_extreme(
679            &self.parsing_time_slowest_nanos,
680            self.parsing_time_count_total.load(Ordering::Acquire),
681            false,
682        )
683    }
684
685    /// Gets the total number of recorded parsing times.
686    pub fn parsing_time_count(&self) -> usize {
687        self.parsing_time_count_total.load(Ordering::Acquire)
688    }
689
690    /// Clears all recorded request times.
691    pub fn clear_request_times(&self) {
692        self.request_time_total_nanos.store(0, Ordering::Release);
693        self.request_time_fastest_nanos
694            .store(u64::MAX, Ordering::Release);
695        self.request_time_slowest_nanos.store(0, Ordering::Release);
696        self.request_time_count_total.store(0, Ordering::Release);
697    }
698
699    /// Clears all recorded parsing times.
700    pub fn clear_parsing_times(&self) {
701        self.parsing_time_total_nanos.store(0, Ordering::Release);
702        self.parsing_time_fastest_nanos
703            .store(u64::MAX, Ordering::Release);
704        self.parsing_time_slowest_nanos.store(0, Ordering::Release);
705        self.parsing_time_count_total.store(0, Ordering::Release);
706    }
707
708    /// Converts the snapshot into a JSON string.
709    pub fn to_json_string(&self) -> Result<String, SpiderError> {
710        Ok(serde_json::to_string(self)?)
711    }
712
713    /// Converts the snapshot into a pretty-printed JSON string.
714    pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
715        Ok(serde_json::to_string_pretty(self)?)
716    }
717
718    /// Exports the current statistics to a Markdown formatted string.
719    pub fn to_markdown_string(&self) -> String {
720        let snapshot = self.snapshot();
721
722        let status_codes_list: String = snapshot
723            .response_status_counts
724            .iter()
725            .map(|(code, count)| format!("- **{}**: {}", code, count))
726            .collect::<Vec<String>>()
727            .join("\n");
728        let status_codes_output = if status_codes_list.is_empty() {
729            "N/A".to_string()
730        } else {
731            status_codes_list
732        };
733
734        format!(
735            r#"# Crawl Statistics Report
736
737- **Duration**: {}
738- **Current Rate** (last 10s): {:.2} req/s, {:.2} resp/s, {:.2} item/s
739- **Overall Rate** (total): {:.2} req/s, {:.2} resp/s, {:.2} item/s
740- **Bytes Per Second**: {}
741- **Request Ratios**: success {:.2}%, failure {:.2}%
742- **Cache Hit Ratio**: {:.2}%
743
744## Requests
745| Metric     | Count |
746|------------|-------|
747| Enqueued   | {}     |
748| Sent       | {}     |
749| Pending    | {}     |
750| Succeeded  | {}     |
751| Failed     | {}     |
752| Retried    | {}     |
753| Retry Scheduled | {} |
754| Dropped    | {}     |
755| Retry Delay In Flight | {} ms |
756
757## Responses
758| Metric     | Count |
759|------------|-------|
760| Received   | {}     |
761| From Cache | {}     |
762| Downloaded | {}     |
763
764## Items
765| Metric     | Count |
766|------------|--------|
767| Scraped    | {}     |
768| Processed  | {}     |
769| Dropped    | {}     |
770
771## Request Times
772| Metric           | Value      |
773|------------------|------------|
774| Average Time     | {}         |
775| Fastest Request  | {}         |
776| Slowest Request  | {}         |
777| Total Recorded   | {}         |
778
779## Parsing Times
780| Metric           | Value      |
781|------------------|------------|
782| Average Time     | {}         |
783| Fastest Parse    | {}         |
784| Slowest Parse    | {}         |
785| Total Recorded   | {}         |
786
787## Status Codes
788{}
789"#,
790            snapshot.formatted_duration(),
791            snapshot.requests_per_second(),
792            snapshot.responses_per_second(),
793            snapshot.items_per_second(),
794            // Calculate cumulative speeds for comparison
795            {
796                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
797                if total_seconds > 0.0 {
798                    snapshot.requests_sent as f64 / total_seconds
799                } else {
800                    0.0
801                }
802            },
803            {
804                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
805                if total_seconds > 0.0 {
806                    snapshot.responses_received as f64 / total_seconds
807                } else {
808                    0.0
809                }
810            },
811            {
812                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
813                if total_seconds > 0.0 {
814                    snapshot.items_scraped as f64 / total_seconds
815                } else {
816                    0.0
817                }
818            },
819            snapshot.formatted_bytes_per_second(),
820            snapshot.success_ratio(),
821            snapshot.failure_ratio(),
822            snapshot.cache_hit_ratio(),
823            snapshot.requests_enqueued,
824            snapshot.requests_sent,
825            snapshot.pending_requests(),
826            snapshot.requests_succeeded,
827            snapshot.requests_failed,
828            snapshot.requests_retried,
829            snapshot.requests_scheduled_for_retry,
830            snapshot.requests_dropped,
831            snapshot.retry_delay_in_flight_ms,
832            snapshot.responses_received,
833            snapshot.responses_from_cache,
834            snapshot.formatted_bytes(),
835            snapshot.items_scraped,
836            snapshot.items_processed,
837            snapshot.items_dropped_by_pipeline,
838            snapshot.formatted_request_time(snapshot.average_request_time),
839            snapshot.formatted_request_time(snapshot.fastest_request_time),
840            snapshot.formatted_request_time(snapshot.slowest_request_time),
841            snapshot.request_time_count,
842            snapshot.formatted_request_time(snapshot.average_parsing_time),
843            snapshot.formatted_request_time(snapshot.fastest_parsing_time),
844            snapshot.formatted_request_time(snapshot.slowest_parsing_time),
845            snapshot.parsing_time_count,
846            status_codes_output
847        )
848    }
849
850    /// Exports current statistics to the text layout used for terminal output.
851    pub fn to_live_report_string(&self) -> String {
852        let snapshot = self.snapshot();
853        format_plain_text_metrics(&snapshot)
854    }
855}
856
/// Limits `input` to at most `max_chars` characters, appending `"..."` when
/// anything was cut off.
///
/// Operates on `char` boundaries (never splits a multi-byte UTF-8 sequence).
fn truncate_preview(input: &str, max_chars: usize) -> String {
    // The char at index `max_chars` only exists if the string is longer than
    // the limit; its byte offset is exactly where the cut must happen.
    match input.char_indices().nth(max_chars) {
        Some((cut_at, _)) => format!("{}...", &input[..cut_at]),
        None => input.to_string(),
    }
}
866
867fn build_item_preview(json: &serde_json::Value, fields: Option<&[String]>) -> Option<String> {
868    let fields = fields?;
869
870    if fields.len() == 1 {
871        let (_, path) = parse_preview_field(&fields[0]);
872        return get_value_by_path(json, path).map(format_preview_value);
873    }
874
875    let mut preview = serde_json::Map::new();
876
877    for field in fields {
878        let (label, path) = parse_preview_field(field);
879        if let Some(value) = get_value_by_path(json, path) {
880            preview.insert(label.to_string(), value.clone());
881        }
882    }
883
884    if preview.is_empty() {
885        None
886    } else {
887        serde_json::to_string(&serde_json::Value::Object(preview)).ok()
888    }
889}
890
891fn format_preview_value(value: &serde_json::Value) -> String {
892    match value {
893        serde_json::Value::Null => "null".to_string(),
894        serde_json::Value::Bool(boolean) => boolean.to_string(),
895        serde_json::Value::Number(number) => number.to_string(),
896        serde_json::Value::String(text) => text.clone(),
897        serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
898            serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
899        }
900    }
901}
902
/// Splits a preview field spec of the form `label=path` into its parts.
///
/// When there is no `=`, or either side of it is empty, the whole spec is
/// used as both the label and the lookup path.
fn parse_preview_field(field: &str) -> (&str, &str) {
    if let Some((label, path)) = field.split_once('=') {
        if !label.is_empty() && !path.is_empty() {
            return (label, path);
        }
    }
    (field, field)
}
909
910fn get_value_by_path<'a>(
911    value: &'a serde_json::Value,
912    path: &str,
913) -> Option<&'a serde_json::Value> {
914    let mut current = value;
915    for segment in path.split('.') {
916        current = current.get(segment)?;
917    }
918    Some(current)
919}
920
921impl Default for StatCollector {
922    fn default() -> Self {
923        Self::new(None)
924    }
925}
926
927impl std::fmt::Display for StatCollector {
928    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
929        write!(f, "\n{}\n", self.to_live_report_string())
930    }
931}
932
/// Computes the mean duration from an accumulated nanosecond total.
///
/// Returns `None` when nothing has been recorded (`count == 0`), avoiding a
/// division by zero. Integer division truncates toward zero.
fn average_duration(total_nanos: &AtomicU64, count: usize) -> Option<Duration> {
    (count > 0).then(|| {
        let mean_nanos = total_nanos.load(Ordering::Acquire) / count as u64;
        Duration::from_nanos(mean_nanos)
    })
}
942
/// Converts a recorded min/max extreme (in nanoseconds) into a `Duration`.
///
/// Returns `None` when no samples were recorded, or when a minimum tracker
/// still holds its `u64::MAX` "never updated" sentinel.
fn duration_from_extreme(extreme: &AtomicU64, count: usize, is_min: bool) -> Option<Duration> {
    if count == 0 {
        return None;
    }

    match extreme.load(Ordering::Acquire) {
        // Untouched minimum tracker: nothing meaningful to report.
        u64::MAX if is_min => None,
        nanos => Some(Duration::from_nanos(nanos)),
    }
}
955
/// Atomically lowers `target` to `candidate` if `candidate` is smaller.
///
/// After the call, `target` holds `min(previous, candidate)`. Replaces the
/// previous hand-rolled compare-exchange loop with the equivalent stable
/// [`AtomicU64::fetch_min`] primitive (stable since Rust 1.45), which is
/// shorter and maps directly to a hardware/lock-free RMW where available.
fn update_min(target: &AtomicU64, candidate: u64) {
    target.fetch_min(candidate, Ordering::AcqRel);
}
966
/// Atomically raises `target` to `candidate` if `candidate` is larger.
///
/// After the call, `target` holds `max(previous, candidate)`. Replaces the
/// previous hand-rolled compare-exchange loop with the equivalent stable
/// [`AtomicU64::fetch_max`] primitive (stable since Rust 1.45), which is
/// shorter and maps directly to a hardware/lock-free RMW where available.
fn update_max(target: &AtomicU64, candidate: u64) {
    target.fetch_max(candidate, Ordering::AcqRel);
}