1use parking_lot::RwLock;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9pub use crate::formatters::{
11 DurationFormatter, ByteFormatter, RateCalculator,
12 DefaultDurationFormatter, DefaultByteFormatter, DefaultRateCalculator,
13};
14
15#[derive(Debug)]
17pub struct ExpMovingAverage {
18 alpha: f64,
19 rate: Arc<RwLock<f64>>,
20 last_update: Arc<RwLock<Instant>>,
21 event_count: Arc<RwLock<usize>>,
22}
23
24impl ExpMovingAverage {
25 pub fn new(alpha: f64) -> Self {
26 ExpMovingAverage {
27 alpha,
28 rate: Arc::new(RwLock::new(0.0)),
29 last_update: Arc::new(RwLock::new(Instant::now())),
30 event_count: Arc::new(RwLock::new(0)),
31 }
32 }
33
34 pub fn update(&self, count: usize) {
35 let now = Instant::now();
36 let mut last_update = self.last_update.write();
37 let mut event_count = self.event_count.write();
38
39 *event_count += count;
40 let time_delta = now.duration_since(*last_update).as_secs_f64();
41
42 if time_delta >= 1.0 {
44 let current_rate = *event_count as f64 / time_delta;
45 let mut rate = self.rate.write();
46 *rate = self.alpha * current_rate + (1.0 - self.alpha) * (*rate);
48
49 *event_count = 0;
51 *last_update = now;
52 }
53 }
54
55 pub fn get_rate(&self) -> f64 {
56 *self.rate.read()
57 }
58}
59
60#[derive(Debug, Clone, serde::Serialize)]
62pub struct MetricsSnapshot {
63 pub requests_enqueued: usize,
64 pub requests_sent: usize,
65 pub requests_succeeded: usize,
66 pub requests_failed: usize,
67 pub requests_retried: usize,
68 pub requests_dropped: usize,
69 pub responses_received: usize,
70 pub responses_from_cache: usize,
71 pub total_bytes_downloaded: usize,
72 pub items_scraped: usize,
73 pub items_processed: usize,
74 pub items_dropped_by_pipeline: usize,
75 pub response_status_counts: std::collections::HashMap<u16, usize>,
76 pub elapsed_duration: Duration,
77 pub average_request_time: Option<Duration>,
78 pub fastest_request_time: Option<Duration>,
79 pub slowest_request_time: Option<Duration>,
80 pub request_time_count: usize,
81 pub average_parsing_time: Option<Duration>,
82 pub fastest_parsing_time: Option<Duration>,
83 pub slowest_parsing_time: Option<Duration>,
84 pub parsing_time_count: usize,
85 pub recent_requests_per_second: f64,
86 pub recent_responses_per_second: f64,
87 pub recent_items_per_second: f64,
88}
89
90impl MetricsSnapshot {
91 pub fn formatted_duration(&self) -> String {
92 DefaultDurationFormatter.formatted_duration(self.elapsed_duration)
93 }
94
95 pub fn formatted_request_time(&self, duration: Option<Duration>) -> String {
96 DefaultDurationFormatter.formatted_request_time(duration)
97 }
98
99 pub fn requests_per_second(&self) -> f64 {
100 DefaultRateCalculator.calculate_rate(self.requests_sent, self.elapsed_duration)
101 }
102
103 pub fn responses_per_second(&self) -> f64 {
104 DefaultRateCalculator.calculate_rate(self.responses_received, self.elapsed_duration)
105 }
106
107 pub fn items_per_second(&self) -> f64 {
108 DefaultRateCalculator.calculate_rate(self.items_scraped, self.elapsed_duration)
109 }
110
111 pub fn formatted_bytes(&self) -> String {
112 DefaultByteFormatter.formatted_bytes(self.total_bytes_downloaded)
113 }
114}
115
/// Implemented by types that can produce a snapshot value on request.
pub trait SnapshotProvider {
    /// The snapshot type produced; it must be `Clone`.
    type Snapshot: Clone;

    /// Builds and returns a snapshot from `&self`.
    fn create_snapshot(&self) -> Self::Snapshot;
}
121
122pub trait MetricsExporter<T> {
124 fn to_json_string(&self) -> Result<String, crate::error::SpiderError>;
125 fn to_json_string_pretty(&self) -> Result<String, crate::error::SpiderError>;
126 fn to_markdown_string(&self) -> String;
127 fn to_display_string(&self) -> String;
128}
129
/// Renders a [`MetricsSnapshotProvider`] as the multi-line "Crawl Statistics" text block.
pub struct MetricsDisplayFormatter;

impl MetricsDisplayFormatter {
    /// Builds the crawl statistics report for `snapshot`.
    ///
    /// The final `status` line lists `code: count` pairs in ascending status-code
    /// order, or `none` when no status codes were recorded.
    pub fn format_metrics<T: MetricsSnapshotProvider>(&self, snapshot: &T) -> String {
        let status_counts = snapshot.get_response_status_counts();
        let status_summary = if status_counts.is_empty() {
            "none".to_string()
        } else {
            // `HashMap` iteration order is arbitrary, so sort by status code to keep
            // the report stable from one call (and one run) to the next.
            let mut entries: Vec<(&u16, &usize)> = status_counts.iter().collect();
            entries.sort_unstable_by_key(|entry| *entry.0);
            entries
                .iter()
                .map(|(code, count)| format!("{}: {}", code, count))
                .collect::<Vec<String>>()
                .join(", ")
        };

        format!(
            "\nCrawl Statistics\n----------------\nduration : {}\n speed : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}\nrequests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}\nresponse : received: {}, from_cache: {}, downloaded: {}\nitems : scraped: {}, processed: {}, dropped: {}\nreq time : avg: {}, fastest: {}, slowest: {}, total: {}\nparsing : avg: {}, fastest: {}, slowest: {}, total: {}\nstatus : {}\n",
            snapshot.formatted_duration(),
            snapshot.get_recent_requests_per_second(),
            snapshot.get_recent_responses_per_second(),
            snapshot.get_recent_items_per_second(),
            snapshot.get_requests_enqueued(),
            snapshot.get_requests_sent(),
            snapshot.get_requests_succeeded(),
            snapshot.get_requests_failed(),
            snapshot.get_requests_retried(),
            snapshot.get_requests_dropped(),
            snapshot.get_responses_received(),
            snapshot.get_responses_from_cache(),
            snapshot.formatted_bytes(),
            snapshot.get_items_scraped(),
            snapshot.get_items_processed(),
            snapshot.get_items_dropped_by_pipeline(),
            snapshot.formatted_request_time(snapshot.get_average_request_time()),
            snapshot.formatted_request_time(snapshot.get_fastest_request_time()),
            snapshot.formatted_request_time(snapshot.get_slowest_request_time()),
            snapshot.get_request_time_count(),
            // Parsing timings go through the same duration formatter as request timings.
            snapshot.formatted_request_time(snapshot.get_average_parsing_time()),
            snapshot.formatted_request_time(snapshot.get_fastest_parsing_time()),
            snapshot.formatted_request_time(snapshot.get_slowest_parsing_time()),
            snapshot.get_parsing_time_count(),
            status_summary
        )
    }
}

/// Read-only accessors over crawl metrics, as consumed by
/// [`MetricsDisplayFormatter::format_metrics`].
pub trait MetricsSnapshotProvider {
    /// Requests enqueued.
    fn get_requests_enqueued(&self) -> usize;
    /// Requests sent.
    fn get_requests_sent(&self) -> usize;
    /// Requests that succeeded.
    fn get_requests_succeeded(&self) -> usize;
    /// Requests that failed.
    fn get_requests_failed(&self) -> usize;
    /// Requests that were retried.
    fn get_requests_retried(&self) -> usize;
    /// Requests that were dropped.
    fn get_requests_dropped(&self) -> usize;
    /// Responses received.
    fn get_responses_received(&self) -> usize;
    /// Responses served from cache.
    fn get_responses_from_cache(&self) -> usize;
    /// Total bytes downloaded.
    fn get_total_bytes_downloaded(&self) -> usize;
    /// Items scraped.
    fn get_items_scraped(&self) -> usize;
    /// Items processed.
    fn get_items_processed(&self) -> usize;
    /// Items dropped by the pipeline.
    fn get_items_dropped_by_pipeline(&self) -> usize;
    /// Count of responses keyed by status code.
    fn get_response_status_counts(&self) -> &std::collections::HashMap<u16, usize>;
    /// Elapsed time covered by the metrics.
    fn get_elapsed_duration(&self) -> Duration;
    /// Average request time, if any was recorded.
    fn get_average_request_time(&self) -> Option<Duration>;
    /// Fastest request time, if any was recorded.
    fn get_fastest_request_time(&self) -> Option<Duration>;
    /// Slowest request time, if any was recorded.
    fn get_slowest_request_time(&self) -> Option<Duration>;
    /// Number of request-time samples.
    fn get_request_time_count(&self) -> usize;
    /// Average parsing time, if any was recorded.
    fn get_average_parsing_time(&self) -> Option<Duration>;
    /// Fastest parsing time, if any was recorded.
    fn get_fastest_parsing_time(&self) -> Option<Duration>;
    /// Slowest parsing time, if any was recorded.
    fn get_slowest_parsing_time(&self) -> Option<Duration>;
    /// Number of parsing-time samples.
    fn get_parsing_time_count(&self) -> usize;
    /// Recent requests-per-second figure.
    fn get_recent_requests_per_second(&self) -> f64;
    /// Recent responses-per-second figure.
    fn get_recent_responses_per_second(&self) -> f64;
    /// Recent items-per-second figure.
    fn get_recent_items_per_second(&self) -> f64;
    /// Elapsed duration rendered as text for the `duration` line of the report.
    fn formatted_duration(&self) -> String;
    /// Renders an optional timing value as text; used for both request and parsing timings.
    fn formatted_request_time(&self, duration: Option<Duration>) -> String;
    /// Downloaded byte total rendered as text for the `downloaded` field of the report.
    fn formatted_bytes(&self) -> String;
}