Skip to main content

spider_core/
stats.rs

1//! Runtime statistics and reporting helpers.
2//!
3//! [`StatCollector`] records request counts, response status codes, cache hits,
4//! timings, bandwidth, and item throughput while a crawl is running.
5//!
6//! ## Example
7//!
8//! ```rust,ignore
9//! use spider_core::StatCollector;
10//!
//! let stats = StatCollector::new(None);
12//!
13//! // During crawling, metrics are automatically updated
14//! stats.increment_requests_sent();
15//! stats.increment_items_scraped();
16//!
17//! // Export statistics in various formats
18//! println!("{}", stats.to_json_string_pretty().unwrap());
19//! println!("{}", stats.to_markdown_string());
20//! ```
21
22use parking_lot::RwLock;
23use spider_util::error::SpiderError;
24use spider_util::item::ScrapedItem;
25use spider_util::metrics::{
26    ExpMovingAverage, MetricsSnapshot, MetricsSnapshotProvider, format_plain_text_metrics,
27};
28use std::{
29    collections::HashMap,
30    sync::{
31        Arc,
32        atomic::{AtomicU64, AtomicUsize, Ordering},
33    },
34    time::{Duration, Instant},
35};
36
// A snapshot of the current statistics, used for reporting.
// This avoids code duplication in the various export/display methods.
struct StatsSnapshot {
    // Request lifecycle counters.
    requests_enqueued: usize,
    requests_sent: usize,
    requests_succeeded: usize,
    requests_failed: usize,
    requests_retried: usize,
    requests_scheduled_for_retry: usize,
    requests_dropped: usize,
    retry_delay_in_flight_ms: u64,
    // Response counters.
    responses_received: usize,
    responses_from_cache: usize,
    total_bytes_downloaded: usize,
    // Item pipeline counters.
    items_scraped: usize,
    items_processed: usize,
    items_dropped_by_pipeline: usize,
    // Point-in-time backlog gauges.
    queue_depth: usize,
    parser_backlog: usize,
    pipeline_backlog: usize,
    retry_backlog: usize,
    // HTTP status code -> occurrence count.
    response_status_counts: HashMap<u16, usize>,
    // Timing aggregates.
    elapsed_duration: Duration,
    average_request_time: Option<Duration>,
    fastest_request_time: Option<Duration>,
    slowest_request_time: Option<Duration>,
    request_time_count: usize,
    average_parsing_time: Option<Duration>,
    fastest_parsing_time: Option<Duration>,
    slowest_parsing_time: Option<Duration>,
    parsing_time_count: usize,

    // Recent rates from sliding windows
    recent_requests_per_second: f64,
    recent_responses_per_second: f64,
    recent_items_per_second: f64,
    current_item_preview: String,
}

impl StatsSnapshot {
    /// Formats the elapsed crawl duration using the largest sensible unit:
    /// `1h 02m 03.45s`, `2m 03.45s`, `3.45s`, `12ms`, or `34us`.
    fn formatted_duration(&self) -> String {
        let total_secs = self.elapsed_duration.as_secs();
        let hours = total_secs / 3600;
        let minutes = (total_secs % 3600) / 60;
        let seconds = self.elapsed_duration.as_secs_f64() % 60.0;

        if hours > 0 {
            format!("{hours}h {minutes:02}m {seconds:05.2}s")
        } else if minutes > 0 {
            format!("{minutes}m {seconds:05.2}s")
        } else if total_secs > 0 {
            format!("{:.2}s", self.elapsed_duration.as_secs_f64())
        } else if self.elapsed_duration.as_millis() > 0 {
            format!("{}ms", self.elapsed_duration.as_millis())
        } else {
            format!("{}us", self.elapsed_duration.as_micros())
        }
    }

    /// Formats an optional request/parse duration: whole milliseconds below
    /// one second, fractional seconds at or above it, `N/A` when absent.
    fn formatted_request_time(&self, duration: Option<Duration>) -> String {
        match duration {
            Some(d) => {
                if d.as_millis() < 1000 {
                    format!("{} ms", d.as_millis())
                } else {
                    format!("{:.2} s", d.as_secs_f64())
                }
            }
            None => "N/A".to_string(),
        }
    }

    /// Shared rate helper: `count / elapsed_seconds`, or 0.0 when no time
    /// has elapsed yet (avoids a division by zero at crawl start).
    fn per_second(&self, count: f64) -> f64 {
        let elapsed = self.elapsed_duration.as_secs_f64();
        if elapsed > 0.0 { count / elapsed } else { 0.0 }
    }

    fn requests_per_second(&self) -> f64 {
        self.per_second(self.requests_sent as f64)
    }

    fn responses_per_second(&self) -> f64 {
        self.per_second(self.responses_received as f64)
    }

    fn items_per_second(&self) -> f64 {
        self.per_second(self.items_scraped as f64)
    }

    fn bytes_per_second(&self) -> f64 {
        self.per_second(self.total_bytes_downloaded as f64)
    }

    /// Renders a byte count with binary units (`B`/`KB`/`MB`/`GB`) and the
    /// given suffix appended (`""` for totals, `"/s"` for rates). Values at
    /// or above 1 KB get two decimals; plain bytes print as integers.
    fn human_bytes(n: usize, suffix: &str) -> String {
        const KB: usize = 1024;
        const MB: usize = 1024 * KB;
        const GB: usize = 1024 * MB;

        if n >= GB {
            format!("{:.2} GB{suffix}", n as f64 / GB as f64)
        } else if n >= MB {
            format!("{:.2} MB{suffix}", n as f64 / MB as f64)
        } else if n >= KB {
            format!("{:.2} KB{suffix}", n as f64 / KB as f64)
        } else {
            format!("{n} B{suffix}")
        }
    }

    fn formatted_bytes(&self) -> String {
        Self::human_bytes(self.total_bytes_downloaded, "")
    }

    fn formatted_bytes_per_second(&self) -> String {
        // Truncating the rate to whole bytes preserves the original
        // behavior: sub-byte fractions are dropped before unit selection.
        Self::human_bytes(self.bytes_per_second() as usize, "/s")
    }

    /// Requests enqueued but not yet resolved (succeeded, failed, or
    /// dropped). Saturating so racy counter reads cannot underflow.
    fn pending_requests(&self) -> usize {
        self.requests_enqueued
            .saturating_sub(self.requests_succeeded + self.requests_failed + self.requests_dropped)
    }

    /// Percentage of sent requests that succeeded (0.0 when none sent).
    fn success_ratio(&self) -> f64 {
        if self.requests_sent == 0 {
            0.0
        } else {
            self.requests_succeeded as f64 / self.requests_sent as f64 * 100.0
        }
    }

    /// Percentage of sent requests that failed (0.0 when none sent).
    fn failure_ratio(&self) -> f64 {
        if self.requests_sent == 0 {
            0.0
        } else {
            self.requests_failed as f64 / self.requests_sent as f64 * 100.0
        }
    }

    /// Percentage of received responses served from cache (0.0 when none).
    fn cache_hit_ratio(&self) -> f64 {
        if self.responses_received == 0 {
            0.0
        } else {
            self.responses_from_cache as f64 / self.responses_received as f64 * 100.0
        }
    }
}
207
// Exposes the snapshot through the shared `MetricsSnapshotProvider` trait so
// generic reporting helpers (e.g. `format_plain_text_metrics`) can consume
// it. Every getter is a straight field read with no additional logic.
impl MetricsSnapshotProvider for StatsSnapshot {
    fn get_requests_enqueued(&self) -> usize {
        self.requests_enqueued
    }

    fn get_requests_sent(&self) -> usize {
        self.requests_sent
    }

    fn get_requests_succeeded(&self) -> usize {
        self.requests_succeeded
    }

    fn get_requests_failed(&self) -> usize {
        self.requests_failed
    }

    fn get_requests_retried(&self) -> usize {
        self.requests_retried
    }

    fn get_requests_scheduled_for_retry(&self) -> usize {
        self.requests_scheduled_for_retry
    }

    fn get_requests_dropped(&self) -> usize {
        self.requests_dropped
    }

    fn get_retry_delay_in_flight_ms(&self) -> u64 {
        self.retry_delay_in_flight_ms
    }

    fn get_responses_received(&self) -> usize {
        self.responses_received
    }

    fn get_responses_from_cache(&self) -> usize {
        self.responses_from_cache
    }

    fn get_total_bytes_downloaded(&self) -> usize {
        self.total_bytes_downloaded
    }

    fn get_items_scraped(&self) -> usize {
        self.items_scraped
    }

    fn get_items_processed(&self) -> usize {
        self.items_processed
    }

    fn get_items_dropped_by_pipeline(&self) -> usize {
        self.items_dropped_by_pipeline
    }

    fn get_queue_depth(&self) -> usize {
        self.queue_depth
    }

    fn get_parser_backlog(&self) -> usize {
        self.parser_backlog
    }

    fn get_pipeline_backlog(&self) -> usize {
        self.pipeline_backlog
    }

    fn get_retry_backlog(&self) -> usize {
        self.retry_backlog
    }

    fn get_response_status_counts(&self) -> &HashMap<u16, usize> {
        &self.response_status_counts
    }

    fn get_elapsed_duration(&self) -> Duration {
        self.elapsed_duration
    }

    fn get_average_request_time(&self) -> Option<Duration> {
        self.average_request_time
    }

    fn get_fastest_request_time(&self) -> Option<Duration> {
        self.fastest_request_time
    }

    fn get_slowest_request_time(&self) -> Option<Duration> {
        self.slowest_request_time
    }

    fn get_request_time_count(&self) -> usize {
        self.request_time_count
    }

    fn get_average_parsing_time(&self) -> Option<Duration> {
        self.average_parsing_time
    }

    fn get_fastest_parsing_time(&self) -> Option<Duration> {
        self.fastest_parsing_time
    }

    fn get_slowest_parsing_time(&self) -> Option<Duration> {
        self.slowest_parsing_time
    }

    fn get_parsing_time_count(&self) -> usize {
        self.parsing_time_count
    }

    fn get_recent_requests_per_second(&self) -> f64 {
        self.recent_requests_per_second
    }

    fn get_recent_responses_per_second(&self) -> f64 {
        self.recent_responses_per_second
    }

    fn get_recent_items_per_second(&self) -> f64 {
        self.recent_items_per_second
    }

    fn get_current_item_preview(&self) -> &str {
        &self.current_item_preview
    }

    // The three methods below dispatch to the inherent methods of the same
    // name. Inherent methods take precedence over trait methods in Rust
    // method resolution, so these calls do not recurse.
    fn formatted_duration(&self) -> String {
        self.formatted_duration()
    }

    fn formatted_request_time(&self, duration: Option<Duration>) -> String {
        self.formatted_request_time(duration)
    }

    fn formatted_bytes(&self) -> String {
        self.formatted_bytes()
    }
}
349
/// Collects and stores various statistics about the crawler's operation.
///
/// Counters are lock-free atomics so the collector can be shared across
/// worker tasks and updated without serialization; the status-code map uses
/// a concurrent `DashMap` and the item preview a `parking_lot::RwLock`.
/// Serde serialization covers only the public counters; timing internals
/// and rate estimators are `#[serde(skip)]`ped.
#[derive(Debug, serde::Serialize)]
pub struct StatCollector {
    // Crawl-related metrics
    #[serde(skip)]
    pub start_time: Instant,

    // Request-related metrics
    pub requests_enqueued: AtomicUsize,
    pub requests_sent: AtomicUsize,
    pub requests_succeeded: AtomicUsize,
    pub requests_failed: AtomicUsize,
    pub requests_retried: AtomicUsize,
    pub requests_scheduled_for_retry: AtomicUsize,
    pub requests_dropped: AtomicUsize,
    pub retry_delay_in_flight_ms: AtomicU64,

    // Response-related metrics
    pub responses_received: AtomicUsize,
    pub responses_from_cache: AtomicUsize,
    pub response_status_counts: Arc<dashmap::DashMap<u16, usize>>, // e.g., 200, 404, 500
    pub total_bytes_downloaded: AtomicUsize,

    // Add more advanced response time metrics if needed (e.g., histograms)

    // Item-related metrics
    pub items_scraped: AtomicUsize,
    pub items_processed: AtomicUsize,
    pub items_dropped_by_pipeline: AtomicUsize,
    // Point-in-time backlog gauges (overwritten via `store`, not accumulated).
    pub queue_depth: AtomicUsize,
    pub parser_backlog: AtomicUsize,
    pub pipeline_backlog: AtomicUsize,
    pub retry_backlog: AtomicUsize,

    // Timing metrics - constant-size aggregates (running total, fastest,
    // slowest, and sample count, all in nanoseconds). No per-request entries
    // are retained, so memory use does not grow with crawl size.
    #[serde(skip)]
    request_time_total_nanos: AtomicU64,
    #[serde(skip)]
    request_time_fastest_nanos: AtomicU64,
    #[serde(skip)]
    request_time_slowest_nanos: AtomicU64,
    #[serde(skip)]
    request_time_count_total: AtomicUsize,
    #[serde(skip)]
    parsing_time_total_nanos: AtomicU64,
    #[serde(skip)]
    parsing_time_fastest_nanos: AtomicU64,
    #[serde(skip)]
    parsing_time_slowest_nanos: AtomicU64,
    #[serde(skip)]
    parsing_time_count_total: AtomicUsize,

    // Exponential moving average metrics for accurate speed calculations
    #[serde(skip)]
    requests_sent_ema: ExpMovingAverage,
    #[serde(skip)]
    responses_received_ema: ExpMovingAverage,
    #[serde(skip)]
    items_scraped_ema: ExpMovingAverage,
    // Single-line preview of the most recently scraped item ("none" until
    // the first item arrives).
    #[serde(skip)]
    current_item_preview: Arc<RwLock<String>>,
    // Optional list of item fields to include in the live-stats preview.
    #[serde(skip)]
    live_stats_preview_fields: Option<Vec<String>>,
}
415
416impl StatCollector {
417    /// Creates a new `StatCollector` with all counters initialized to zero.
418    pub(crate) fn new(live_stats_preview_fields: Option<Vec<String>>) -> Self {
419        Self::build(live_stats_preview_fields)
420    }
421
422    fn build(live_stats_preview_fields: Option<Vec<String>>) -> Self {
423        StatCollector {
424            start_time: Instant::now(),
425            requests_enqueued: AtomicUsize::new(0),
426            requests_sent: AtomicUsize::new(0),
427            requests_succeeded: AtomicUsize::new(0),
428            requests_failed: AtomicUsize::new(0),
429            requests_retried: AtomicUsize::new(0),
430            requests_scheduled_for_retry: AtomicUsize::new(0),
431            requests_dropped: AtomicUsize::new(0),
432            retry_delay_in_flight_ms: AtomicU64::new(0),
433            responses_received: AtomicUsize::new(0),
434            responses_from_cache: AtomicUsize::new(0),
435            response_status_counts: Arc::new(dashmap::DashMap::new()),
436            total_bytes_downloaded: AtomicUsize::new(0),
437            items_scraped: AtomicUsize::new(0),
438            items_processed: AtomicUsize::new(0),
439            items_dropped_by_pipeline: AtomicUsize::new(0),
440            queue_depth: AtomicUsize::new(0),
441            parser_backlog: AtomicUsize::new(0),
442            pipeline_backlog: AtomicUsize::new(0),
443            retry_backlog: AtomicUsize::new(0),
444            request_time_total_nanos: AtomicU64::new(0),
445            request_time_fastest_nanos: AtomicU64::new(u64::MAX),
446            request_time_slowest_nanos: AtomicU64::new(0),
447            request_time_count_total: AtomicUsize::new(0),
448            parsing_time_total_nanos: AtomicU64::new(0),
449            parsing_time_fastest_nanos: AtomicU64::new(u64::MAX),
450            parsing_time_slowest_nanos: AtomicU64::new(0),
451            parsing_time_count_total: AtomicUsize::new(0),
452            // Initialize exponential moving averages for recent speed calculations (alpha = 0.2 for good balance)
453            requests_sent_ema: ExpMovingAverage::new(0.2),
454            responses_received_ema: ExpMovingAverage::new(0.2),
455            items_scraped_ema: ExpMovingAverage::new(0.2),
456            current_item_preview: Arc::new(RwLock::new("none".to_string())),
457            live_stats_preview_fields,
458        }
459    }
460
461    /// Creates a snapshot of the current statistics.
462    /// This is the single source of truth for all presentation logic.
463    fn internal_snapshot(&self) -> StatsSnapshot {
464        let mut status_counts: HashMap<u16, usize> = HashMap::new();
465        for entry in self.response_status_counts.iter() {
466            let (key, value) = entry.pair();
467            status_counts.insert(*key, *value);
468        }
469
470        // Get recent rates from exponential moving averages
471        let recent_requests_per_second = self.requests_sent_ema.get_rate();
472        let recent_responses_per_second = self.responses_received_ema.get_rate();
473        let recent_items_per_second = self.items_scraped_ema.get_rate();
474
475        StatsSnapshot {
476            requests_enqueued: self.requests_enqueued.load(Ordering::Acquire),
477            requests_sent: self.requests_sent.load(Ordering::Acquire),
478            requests_succeeded: self.requests_succeeded.load(Ordering::Acquire),
479            requests_failed: self.requests_failed.load(Ordering::Acquire),
480            requests_retried: self.requests_retried.load(Ordering::Acquire),
481            requests_scheduled_for_retry: self.requests_scheduled_for_retry.load(Ordering::Acquire),
482            requests_dropped: self.requests_dropped.load(Ordering::Acquire),
483            retry_delay_in_flight_ms: self.retry_delay_in_flight_ms.load(Ordering::Acquire),
484            responses_received: self.responses_received.load(Ordering::Acquire),
485            responses_from_cache: self.responses_from_cache.load(Ordering::Acquire),
486            total_bytes_downloaded: self.total_bytes_downloaded.load(Ordering::Acquire),
487            items_scraped: self.items_scraped.load(Ordering::Acquire),
488            items_processed: self.items_processed.load(Ordering::Acquire),
489            items_dropped_by_pipeline: self.items_dropped_by_pipeline.load(Ordering::Acquire),
490            queue_depth: self.queue_depth.load(Ordering::Acquire),
491            parser_backlog: self.parser_backlog.load(Ordering::Acquire),
492            pipeline_backlog: self.pipeline_backlog.load(Ordering::Acquire),
493            retry_backlog: self.retry_backlog.load(Ordering::Acquire),
494            response_status_counts: status_counts,
495            elapsed_duration: self.start_time.elapsed(),
496            average_request_time: self.average_request_time(),
497            fastest_request_time: self.fastest_request_time(),
498            slowest_request_time: self.slowest_request_time(),
499            request_time_count: self.request_time_count(),
500            average_parsing_time: self.average_parsing_time(),
501            fastest_parsing_time: self.fastest_parsing_time(),
502            slowest_parsing_time: self.slowest_parsing_time(),
503            parsing_time_count: self.parsing_time_count(),
504
505            // Recent rates from sliding windows
506            recent_requests_per_second,
507            recent_responses_per_second,
508            recent_items_per_second,
509            current_item_preview: self.current_item_preview.read().clone(),
510        }
511    }
512
    /// Returns a public immutable snapshot of the current crawl metrics.
    ///
    /// Mechanical field-for-field conversion of the private [`StatsSnapshot`]
    /// into the shared `MetricsSnapshot` type from `spider_util`; no values
    /// are derived or transformed here.
    pub fn snapshot(&self) -> MetricsSnapshot {
        let snapshot = self.internal_snapshot();
        MetricsSnapshot {
            requests_enqueued: snapshot.requests_enqueued,
            requests_sent: snapshot.requests_sent,
            requests_succeeded: snapshot.requests_succeeded,
            requests_failed: snapshot.requests_failed,
            requests_retried: snapshot.requests_retried,
            requests_scheduled_for_retry: snapshot.requests_scheduled_for_retry,
            requests_dropped: snapshot.requests_dropped,
            retry_delay_in_flight_ms: snapshot.retry_delay_in_flight_ms,
            responses_received: snapshot.responses_received,
            responses_from_cache: snapshot.responses_from_cache,
            total_bytes_downloaded: snapshot.total_bytes_downloaded,
            items_scraped: snapshot.items_scraped,
            items_processed: snapshot.items_processed,
            items_dropped_by_pipeline: snapshot.items_dropped_by_pipeline,
            queue_depth: snapshot.queue_depth,
            parser_backlog: snapshot.parser_backlog,
            pipeline_backlog: snapshot.pipeline_backlog,
            retry_backlog: snapshot.retry_backlog,
            response_status_counts: snapshot.response_status_counts,
            elapsed_duration: snapshot.elapsed_duration,
            average_request_time: snapshot.average_request_time,
            fastest_request_time: snapshot.fastest_request_time,
            slowest_request_time: snapshot.slowest_request_time,
            request_time_count: snapshot.request_time_count,
            average_parsing_time: snapshot.average_parsing_time,
            fastest_parsing_time: snapshot.fastest_parsing_time,
            slowest_parsing_time: snapshot.slowest_parsing_time,
            parsing_time_count: snapshot.parsing_time_count,
            recent_requests_per_second: snapshot.recent_requests_per_second,
            recent_responses_per_second: snapshot.recent_responses_per_second,
            recent_items_per_second: snapshot.recent_items_per_second,
            current_item_preview: snapshot.current_item_preview,
        }
    }
551
    /// Increments the count of enqueued requests.
    pub(crate) fn increment_requests_enqueued(&self) {
        self.requests_enqueued.fetch_add(1, Ordering::AcqRel);
    }

    /// Increments the count of sent requests.
    ///
    /// Also feeds the exponential moving average used for the
    /// "recent requests per second" rate in snapshots.
    pub(crate) fn increment_requests_sent(&self) {
        self.requests_sent.fetch_add(1, Ordering::AcqRel);
        // Update the EMA with a count of 1 for this event
        self.requests_sent_ema.update(1);
    }

    /// Increments the count of successful requests.
    pub(crate) fn increment_requests_succeeded(&self) {
        self.requests_succeeded.fetch_add(1, Ordering::AcqRel);
    }

    /// Increments the count of failed requests.
    pub(crate) fn increment_requests_failed(&self) {
        self.requests_failed.fetch_add(1, Ordering::AcqRel);
    }

    /// Increments the count of retried requests.
    pub(crate) fn increment_requests_retried(&self) {
        self.requests_retried.fetch_add(1, Ordering::AcqRel);
    }

    /// Increments the count of dropped requests.
    pub(crate) fn increment_requests_dropped(&self) {
        self.requests_dropped.fetch_add(1, Ordering::AcqRel);
    }

    /// Increments the count of retries scheduled outside the downloader permit path.
    ///
    /// Bumps both the cumulative counter and the `retry_backlog` gauge; the
    /// gauge is decremented again by `complete_scheduled_retry`.
    pub(crate) fn increment_requests_scheduled_for_retry(&self) {
        self.requests_scheduled_for_retry
            .fetch_add(1, Ordering::AcqRel);
        self.retry_backlog.fetch_add(1, Ordering::AcqRel);
    }
590
591    /// Marks a scheduled retry as no longer waiting.
592    pub(crate) fn complete_scheduled_retry(&self) {
593        let mut current = self.retry_backlog.load(Ordering::Acquire);
594        loop {
595            let next = current.saturating_sub(1);
596            match self.retry_backlog.compare_exchange_weak(
597                current,
598                next,
599                Ordering::AcqRel,
600                Ordering::Acquire,
601            ) {
602                Ok(_) => break,
603                Err(actual) => current = actual,
604            }
605        }
606    }
607
608    /// Adds the currently scheduled retry delay in milliseconds.
609    pub(crate) fn add_retry_delay_in_flight(&self, delay: Duration) {
610        let millis = delay.as_millis().min(u128::from(u64::MAX)) as u64;
611        self.retry_delay_in_flight_ms
612            .fetch_add(millis, Ordering::AcqRel);
613    }
614
615    /// Removes completed retry delay from the in-flight total.
616    pub(crate) fn remove_retry_delay_in_flight(&self, delay: Duration) {
617        let millis = delay.as_millis().min(u128::from(u64::MAX)) as u64;
618        let mut current = self.retry_delay_in_flight_ms.load(Ordering::Acquire);
619        loop {
620            let next = current.saturating_sub(millis);
621            match self.retry_delay_in_flight_ms.compare_exchange_weak(
622                current,
623                next,
624                Ordering::AcqRel,
625                Ordering::Acquire,
626            ) {
627                Ok(_) => break,
628                Err(actual) => current = actual,
629            }
630        }
631    }
632
    /// Increments the count of received responses.
    ///
    /// Also feeds the exponential moving average used for the
    /// "recent responses per second" rate in snapshots.
    pub(crate) fn increment_responses_received(&self) {
        self.responses_received.fetch_add(1, Ordering::AcqRel);
        // Update the EMA with a count of 1 for this event
        self.responses_received_ema.update(1);
    }

    /// Increments the count of responses served from cache.
    pub(crate) fn increment_responses_from_cache(&self) {
        self.responses_from_cache.fetch_add(1, Ordering::AcqRel);
    }

    /// Records a response status code.
    ///
    /// Uses the concurrent map's entry API: inserts 0 for a first-seen
    /// status code, then increments while the entry guard is held.
    pub(crate) fn record_response_status(&self, status_code: u16) {
        *self.response_status_counts.entry(status_code).or_insert(0) += 1;
    }

    /// Adds to the total bytes downloaded.
    pub(crate) fn add_bytes_downloaded(&self, bytes: usize) {
        self.total_bytes_downloaded
            .fetch_add(bytes, Ordering::AcqRel);
    }
655
656    /// Adds multiple scraped items to the counter.
657    pub(crate) fn add_items_scraped(&self, count: usize) {
658        if count == 0 {
659            return;
660        }
661        self.items_scraped.fetch_add(count, Ordering::AcqRel);
662        self.items_scraped_ema.update(count);
663    }
664
    /// Stores a compact single-line preview of the most recently scraped item.
    ///
    /// The preview is built from the configured `live_stats_preview_fields`
    /// via `build_item_preview` (defined elsewhere in this module); when that
    /// yields nothing it falls back to the item's full JSON rendering, and
    /// finally to its `Debug` output if serialization fails. Newlines are
    /// flattened to spaces so the preview stays on one line.
    pub(crate) fn record_current_item_preview<I: ScrapedItem>(&self, item: &I) {
        let json = item.to_json_value();
        let preview = build_item_preview(&json, self.live_stats_preview_fields.as_deref())
            .unwrap_or_else(|| {
                serde_json::to_string(&json).unwrap_or_else(|_| format!("{:?}", item))
            })
            .replace(['\n', '\r'], " ");
        // `truncate_preview` (defined elsewhere in this module) presumably
        // caps the preview at 160 characters -- confirm against its impl.
        let preview = truncate_preview(&preview, 160);
        *self.current_item_preview.write() = preview;
    }
676
    /// Increments the count of processed items.
    pub(crate) fn increment_items_processed(&self) {
        self.items_processed.fetch_add(1, Ordering::AcqRel);
    }

    /// Increments the count of items dropped by pipelines.
    pub(crate) fn increment_items_dropped_by_pipeline(&self) {
        self.items_dropped_by_pipeline
            .fetch_add(1, Ordering::AcqRel);
    }

    /// Updates queue and worker backlog gauges used by snapshots and live stats.
    ///
    /// These are point-in-time gauges (overwritten with `store`, not
    /// accumulated), so callers must pass current absolute depths rather
    /// than deltas.
    pub(crate) fn update_runtime_backlog(
        &self,
        queue_depth: usize,
        parser_backlog: usize,
        pipeline_backlog: usize,
    ) {
        self.queue_depth.store(queue_depth, Ordering::Release);
        self.parser_backlog.store(parser_backlog, Ordering::Release);
        self.pipeline_backlog
            .store(pipeline_backlog, Ordering::Release);
    }
700
701    /// Records the time taken for a request.
702    pub fn record_request_time(&self, _url: &str, duration: Duration) {
703        let nanos = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
704        self.request_time_total_nanos
705            .fetch_add(nanos, Ordering::AcqRel);
706        self.request_time_count_total.fetch_add(1, Ordering::AcqRel);
707        update_min(&self.request_time_fastest_nanos, nanos);
708        update_max(&self.request_time_slowest_nanos, nanos);
709    }
710
    /// Calculates the average request time across all recorded requests.
    ///
    /// Delegates to the module-level `average_duration` helper; returns
    /// `None` when no request times have been recorded yet.
    pub fn average_request_time(&self) -> Option<Duration> {
        average_duration(
            &self.request_time_total_nanos,
            self.request_time_count_total.load(Ordering::Acquire),
        )
    }

    /// Gets the fastest request time among all recorded requests.
    ///
    /// The boolean flag presumably selects the minimum tracker -- confirm
    /// against `duration_from_extreme` (defined elsewhere in this module).
    pub fn fastest_request_time(&self) -> Option<Duration> {
        duration_from_extreme(
            &self.request_time_fastest_nanos,
            self.request_time_count_total.load(Ordering::Acquire),
            true,
        )
    }

    /// Gets the slowest request time among all recorded requests.
    pub fn slowest_request_time(&self) -> Option<Duration> {
        duration_from_extreme(
            &self.request_time_slowest_nanos,
            self.request_time_count_total.load(Ordering::Acquire),
            false,
        )
    }

    /// Gets the total number of recorded request times.
    pub fn request_time_count(&self) -> usize {
        self.request_time_count_total.load(Ordering::Acquire)
    }

    /// Gets the request time for a specific URL.
    ///
    /// Always returns `None`: per-URL timings are no longer retained (only
    /// aggregate counters are kept); the method survives for backward
    /// compatibility with existing callers.
    pub fn get_request_time(&self, url: &str) -> Option<Duration> {
        let _ = url;
        None
    }

    /// Gets all recorded request times as a vector of (URL, Duration) pairs.
    ///
    /// Always returns an empty vector for the same reason as
    /// `get_request_time`: per-URL timings are not stored.
    pub fn get_all_request_times(&self) -> Vec<(String, Duration)> {
        Vec::new()
    }
752
753    /// Records the time taken for parsing a response.
754    pub fn record_parsing_time(&self, duration: Duration) {
755        let nanos = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
756        self.parsing_time_total_nanos
757            .fetch_add(nanos, Ordering::AcqRel);
758        self.parsing_time_count_total.fetch_add(1, Ordering::AcqRel);
759        update_min(&self.parsing_time_fastest_nanos, nanos);
760        update_max(&self.parsing_time_slowest_nanos, nanos);
761    }
762
    /// Calculates the average parsing time across all recorded parses.
    ///
    /// Delegates to the module-level `average_duration` helper; returns
    /// `None` when no parsing times have been recorded yet.
    pub fn average_parsing_time(&self) -> Option<Duration> {
        average_duration(
            &self.parsing_time_total_nanos,
            self.parsing_time_count_total.load(Ordering::Acquire),
        )
    }

    /// Gets the fastest parsing time among all recorded parses.
    ///
    /// The boolean flag presumably selects the minimum tracker -- confirm
    /// against `duration_from_extreme` (defined elsewhere in this module).
    pub fn fastest_parsing_time(&self) -> Option<Duration> {
        duration_from_extreme(
            &self.parsing_time_fastest_nanos,
            self.parsing_time_count_total.load(Ordering::Acquire),
            true,
        )
    }

    /// Gets the slowest parsing time among all recorded parses.
    pub fn slowest_parsing_time(&self) -> Option<Duration> {
        duration_from_extreme(
            &self.parsing_time_slowest_nanos,
            self.parsing_time_count_total.load(Ordering::Acquire),
            false,
        )
    }

    /// Gets the total number of recorded parsing times.
    pub fn parsing_time_count(&self) -> usize {
        self.parsing_time_count_total.load(Ordering::Acquire)
    }
793
    /// Clears all recorded request times.
    ///
    /// Resets the aggregates to their initial state; the fastest tracker
    /// goes back to `u64::MAX` so the next sample always becomes the new
    /// minimum.
    pub fn clear_request_times(&self) {
        self.request_time_total_nanos.store(0, Ordering::Release);
        self.request_time_fastest_nanos
            .store(u64::MAX, Ordering::Release);
        self.request_time_slowest_nanos.store(0, Ordering::Release);
        self.request_time_count_total.store(0, Ordering::Release);
    }

    /// Clears all recorded parsing times.
    ///
    /// Mirrors `clear_request_times` for the parsing-time aggregates.
    pub fn clear_parsing_times(&self) {
        self.parsing_time_total_nanos.store(0, Ordering::Release);
        self.parsing_time_fastest_nanos
            .store(u64::MAX, Ordering::Release);
        self.parsing_time_slowest_nanos.store(0, Ordering::Release);
        self.parsing_time_count_total.store(0, Ordering::Release);
    }
811
812    /// Converts the snapshot into a JSON string.
813    pub fn to_json_string(&self) -> Result<String, SpiderError> {
814        Ok(serde_json::to_string(&self.snapshot())?)
815    }
816
817    /// Converts the snapshot into a pretty-printed JSON string.
818    pub fn to_json_string_pretty(&self) -> Result<String, SpiderError> {
819        Ok(serde_json::to_string_pretty(&self.snapshot())?)
820    }
821
822    /// Exports the current statistics to a Markdown formatted string.
823    pub fn to_markdown_string(&self) -> String {
824        let snapshot = self.internal_snapshot();
825
826        let status_codes_list: String = snapshot
827            .response_status_counts
828            .iter()
829            .map(|(code, count)| format!("- **{}**: {}", code, count))
830            .collect::<Vec<String>>()
831            .join("\n");
832        let status_codes_output = if status_codes_list.is_empty() {
833            "N/A".to_string()
834        } else {
835            status_codes_list
836        };
837
838        format!(
839            r#"# Crawl Statistics Report
840
841- **Duration**: {}
842- **Current Rate** (last 10s): {:.2} req/s, {:.2} resp/s, {:.2} item/s
843- **Overall Rate** (total): {:.2} req/s, {:.2} resp/s, {:.2} item/s
844- **Bytes Per Second**: {}
845- **Request Ratios**: success {:.2}%, failure {:.2}%
846- **Cache Hit Ratio**: {:.2}%
847
848## Requests
849| Metric     | Count |
850|------------|-------|
851| Enqueued   | {}     |
852| Sent       | {}     |
853| Pending    | {}     |
854| Succeeded  | {}     |
855| Failed     | {}     |
856| Retried    | {}     |
857| Retry Scheduled | {} |
858| Dropped    | {}     |
859| Retry Delay In Flight | {} ms |
860
861## Responses
862| Metric     | Count |
863|------------|-------|
864| Received   | {}     |
865| From Cache | {}     |
866| Downloaded | {}     |
867
868## Items
869| Metric     | Count |
870|------------|--------|
871| Scraped    | {}     |
872| Processed  | {}     |
873| Dropped    | {}     |
874
875## Request Times
876| Metric           | Value      |
877|------------------|------------|
878| Average Time     | {}         |
879| Fastest Request  | {}         |
880| Slowest Request  | {}         |
881| Total Recorded   | {}         |
882
883## Parsing Times
884| Metric           | Value      |
885|------------------|------------|
886| Average Time     | {}         |
887| Fastest Parse    | {}         |
888| Slowest Parse    | {}         |
889| Total Recorded   | {}         |
890
891## Status Codes
892{}
893"#,
894            snapshot.formatted_duration(),
895            snapshot.requests_per_second(),
896            snapshot.responses_per_second(),
897            snapshot.items_per_second(),
898            // Calculate cumulative speeds for comparison
899            {
900                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
901                if total_seconds > 0.0 {
902                    snapshot.requests_sent as f64 / total_seconds
903                } else {
904                    0.0
905                }
906            },
907            {
908                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
909                if total_seconds > 0.0 {
910                    snapshot.responses_received as f64 / total_seconds
911                } else {
912                    0.0
913                }
914            },
915            {
916                let total_seconds = snapshot.elapsed_duration.as_secs() as f64;
917                if total_seconds > 0.0 {
918                    snapshot.items_scraped as f64 / total_seconds
919                } else {
920                    0.0
921                }
922            },
923            snapshot.formatted_bytes_per_second(),
924            snapshot.success_ratio(),
925            snapshot.failure_ratio(),
926            snapshot.cache_hit_ratio(),
927            snapshot.requests_enqueued,
928            snapshot.requests_sent,
929            snapshot.pending_requests(),
930            snapshot.requests_succeeded,
931            snapshot.requests_failed,
932            snapshot.requests_retried,
933            snapshot.requests_scheduled_for_retry,
934            snapshot.requests_dropped,
935            snapshot.retry_delay_in_flight_ms,
936            snapshot.responses_received,
937            snapshot.responses_from_cache,
938            snapshot.formatted_bytes(),
939            snapshot.items_scraped,
940            snapshot.items_processed,
941            snapshot.items_dropped_by_pipeline,
942            snapshot.formatted_request_time(snapshot.average_request_time),
943            snapshot.formatted_request_time(snapshot.fastest_request_time),
944            snapshot.formatted_request_time(snapshot.slowest_request_time),
945            snapshot.request_time_count,
946            snapshot.formatted_request_time(snapshot.average_parsing_time),
947            snapshot.formatted_request_time(snapshot.fastest_parsing_time),
948            snapshot.formatted_request_time(snapshot.slowest_parsing_time),
949            snapshot.parsing_time_count,
950            status_codes_output
951        )
952    }
953
954    /// Exports current statistics to the text layout used for terminal output.
955    pub fn to_live_report_string(&self) -> String {
956        let snapshot = self.internal_snapshot();
957        format_plain_text_metrics(&snapshot)
958    }
959}
960
// Truncates `input` to at most `max_chars` characters, appending "..." when
// anything was cut off. Counts `char`s (not bytes), so multi-byte text is
// never split mid-character.
fn truncate_preview(input: &str, max_chars: usize) -> String {
    match input.char_indices().nth(max_chars) {
        // A char exists past the limit: slice off the kept prefix and mark it.
        Some((cut_at, _)) => format!("{}...", &input[..cut_at]),
        // Input fits entirely within the limit.
        None => input.to_string(),
    }
}
970
971fn build_item_preview(json: &serde_json::Value, fields: Option<&[String]>) -> Option<String> {
972    let fields = fields?;
973
974    if fields.len() == 1 {
975        let (_, path) = parse_preview_field(&fields[0]);
976        return get_value_by_path(json, path).map(format_preview_value);
977    }
978
979    let mut preview = serde_json::Map::new();
980
981    for field in fields {
982        let (label, path) = parse_preview_field(field);
983        if let Some(value) = get_value_by_path(json, path) {
984            preview.insert(label.to_string(), value.clone());
985        }
986    }
987
988    if preview.is_empty() {
989        None
990    } else {
991        serde_json::to_string(&serde_json::Value::Object(preview)).ok()
992    }
993}
994
995fn format_preview_value(value: &serde_json::Value) -> String {
996    match value {
997        serde_json::Value::Null => "null".to_string(),
998        serde_json::Value::Bool(boolean) => boolean.to_string(),
999        serde_json::Value::Number(number) => number.to_string(),
1000        serde_json::Value::String(text) => text.clone(),
1001        serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
1002            serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
1003        }
1004    }
1005}
1006
// Splits a preview field spec `"label=path"` into its parts. A spec without
// `=` (or with an empty label/path) uses the whole spec as both label and path.
fn parse_preview_field(field: &str) -> (&str, &str) {
    if let Some((label, path)) = field.split_once('=') {
        if !label.is_empty() && !path.is_empty() {
            return (label, path);
        }
    }
    (field, field)
}
1013
1014fn get_value_by_path<'a>(
1015    value: &'a serde_json::Value,
1016    path: &str,
1017) -> Option<&'a serde_json::Value> {
1018    let mut current = value;
1019    for segment in path.split('.') {
1020        current = current.get(segment)?;
1021    }
1022    Some(current)
1023}
1024
1025impl Default for StatCollector {
1026    fn default() -> Self {
1027        Self::new(None)
1028    }
1029}
1030
1031impl std::fmt::Display for StatCollector {
1032    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1033        write!(f, "\n{}\n", self.to_live_report_string())
1034    }
1035}
1036
// Mean duration over `count` samples whose summed nanoseconds live in
// `total_nanos`; `None` when nothing has been recorded.
fn average_duration(total_nanos: &AtomicU64, count: usize) -> Option<Duration> {
    match count {
        0 => None,
        samples => {
            let mean_nanos = total_nanos.load(Ordering::Acquire) / samples as u64;
            Some(Duration::from_nanos(mean_nanos))
        }
    }
}
1046
// Converts a stored min/max extreme into a `Duration`. Returns `None` when no
// samples exist, or when a minimum tracker still holds its u64::MAX sentinel
// (meaning it was never lowered by a real sample).
fn duration_from_extreme(extreme: &AtomicU64, count: usize, is_min: bool) -> Option<Duration> {
    if count == 0 {
        return None;
    }

    let nanos = extreme.load(Ordering::Acquire);
    let untouched_min_sentinel = is_min && nanos == u64::MAX;
    (!untouched_min_sentinel).then_some(Duration::from_nanos(nanos))
}
1059
// Atomically lowers `target` to `candidate` when `candidate` is smaller.
//
// Uses the standard library's `AtomicU64::fetch_min` (stable since Rust 1.45)
// instead of a hand-rolled compare-exchange loop; the observable result is
// identical and the RMW handles contention internally.
fn update_min(target: &AtomicU64, candidate: u64) {
    target.fetch_min(candidate, Ordering::AcqRel);
}
1070
// Atomically raises `target` to `candidate` when `candidate` is larger.
//
// Uses the standard library's `AtomicU64::fetch_max` (stable since Rust 1.45)
// instead of a hand-rolled compare-exchange loop; the observable result is
// identical and the RMW handles contention internally.
fn update_max(target: &AtomicU64, candidate: u64) {
    target.fetch_max(candidate, Ordering::AcqRel);
}