1use parking_lot::RwLock;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9#[derive(Debug)]
11pub struct ExpMovingAverage {
12 alpha: f64,
13 rate: Arc<RwLock<f64>>,
14 last_update: Arc<RwLock<Instant>>,
15 event_count: Arc<RwLock<usize>>,
16}
17
18impl ExpMovingAverage {
19 pub fn new(alpha: f64) -> Self {
20 ExpMovingAverage {
21 alpha,
22 rate: Arc::new(RwLock::new(0.0)),
23 last_update: Arc::new(RwLock::new(Instant::now())),
24 event_count: Arc::new(RwLock::new(0)),
25 }
26 }
27
28 pub fn update(&self, count: usize) {
29 let now = Instant::now();
30 let mut last_update = self.last_update.write();
31 let mut event_count = self.event_count.write();
32
33 *event_count += count;
34 let time_delta = now.duration_since(*last_update).as_secs_f64();
35
36 if time_delta >= 1.0 {
38 let current_rate = *event_count as f64 / time_delta;
39 let mut rate = self.rate.write();
40 *rate = self.alpha * current_rate + (1.0 - self.alpha) * (*rate);
42
43 *event_count = 0;
45 *last_update = now;
46 }
47 }
48
49 pub fn get_rate(&self) -> f64 {
50 *self.rate.read()
51 }
52}
53
/// Turns [`Duration`] values into display strings.
pub trait DurationFormatter {
    /// Renders a duration as text.
    fn formatted_duration(&self, duration: Duration) -> String;
    /// Renders an optional timing as text, including the case where it is absent.
    fn formatted_request_time(&self, duration: Option<Duration>) -> String;
}

/// Stateless [`DurationFormatter`] implementation.
pub struct DefaultDurationFormatter;

impl DurationFormatter for DefaultDurationFormatter {
    /// Uses the `Debug` rendering of [`Duration`].
    fn formatted_duration(&self, duration: Duration) -> String {
        format!("{:?}", duration)
    }

    /// Returns `"N/A"` for `None`, whole milliseconds (`"<n> ms"`) below one second,
    /// and seconds with two decimals (`"<s> s"`) otherwise.
    fn formatted_request_time(&self, duration: Option<Duration>) -> String {
        let elapsed = match duration {
            Some(value) => value,
            None => return "N/A".to_string(),
        };

        let millis = elapsed.as_millis();
        if millis >= 1000 {
            format!("{:.2} s", elapsed.as_secs_f64())
        } else {
            format!("{} ms", millis)
        }
    }
}
81
/// Turns a byte count into a display string.
pub trait ByteFormatter {
    /// Renders `bytes` as text.
    fn formatted_bytes(&self, bytes: usize) -> String;
}

/// Stateless [`ByteFormatter`] using 1024-based units.
pub struct DefaultByteFormatter;

impl ByteFormatter for DefaultByteFormatter {
    /// Picks the largest of GB / MB / KB that `bytes` reaches and prints the value
    /// with two decimals; anything below 1024 is printed as a whole number of `B`.
    fn formatted_bytes(&self, bytes: usize) -> String {
        // Ordered from largest to smallest so the first match is the best fit.
        const UNITS: [(usize, &str); 3] = [
            (1024 * 1024 * 1024, "GB"),
            (1024 * 1024, "MB"),
            (1024, "KB"),
        ];

        match UNITS.iter().find(|unit| bytes >= unit.0) {
            Some(&(threshold, label)) => {
                format!("{:.2} {}", bytes as f64 / threshold as f64, label)
            }
            None => format!("{} B", bytes),
        }
    }
}
107
/// Computes a per-second rate from a count and an elapsed time.
pub trait RateCalculator {
    /// Returns the rate for `count` events over `elapsed`.
    fn calculate_rate(&self, count: usize, elapsed: Duration) -> f64;
}

/// Stateless [`RateCalculator`] implementation.
pub struct DefaultRateCalculator;

impl RateCalculator for DefaultRateCalculator {
    /// Returns `count / elapsed_seconds`, or `0.0` when no time has elapsed.
    fn calculate_rate(&self, count: usize, elapsed: Duration) -> f64 {
        let seconds = elapsed.as_secs_f64();

        // Guard against dividing by zero for an empty interval.
        if seconds <= 0.0 {
            return 0.0;
        }

        count as f64 / seconds
    }
}
126
/// Plain-data copy of crawl metrics.
///
/// All fields are public values; the type derives `Clone` and `serde::Serialize`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct MetricsSnapshot {
    // --- request counters ---
    pub requests_enqueued: usize,
    pub requests_sent: usize,
    pub requests_succeeded: usize,
    pub requests_failed: usize,
    pub requests_retried: usize,
    pub requests_dropped: usize,
    // --- response counters ---
    pub responses_received: usize,
    pub responses_from_cache: usize,
    pub total_bytes_downloaded: usize,
    // --- item counters ---
    pub items_scraped: usize,
    pub items_processed: usize,
    pub items_dropped_by_pipeline: usize,
    /// Count per `u16` key; presumably the HTTP status code — confirm against the producer.
    pub response_status_counts: std::collections::HashMap<u16, usize>,
    /// Elapsed time used as the denominator by the `*_per_second()` methods.
    pub elapsed_duration: Duration,
    // --- request timing; `None` when no value is available ---
    pub average_request_time: Option<Duration>,
    pub fastest_request_time: Option<Duration>,
    pub slowest_request_time: Option<Duration>,
    pub request_time_count: usize,
    // --- parsing timing; `None` when no value is available ---
    pub average_parsing_time: Option<Duration>,
    pub fastest_parsing_time: Option<Duration>,
    pub slowest_parsing_time: Option<Duration>,
    pub parsing_time_count: usize,
    // --- recent rates ---
    // NOTE(review): presumably sourced from `ExpMovingAverage::get_rate` — confirm
    // where the snapshot is built.
    pub recent_requests_per_second: f64,
    pub recent_responses_per_second: f64,
    pub recent_items_per_second: f64,
}
156
157impl MetricsSnapshot {
158 pub fn formatted_duration(&self) -> String {
159 DefaultDurationFormatter.formatted_duration(self.elapsed_duration)
160 }
161
162 pub fn formatted_request_time(&self, duration: Option<Duration>) -> String {
163 DefaultDurationFormatter.formatted_request_time(duration)
164 }
165
166 pub fn requests_per_second(&self) -> f64 {
167 DefaultRateCalculator.calculate_rate(self.requests_sent, self.elapsed_duration)
168 }
169
170 pub fn responses_per_second(&self) -> f64 {
171 DefaultRateCalculator.calculate_rate(self.responses_received, self.elapsed_duration)
172 }
173
174 pub fn items_per_second(&self) -> f64 {
175 DefaultRateCalculator.calculate_rate(self.items_scraped, self.elapsed_duration)
176 }
177
178 pub fn formatted_bytes(&self) -> String {
179 DefaultByteFormatter.formatted_bytes(self.total_bytes_downloaded)
180 }
181}
182
/// Something that can produce a snapshot value of itself.
pub trait SnapshotProvider {
    /// The snapshot type produced; it must be `Clone`.
    type Snapshot: Clone;
    /// Builds and returns a snapshot from `&self`.
    fn create_snapshot(&self) -> Self::Snapshot;
}
188
/// Export of metrics into several textual formats.
///
/// The type parameter `T` does not appear in any of the method signatures below.
pub trait MetricsExporter<T> {
    /// JSON rendering; fails with `SpiderError`.
    fn to_json_string(&self) -> Result<String, crate::error::SpiderError>;
    /// Pretty JSON rendering; fails with `SpiderError`.
    fn to_json_string_pretty(&self) -> Result<String, crate::error::SpiderError>;
    /// Markdown rendering; infallible.
    fn to_markdown_string(&self) -> String;
    /// Display rendering; infallible.
    fn to_display_string(&self) -> String;
}
196
/// Renders a [`MetricsSnapshotProvider`] as a multi-line "Crawl Statistics" report.
pub struct MetricsDisplayFormatter;

impl MetricsDisplayFormatter {
    /// Builds the textual report for `snapshot`.
    ///
    /// The report lists duration, recent rates, request/response/item counters,
    /// request and parsing timings, and per-status counts. Status codes are listed
    /// in ascending order so the output is stable between runs; an empty status
    /// map is rendered as `none`.
    pub fn format_metrics<T: MetricsSnapshotProvider>(&self, snapshot: &T) -> String {
        let status_summary = Self::format_status_counts(snapshot.get_response_status_counts());

        format!(
            "\nCrawl Statistics\n----------------\n duration : {}\n speed : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}\n requests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}\n response : received: {}, from_cache: {}, downloaded: {}\n items : scraped: {}, processed: {}, dropped: {}\n req time : avg: {}, fastest: {}, slowest: {}, total: {}\n parsing : avg: {}, fastest: {}, slowest: {}, total: {}\n status : {}\n",
            snapshot.formatted_duration(),
            snapshot.get_recent_requests_per_second(),
            snapshot.get_recent_responses_per_second(),
            snapshot.get_recent_items_per_second(),
            snapshot.get_requests_enqueued(),
            snapshot.get_requests_sent(),
            snapshot.get_requests_succeeded(),
            snapshot.get_requests_failed(),
            snapshot.get_requests_retried(),
            snapshot.get_requests_dropped(),
            snapshot.get_responses_received(),
            snapshot.get_responses_from_cache(),
            snapshot.formatted_bytes(),
            snapshot.get_items_scraped(),
            snapshot.get_items_processed(),
            snapshot.get_items_dropped_by_pipeline(),
            snapshot.formatted_request_time(snapshot.get_average_request_time()),
            snapshot.formatted_request_time(snapshot.get_fastest_request_time()),
            snapshot.formatted_request_time(snapshot.get_slowest_request_time()),
            snapshot.get_request_time_count(),
            snapshot.formatted_request_time(snapshot.get_average_parsing_time()),
            snapshot.formatted_request_time(snapshot.get_fastest_parsing_time()),
            snapshot.formatted_request_time(snapshot.get_slowest_parsing_time()),
            snapshot.get_parsing_time_count(),
            status_summary
        )
    }

    /// Renders `code: count` pairs joined by `", "`, ordered by code, or `none`
    /// when the map is empty.
    fn format_status_counts(counts: &std::collections::HashMap<u16, usize>) -> String {
        if counts.is_empty() {
            return "none".to_string();
        }

        // HashMap iteration order is arbitrary; sort so the report is deterministic.
        let mut entries: Vec<(&u16, &usize)> = counts.iter().collect();
        entries.sort_unstable_by_key(|entry| *entry.0);

        entries
            .iter()
            .map(|(code, count)| format!("{}: {}", code, count))
            .collect::<Vec<String>>()
            .join(", ")
    }
}

/// Read-only view of crawl metrics consumed by
/// [`MetricsDisplayFormatter::format_metrics`].
///
/// Implementors expose raw counters and timings through the `get_*` accessors and
/// decide how durations and byte totals are rendered through the `formatted_*`
/// methods.
pub trait MetricsSnapshotProvider {
    // --- request counters ---
    fn get_requests_enqueued(&self) -> usize;
    fn get_requests_sent(&self) -> usize;
    fn get_requests_succeeded(&self) -> usize;
    fn get_requests_failed(&self) -> usize;
    fn get_requests_retried(&self) -> usize;
    fn get_requests_dropped(&self) -> usize;
    // --- response counters ---
    fn get_responses_received(&self) -> usize;
    fn get_responses_from_cache(&self) -> usize;
    fn get_total_bytes_downloaded(&self) -> usize;
    // --- item counters ---
    fn get_items_scraped(&self) -> usize;
    fn get_items_processed(&self) -> usize;
    fn get_items_dropped_by_pipeline(&self) -> usize;
    /// Count per `u16` key, borrowed from the implementor.
    fn get_response_status_counts(&self) -> &std::collections::HashMap<u16, usize>;
    fn get_elapsed_duration(&self) -> Duration;
    // --- request timing; `None` when no value is available ---
    fn get_average_request_time(&self) -> Option<Duration>;
    fn get_fastest_request_time(&self) -> Option<Duration>;
    fn get_slowest_request_time(&self) -> Option<Duration>;
    fn get_request_time_count(&self) -> usize;
    // --- parsing timing; `None` when no value is available ---
    fn get_average_parsing_time(&self) -> Option<Duration>;
    fn get_fastest_parsing_time(&self) -> Option<Duration>;
    fn get_slowest_parsing_time(&self) -> Option<Duration>;
    fn get_parsing_time_count(&self) -> usize;
    // --- recent rates ---
    fn get_recent_requests_per_second(&self) -> f64;
    fn get_recent_responses_per_second(&self) -> f64;
    fn get_recent_items_per_second(&self) -> f64;
    // --- presentation hooks ---
    /// Text for the report's `duration` line.
    fn formatted_duration(&self) -> String;
    /// Text for a single optional timing; used for both request and parsing times.
    fn formatted_request_time(&self, duration: Option<Duration>) -> String;
    /// Text for the report's `downloaded` value.
    fn formatted_bytes(&self) -> String;
}
273