1use parking_lot::RwLock;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7pub use crate::formatters::{
9 ByteFormatter, DefaultByteFormatter, DefaultDurationFormatter, DefaultRateCalculator,
10 DurationFormatter, RateCalculator,
11};
12
13#[derive(Debug)]
15pub struct ExpMovingAverage {
16 alpha: f64,
17 rate: Arc<RwLock<f64>>,
18 last_update: Arc<RwLock<Instant>>,
19 event_count: Arc<RwLock<usize>>,
20}
21
22impl ExpMovingAverage {
23 pub fn new(alpha: f64) -> Self {
27 ExpMovingAverage {
28 alpha,
29 rate: Arc::new(RwLock::new(0.0)),
30 last_update: Arc::new(RwLock::new(Instant::now())),
31 event_count: Arc::new(RwLock::new(0)),
32 }
33 }
34
35 pub fn update(&self, count: usize) {
37 let now = Instant::now();
38 let mut last_update = self.last_update.write();
39 let mut event_count = self.event_count.write();
40
41 *event_count += count;
42 let time_delta = now.duration_since(*last_update).as_secs_f64();
43
44 if time_delta >= 1.0 {
45 let current_rate = *event_count as f64 / time_delta;
46 let mut rate = self.rate.write();
47 *rate = self.alpha * current_rate + (1.0 - self.alpha) * (*rate);
48
49 *event_count = 0;
50 *last_update = now;
51 }
52 }
53
54 pub fn get_rate(&self) -> f64 {
56 *self.rate.read()
57 }
58}
59
60#[derive(Debug, Clone, serde::Serialize)]
62pub struct MetricsSnapshot {
63 pub requests_enqueued: usize,
64 pub requests_sent: usize,
65 pub requests_succeeded: usize,
66 pub requests_failed: usize,
67 pub requests_retried: usize,
68 pub requests_scheduled_for_retry: usize,
69 pub requests_dropped: usize,
70 pub retry_delay_in_flight_ms: u64,
71 pub responses_received: usize,
72 pub responses_from_cache: usize,
73 pub total_bytes_downloaded: usize,
74 pub items_scraped: usize,
75 pub items_processed: usize,
76 pub items_dropped_by_pipeline: usize,
77 pub response_status_counts: std::collections::HashMap<u16, usize>,
78 pub elapsed_duration: Duration,
79 pub average_request_time: Option<Duration>,
80 pub fastest_request_time: Option<Duration>,
81 pub slowest_request_time: Option<Duration>,
82 pub request_time_count: usize,
83 pub average_parsing_time: Option<Duration>,
84 pub fastest_parsing_time: Option<Duration>,
85 pub slowest_parsing_time: Option<Duration>,
86 pub parsing_time_count: usize,
87 pub recent_requests_per_second: f64,
88 pub recent_responses_per_second: f64,
89 pub recent_items_per_second: f64,
90 pub current_item_preview: String,
91}
92
93impl MetricsSnapshot {
94 pub fn formatted_duration(&self) -> String {
96 DefaultDurationFormatter.formatted_duration(self.elapsed_duration)
97 }
98
99 pub fn formatted_request_time(&self, duration: Option<Duration>) -> String {
101 DefaultDurationFormatter.formatted_request_time(duration)
102 }
103
104 pub fn requests_per_second(&self) -> f64 {
106 DefaultRateCalculator.calculate_rate(self.requests_sent, self.elapsed_duration)
107 }
108
109 pub fn responses_per_second(&self) -> f64 {
111 DefaultRateCalculator.calculate_rate(self.responses_received, self.elapsed_duration)
112 }
113
114 pub fn items_per_second(&self) -> f64 {
116 DefaultRateCalculator.calculate_rate(self.items_scraped, self.elapsed_duration)
117 }
118
119 pub fn bytes_per_second(&self) -> f64 {
121 DefaultRateCalculator.calculate_rate(self.total_bytes_downloaded, self.elapsed_duration)
122 }
123
124 pub fn formatted_bytes(&self) -> String {
126 DefaultByteFormatter.formatted_bytes(self.total_bytes_downloaded)
127 }
128
129 pub fn formatted_bytes_per_second(&self) -> String {
131 format!(
132 "{}/s",
133 DefaultByteFormatter.formatted_bytes(self.bytes_per_second() as usize)
134 )
135 }
136}
137
/// Something that can produce a snapshot value of its own state.
///
/// NOTE(review): presumably the snapshot is a point-in-time copy (e.g. a
/// `MetricsSnapshot`) — confirm against implementors.
pub trait SnapshotProvider {
    /// Type of the snapshot produced; it must be cloneable.
    type Snapshot: Clone;

    /// Builds and returns a snapshot.
    fn create_snapshot(&self) -> Self::Snapshot;
}
146
/// Exports metrics in several textual formats.
///
/// NOTE(review): the type parameter `T` appears in no method signature here;
/// presumably it ties an implementation to a particular snapshot type — confirm intent.
pub trait MetricsExporter<T> {
    /// Renders the metrics as a JSON string (presumably compact).
    ///
    /// # Errors
    /// Returns a `SpiderError` if the export fails (presumably on serialization failure).
    fn to_json_string(&self) -> Result<String, crate::error::SpiderError>;

    /// Renders the metrics as a JSON string (presumably indented for readability).
    ///
    /// # Errors
    /// Returns a `SpiderError` if the export fails (presumably on serialization failure).
    fn to_json_string_pretty(&self) -> Result<String, crate::error::SpiderError>;

    /// Renders the metrics as a Markdown string; this conversion cannot fail.
    fn to_markdown_string(&self) -> String;

    /// Renders the metrics as a string intended for display; this conversion cannot fail.
    fn to_display_string(&self) -> String;
}
169
170pub struct MetricsDisplayFormatter;
172
173impl MetricsDisplayFormatter {
174 pub fn format_metrics<T: MetricsSnapshotProvider>(&self, snapshot: &T) -> String {
176 format!("\n{}\n", format_plain_text_metrics(snapshot))
177 }
178}
179
180pub fn format_plain_text_metrics<T: MetricsSnapshotProvider>(snapshot: &T) -> String {
182 let overall_req_per_sec = calculate_rate(
183 snapshot.get_requests_sent(),
184 snapshot.get_elapsed_duration(),
185 );
186 let overall_resp_per_sec = calculate_rate(
187 snapshot.get_responses_received(),
188 snapshot.get_elapsed_duration(),
189 );
190 let overall_item_per_sec = calculate_rate(
191 snapshot.get_items_scraped(),
192 snapshot.get_elapsed_duration(),
193 );
194 let pending_requests = snapshot.get_requests_enqueued().saturating_sub(
195 snapshot.get_requests_succeeded()
196 + snapshot.get_requests_failed()
197 + snapshot.get_requests_dropped(),
198 );
199 let success_ratio = format_ratio(
200 snapshot.get_requests_succeeded(),
201 snapshot.get_requests_sent(),
202 );
203 let failure_ratio = format_ratio(snapshot.get_requests_failed(), snapshot.get_requests_sent());
204 let cache_hit_ratio = format_ratio(
205 snapshot.get_responses_from_cache(),
206 snapshot.get_responses_received(),
207 );
208 let bytes_per_second = format_byte_rate(
209 snapshot.get_total_bytes_downloaded(),
210 snapshot.get_elapsed_duration(),
211 );
212
213 format!(
214 "Crawl Statistics\n\
215 ----------------\n\
216 duration : {}\n\
217 speed : req/s {:.2}, resp/s {:.2}, item/s {:.2}\n\
218 requests : enqueued {}, sent {}, pending {}, ok {}, fail {}\n\
219 retry : retry {}, scheduled {}, drop {}\n\
220 ratios : success {}, failure {}, cache hit {}\n\
221 response : received {}, cache {}, downloaded {}, bytes/s {}\n\
222 delay : retry in flight {} ms\n\
223 items : scraped {}, processed {}, dropped {}\n\
224 current : {}\n\
225 req time : avg {}, fastest {}, slowest {}, total {}\n\
226 parsing : avg {}, fastest {}, slowest {}, total {}\n\
227 status : {}",
228 snapshot.formatted_duration(),
229 overall_req_per_sec,
230 overall_resp_per_sec,
231 overall_item_per_sec,
232 snapshot.get_requests_enqueued(),
233 snapshot.get_requests_sent(),
234 pending_requests,
235 snapshot.get_requests_succeeded(),
236 snapshot.get_requests_failed(),
237 snapshot.get_requests_retried(),
238 snapshot.get_requests_scheduled_for_retry(),
239 snapshot.get_requests_dropped(),
240 success_ratio,
241 failure_ratio,
242 cache_hit_ratio,
243 snapshot.get_responses_received(),
244 snapshot.get_responses_from_cache(),
245 snapshot.formatted_bytes(),
246 bytes_per_second,
247 snapshot.get_retry_delay_in_flight_ms(),
248 snapshot.get_items_scraped(),
249 snapshot.get_items_processed(),
250 snapshot.get_items_dropped_by_pipeline(),
251 snapshot.get_current_item_preview(),
252 snapshot.formatted_request_time(snapshot.get_average_request_time()),
253 snapshot.formatted_request_time(snapshot.get_fastest_request_time()),
254 snapshot.formatted_request_time(snapshot.get_slowest_request_time()),
255 snapshot.get_request_time_count(),
256 snapshot.formatted_request_time(snapshot.get_average_parsing_time()),
257 snapshot.formatted_request_time(snapshot.get_fastest_parsing_time()),
258 snapshot.formatted_request_time(snapshot.get_slowest_parsing_time()),
259 snapshot.get_parsing_time_count(),
260 format_status_counts(snapshot.get_response_status_counts())
261 )
262}
263
/// Renders per-status response counts as `"code: count"` entries in ascending
/// status-code order, joined with `", "`. An empty map renders as `"none"`.
fn format_status_counts(status_counts: &std::collections::HashMap<u16, usize>) -> String {
    let mut codes: Vec<u16> = status_counts.keys().copied().collect();
    if codes.is_empty() {
        return "none".to_string();
    }
    codes.sort_unstable();

    let mut rendered = Vec::with_capacity(codes.len());
    for code in codes {
        // `code` was taken from this map's keys, so the index cannot miss.
        rendered.push(format!("{code}: {}", status_counts[&code]));
    }
    rendered.join(", ")
}
281
282fn calculate_rate(count: usize, elapsed_duration: Duration) -> f64 {
283 DefaultRateCalculator.calculate_rate(count, elapsed_duration)
284}
285
/// Formats `numerator / denominator` as a percentage with two decimals
/// (e.g. `"25.00%"`). A zero denominator yields `"0.00%"` instead of NaN or infinity.
fn format_ratio(numerator: usize, denominator: usize) -> String {
    match denominator {
        0 => "0.00%".to_string(),
        total => {
            // Divide first, then scale, matching the established rounding behavior.
            let percent = (numerator as f64 / total as f64) * 100.0;
            format!("{:.2}%", percent)
        }
    }
}
293
294fn format_byte_rate(total_bytes: usize, elapsed_duration: Duration) -> String {
295 let bytes_per_second = calculate_rate(total_bytes, elapsed_duration);
296 format!(
297 "{}/s",
298 DefaultByteFormatter.formatted_bytes(bytes_per_second as usize)
299 )
300}
301
/// Read-only accessors for crawl metrics, consumed by [`format_plain_text_metrics`]
/// and [`MetricsDisplayFormatter::format_metrics`].
///
/// For [`MetricsSnapshot`], each `get_*` method returns the field of the same
/// name. The `get_recent_*` accessors are part of the contract but are not used
/// by `format_plain_text_metrics`.
pub trait MetricsSnapshotProvider {
    // --- request counters ---
    fn get_requests_enqueued(&self) -> usize;
    fn get_requests_sent(&self) -> usize;
    fn get_requests_succeeded(&self) -> usize;
    fn get_requests_failed(&self) -> usize;
    fn get_requests_retried(&self) -> usize;
    fn get_requests_scheduled_for_retry(&self) -> usize;
    fn get_requests_dropped(&self) -> usize;
    /// Retry delay currently in flight; milliseconds per the name (confirm with the producer).
    fn get_retry_delay_in_flight_ms(&self) -> u64;
    // --- response and download counters ---
    fn get_responses_received(&self) -> usize;
    fn get_responses_from_cache(&self) -> usize;
    fn get_total_bytes_downloaded(&self) -> usize;
    // --- item pipeline counters ---
    fn get_items_scraped(&self) -> usize;
    fn get_items_processed(&self) -> usize;
    fn get_items_dropped_by_pipeline(&self) -> usize;
    /// Response counts keyed by `u16` (presumably HTTP status codes).
    fn get_response_status_counts(&self) -> &std::collections::HashMap<u16, usize>;
    /// Total elapsed time; the denominator for the overall rates in the report.
    fn get_elapsed_duration(&self) -> Duration;
    // --- request timing statistics ---
    fn get_average_request_time(&self) -> Option<Duration>;
    fn get_fastest_request_time(&self) -> Option<Duration>;
    fn get_slowest_request_time(&self) -> Option<Duration>;
    fn get_request_time_count(&self) -> usize;
    // --- parsing timing statistics ---
    fn get_average_parsing_time(&self) -> Option<Duration>;
    fn get_fastest_parsing_time(&self) -> Option<Duration>;
    fn get_slowest_parsing_time(&self) -> Option<Duration>;
    fn get_parsing_time_count(&self) -> usize;
    // --- recent (precomputed) rates ---
    fn get_recent_requests_per_second(&self) -> f64;
    fn get_recent_responses_per_second(&self) -> f64;
    fn get_recent_items_per_second(&self) -> f64;
    /// Short text describing the item currently being handled; printed verbatim in the report.
    fn get_current_item_preview(&self) -> &str;
    // --- pre-rendered strings used by the plain-text report ---
    /// Human-readable elapsed duration.
    fn formatted_duration(&self) -> String;
    /// Human-readable rendering of an optional timing value; the rendering of `None` is up to the implementor.
    fn formatted_request_time(&self, duration: Option<Duration>) -> String;
    /// Human-readable total bytes downloaded.
    fn formatted_bytes(&self) -> String;
}
336
/// Exposes [`MetricsSnapshot`] through [`MetricsSnapshotProvider`].
///
/// Each `get_*` method returns the field of the same name. `Copy` fields are
/// returned by value, while the status map and item preview are borrowed.
impl MetricsSnapshotProvider for MetricsSnapshot {
    fn get_requests_enqueued(&self) -> usize {
        self.requests_enqueued
    }

    fn get_requests_sent(&self) -> usize {
        self.requests_sent
    }

    fn get_requests_succeeded(&self) -> usize {
        self.requests_succeeded
    }

    fn get_requests_failed(&self) -> usize {
        self.requests_failed
    }

    fn get_requests_retried(&self) -> usize {
        self.requests_retried
    }

    fn get_requests_scheduled_for_retry(&self) -> usize {
        self.requests_scheduled_for_retry
    }

    fn get_requests_dropped(&self) -> usize {
        self.requests_dropped
    }

    fn get_retry_delay_in_flight_ms(&self) -> u64 {
        self.retry_delay_in_flight_ms
    }

    fn get_responses_received(&self) -> usize {
        self.responses_received
    }

    fn get_responses_from_cache(&self) -> usize {
        self.responses_from_cache
    }

    fn get_total_bytes_downloaded(&self) -> usize {
        self.total_bytes_downloaded
    }

    fn get_items_scraped(&self) -> usize {
        self.items_scraped
    }

    fn get_items_processed(&self) -> usize {
        self.items_processed
    }

    fn get_items_dropped_by_pipeline(&self) -> usize {
        self.items_dropped_by_pipeline
    }

    fn get_response_status_counts(&self) -> &std::collections::HashMap<u16, usize> {
        &self.response_status_counts
    }

    fn get_elapsed_duration(&self) -> Duration {
        self.elapsed_duration
    }

    fn get_average_request_time(&self) -> Option<Duration> {
        self.average_request_time
    }

    fn get_fastest_request_time(&self) -> Option<Duration> {
        self.fastest_request_time
    }

    fn get_slowest_request_time(&self) -> Option<Duration> {
        self.slowest_request_time
    }

    fn get_request_time_count(&self) -> usize {
        self.request_time_count
    }

    fn get_average_parsing_time(&self) -> Option<Duration> {
        self.average_parsing_time
    }

    fn get_fastest_parsing_time(&self) -> Option<Duration> {
        self.fastest_parsing_time
    }

    fn get_slowest_parsing_time(&self) -> Option<Duration> {
        self.slowest_parsing_time
    }

    fn get_parsing_time_count(&self) -> usize {
        self.parsing_time_count
    }

    fn get_recent_requests_per_second(&self) -> f64 {
        self.recent_requests_per_second
    }

    fn get_recent_responses_per_second(&self) -> f64 {
        self.recent_responses_per_second
    }

    fn get_recent_items_per_second(&self) -> f64 {
        self.recent_items_per_second
    }

    fn get_current_item_preview(&self) -> &str {
        &self.current_item_preview
    }

    // The three delegations below call the *inherent* `MetricsSnapshot` methods of
    // the same name. Method resolution prefers inherent methods over trait methods,
    // which is what keeps these from recursing into themselves. If the inherent
    // method is ever renamed or removed, these calls would silently become infinite
    // recursion, so keep the two in sync.
    fn formatted_duration(&self) -> String {
        self.formatted_duration()
    }

    fn formatted_request_time(&self, duration: Option<Duration>) -> String {
        self.formatted_request_time(duration)
    }

    fn formatted_bytes(&self) -> String {
        self.formatted_bytes()
    }
}