1use parking_lot::RwLock;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7pub use crate::formatters::{
9 ByteFormatter, DefaultByteFormatter, DefaultDurationFormatter, DefaultRateCalculator,
10 DurationFormatter, RateCalculator,
11};
12
13#[derive(Debug)]
15pub struct ExpMovingAverage {
16 alpha: f64,
17 rate: Arc<RwLock<f64>>,
18 last_update: Arc<RwLock<Instant>>,
19 event_count: Arc<RwLock<usize>>,
20}
21
22impl ExpMovingAverage {
23 pub fn new(alpha: f64) -> Self {
27 ExpMovingAverage {
28 alpha,
29 rate: Arc::new(RwLock::new(0.0)),
30 last_update: Arc::new(RwLock::new(Instant::now())),
31 event_count: Arc::new(RwLock::new(0)),
32 }
33 }
34
35 pub fn update(&self, count: usize) {
37 let now = Instant::now();
38 let mut last_update = self.last_update.write();
39 let mut event_count = self.event_count.write();
40
41 *event_count += count;
42 let time_delta = now.duration_since(*last_update).as_secs_f64();
43
44 if time_delta >= 1.0 {
45 let current_rate = *event_count as f64 / time_delta;
46 let mut rate = self.rate.write();
47 *rate = self.alpha * current_rate + (1.0 - self.alpha) * (*rate);
48
49 *event_count = 0;
50 *last_update = now;
51 }
52 }
53
54 pub fn get_rate(&self) -> f64 {
56 *self.rate.read()
57 }
58}
59
/// Plain-data collection of crawl metrics; every field is an owned value.
///
/// Derives `serde::Serialize` for export. serde's derive emits fields in
/// declaration order, so reordering fields changes the serialized output.
#[derive(Debug, Clone, serde::Serialize)]
pub struct MetricsSnapshot {
    // Request lifecycle counters.
    pub requests_enqueued: usize,
    pub requests_sent: usize,
    pub requests_succeeded: usize,
    pub requests_failed: usize,
    pub requests_retried: usize,
    pub requests_scheduled_for_retry: usize,
    pub requests_dropped: usize,
    /// Retry delay currently in flight, in milliseconds.
    /// NOTE(review): how this is aggregated (e.g. summed over pending retries)
    /// is not visible here; confirm at the producer.
    pub retry_delay_in_flight_ms: u64,
    // Response and download counters.
    pub responses_received: usize,
    pub responses_from_cache: usize,
    pub total_bytes_downloaded: usize,
    // Item counters.
    pub items_scraped: usize,
    pub items_processed: usize,
    pub items_dropped_by_pipeline: usize,
    // Backlog / queue sizes.
    pub queue_depth: usize,
    pub parser_backlog: usize,
    pub pipeline_backlog: usize,
    pub retry_backlog: usize,
    /// Response count per status code (presumably HTTP status codes).
    pub response_status_counts: std::collections::HashMap<u16, usize>,
    /// Time covered by this snapshot.
    /// Used as the denominator for the overall per-second rate helpers.
    pub elapsed_duration: Duration,
    // Request timing statistics; `None` presumably means no samples yet.
    pub average_request_time: Option<Duration>,
    pub fastest_request_time: Option<Duration>,
    pub slowest_request_time: Option<Duration>,
    /// Shown as `total` in the plain-text report.
    /// Presumably the number of timed requests.
    pub request_time_count: usize,
    // Parsing timing statistics, same shape as the request timings above.
    pub average_parsing_time: Option<Duration>,
    pub fastest_parsing_time: Option<Duration>,
    pub slowest_parsing_time: Option<Duration>,
    pub parsing_time_count: usize,
    // Short-window rates.
    // NOTE(review): presumably fed by `ExpMovingAverage`; confirm at the producer.
    pub recent_requests_per_second: f64,
    pub recent_responses_per_second: f64,
    pub recent_items_per_second: f64,
    /// Text preview of the current item, printed verbatim on the report's `current` line.
    pub current_item_preview: String,
}
96
97impl MetricsSnapshot {
98 pub fn formatted_duration(&self) -> String {
100 DefaultDurationFormatter.formatted_duration(self.elapsed_duration)
101 }
102
103 pub fn formatted_request_time(&self, duration: Option<Duration>) -> String {
105 DefaultDurationFormatter.formatted_request_time(duration)
106 }
107
108 pub fn requests_per_second(&self) -> f64 {
110 DefaultRateCalculator.calculate_rate(self.requests_sent, self.elapsed_duration)
111 }
112
113 pub fn responses_per_second(&self) -> f64 {
115 DefaultRateCalculator.calculate_rate(self.responses_received, self.elapsed_duration)
116 }
117
118 pub fn items_per_second(&self) -> f64 {
120 DefaultRateCalculator.calculate_rate(self.items_scraped, self.elapsed_duration)
121 }
122
123 pub fn bytes_per_second(&self) -> f64 {
125 DefaultRateCalculator.calculate_rate(self.total_bytes_downloaded, self.elapsed_duration)
126 }
127
128 pub fn formatted_bytes(&self) -> String {
130 DefaultByteFormatter.formatted_bytes(self.total_bytes_downloaded)
131 }
132
133 pub fn formatted_bytes_per_second(&self) -> String {
135 format!(
136 "{}/s",
137 DefaultByteFormatter.formatted_bytes(self.bytes_per_second() as usize)
138 )
139 }
140}
141
/// A type that can produce a value of its associated [`SnapshotProvider::Snapshot`] type.
pub trait SnapshotProvider {
    /// The produced snapshot type; it must be `Clone`.
    type Snapshot: Clone;

    /// Builds a snapshot; presumably captures the provider's current state
    /// (confirm in implementors).
    fn create_snapshot(&self) -> Self::Snapshot;
}
150
/// Renders metrics as JSON, pretty JSON, Markdown, or a display string.
///
/// NOTE(review): the type parameter `T` is not referenced by any method
/// signature. It is presumably a marker for the exported metrics type; confirm intent.
pub trait MetricsExporter<T> {
    /// Compact JSON representation.
    ///
    /// # Errors
    /// Returns [`crate::error::SpiderError`] if the export fails. This is
    /// presumably a serialization failure; confirm in implementors.
    fn to_json_string(&self) -> Result<String, crate::error::SpiderError>;

    /// Pretty-printed JSON representation.
    ///
    /// # Errors
    /// Returns [`crate::error::SpiderError`] if the export fails. This is
    /// presumably a serialization failure; confirm in implementors.
    fn to_json_string_pretty(&self) -> Result<String, crate::error::SpiderError>;

    /// Markdown rendering of the metrics.
    fn to_markdown_string(&self) -> String;

    /// Human-readable rendering intended for display.
    fn to_display_string(&self) -> String;
}
173
174pub struct MetricsDisplayFormatter;
176
177impl MetricsDisplayFormatter {
178 pub fn format_metrics<T: MetricsSnapshotProvider>(&self, snapshot: &T) -> String {
180 format!("\n{}\n", format_plain_text_metrics(snapshot))
181 }
182}
183
/// Renders a multi-line plain-text "Crawl Statistics" report for any
/// [`MetricsSnapshotProvider`].
///
/// The overall rates (`req/s`, `resp/s`, `item/s`, `bytes/s`) are computed over
/// the full elapsed duration. The `get_recent_*_per_second` values are not used
/// here. Status codes are listed in ascending order, or as `none` when there are none.
pub fn format_plain_text_metrics<T: MetricsSnapshotProvider>(snapshot: &T) -> String {
    // Whole-run averages: totals divided by the full elapsed duration.
    let overall_req_per_sec = calculate_rate(
        snapshot.get_requests_sent(),
        snapshot.get_elapsed_duration(),
    );
    let overall_resp_per_sec = calculate_rate(
        snapshot.get_responses_received(),
        snapshot.get_elapsed_duration(),
    );
    let overall_item_per_sec = calculate_rate(
        snapshot.get_items_scraped(),
        snapshot.get_elapsed_duration(),
    );
    // Requests enqueued but not yet succeeded, failed, or dropped.
    // `saturating_sub` floors the result at 0 instead of underflowing.
    // The inner sum itself uses plain `+`.
    let pending_requests = snapshot.get_requests_enqueued().saturating_sub(
        snapshot.get_requests_succeeded()
            + snapshot.get_requests_failed()
            + snapshot.get_requests_dropped(),
    );
    // Success and failure ratios are relative to requests *sent*.
    // The cache-hit ratio is relative to responses received.
    // `format_ratio` prints "0.00%" for a zero denominator.
    let success_ratio = format_ratio(
        snapshot.get_requests_succeeded(),
        snapshot.get_requests_sent(),
    );
    let failure_ratio = format_ratio(snapshot.get_requests_failed(), snapshot.get_requests_sent());
    let cache_hit_ratio = format_ratio(
        snapshot.get_responses_from_cache(),
        snapshot.get_responses_received(),
    );
    let bytes_per_second = format_byte_rate(
        snapshot.get_total_bytes_downloaded(),
        snapshot.get_elapsed_duration(),
    );

    // Arguments are positional: their order must match the `{}` placeholders
    // line by line. The trailing `\` continuations strip leading whitespace of
    // the next source line.
    format!(
        "Crawl Statistics\n\
         ----------------\n\
         duration : {}\n\
         speed : req/s {:.2}, resp/s {:.2}, item/s {:.2}\n\
         requests : enqueued {}, sent {}, pending {}, ok {}, fail {}\n\
         retry : retry {}, scheduled {}, drop {}\n\
         ratios : success {}, failure {}, cache hit {}\n\
         response : received {}, cache {}, downloaded {}, bytes/s {}\n\
         delay : retry in flight {} ms\n\
         backlog : queue {}, parser {}, pipeline {}, retry {}\n\
         items : scraped {}, processed {}, dropped {}\n\
         current : {}\n\
         req time : avg {}, fastest {}, slowest {}, total {}\n\
         parsing : avg {}, fastest {}, slowest {}, total {}\n\
         status : {}",
        snapshot.formatted_duration(),
        overall_req_per_sec,
        overall_resp_per_sec,
        overall_item_per_sec,
        snapshot.get_requests_enqueued(),
        snapshot.get_requests_sent(),
        pending_requests,
        snapshot.get_requests_succeeded(),
        snapshot.get_requests_failed(),
        snapshot.get_requests_retried(),
        snapshot.get_requests_scheduled_for_retry(),
        snapshot.get_requests_dropped(),
        success_ratio,
        failure_ratio,
        cache_hit_ratio,
        snapshot.get_responses_received(),
        snapshot.get_responses_from_cache(),
        snapshot.formatted_bytes(),
        bytes_per_second,
        snapshot.get_retry_delay_in_flight_ms(),
        snapshot.get_queue_depth(),
        snapshot.get_parser_backlog(),
        snapshot.get_pipeline_backlog(),
        snapshot.get_retry_backlog(),
        snapshot.get_items_scraped(),
        snapshot.get_items_processed(),
        snapshot.get_items_dropped_by_pipeline(),
        snapshot.get_current_item_preview(),
        // Parsing times reuse the request-time formatter.
        snapshot.formatted_request_time(snapshot.get_average_request_time()),
        snapshot.formatted_request_time(snapshot.get_fastest_request_time()),
        snapshot.formatted_request_time(snapshot.get_slowest_request_time()),
        snapshot.get_request_time_count(),
        snapshot.formatted_request_time(snapshot.get_average_parsing_time()),
        snapshot.formatted_request_time(snapshot.get_fastest_parsing_time()),
        snapshot.formatted_request_time(snapshot.get_slowest_parsing_time()),
        snapshot.get_parsing_time_count(),
        format_status_counts(snapshot.get_response_status_counts())
    )
}
272
/// Lists per-status-code counts as `"code: count"` pairs joined by `", "`.
///
/// Pairs are ordered by ascending status code. An empty map yields `"none"`.
fn format_status_counts(status_counts: &std::collections::HashMap<u16, usize>) -> String {
    if status_counts.is_empty() {
        "none".to_string()
    } else {
        // A BTreeMap iterates in ascending key order, giving a stable listing.
        let ordered: std::collections::BTreeMap<u16, usize> = status_counts
            .iter()
            .map(|(&code, &count)| (code, count))
            .collect();
        let rendered: Vec<String> = ordered
            .into_iter()
            .map(|(code, count)| format!("{code}: {count}"))
            .collect();
        rendered.join(", ")
    }
}
290
291fn calculate_rate(count: usize, elapsed_duration: Duration) -> f64 {
292 DefaultRateCalculator.calculate_rate(count, elapsed_duration)
293}
294
/// Formats `numerator / denominator` as a percentage with two decimals, e.g. `"25.00%"`.
///
/// A zero denominator yields `"0.00%"` rather than NaN or infinity.
fn format_ratio(numerator: usize, denominator: usize) -> String {
    match denominator {
        0 => "0.00%".to_string(),
        total => {
            let fraction = numerator as f64 / total as f64;
            format!("{:.2}%", fraction * 100.0)
        }
    }
}
302
303fn format_byte_rate(total_bytes: usize, elapsed_duration: Duration) -> String {
304 let bytes_per_second = calculate_rate(total_bytes, elapsed_duration);
305 format!(
306 "{}/s",
307 DefaultByteFormatter.formatted_bytes(bytes_per_second as usize)
308 )
309}
310
/// Read-only accessor interface over a set of crawl metrics.
///
/// This is the bound used by [`format_plain_text_metrics`] and
/// [`MetricsDisplayFormatter::format_metrics`]. [`MetricsSnapshot`] implements
/// it by returning its fields directly.
pub trait MetricsSnapshotProvider {
    // --- Request lifecycle counters ---
    /// Requests enqueued.
    fn get_requests_enqueued(&self) -> usize;
    /// Requests sent; the denominator for the report's success and failure ratios.
    fn get_requests_sent(&self) -> usize;
    /// Requests that succeeded.
    fn get_requests_succeeded(&self) -> usize;
    /// Requests that failed.
    fn get_requests_failed(&self) -> usize;
    /// Requests that were retried.
    fn get_requests_retried(&self) -> usize;
    /// Requests scheduled for a retry.
    fn get_requests_scheduled_for_retry(&self) -> usize;
    /// Requests that were dropped.
    fn get_requests_dropped(&self) -> usize;
    /// Retry delay currently in flight, in milliseconds.
    fn get_retry_delay_in_flight_ms(&self) -> u64;

    // --- Responses and downloads ---
    /// Responses received; the denominator for the cache-hit ratio.
    fn get_responses_received(&self) -> usize;
    /// Responses served from cache.
    fn get_responses_from_cache(&self) -> usize;
    /// Total bytes downloaded.
    fn get_total_bytes_downloaded(&self) -> usize;

    // --- Items ---
    /// Items scraped.
    fn get_items_scraped(&self) -> usize;
    /// Items processed.
    fn get_items_processed(&self) -> usize;
    /// Items dropped by the pipeline.
    fn get_items_dropped_by_pipeline(&self) -> usize;

    // --- Backlogs ---
    /// Current queue depth.
    fn get_queue_depth(&self) -> usize;
    /// Parser backlog size.
    fn get_parser_backlog(&self) -> usize;
    /// Pipeline backlog size.
    fn get_pipeline_backlog(&self) -> usize;
    /// Retry backlog size.
    fn get_retry_backlog(&self) -> usize;

    /// Response counts keyed by status code (borrowed, not cloned).
    fn get_response_status_counts(&self) -> &std::collections::HashMap<u16, usize>;
    /// Elapsed time used as the denominator for the report's overall rates.
    fn get_elapsed_duration(&self) -> Duration;

    // --- Timing statistics; `None` presumably means no samples yet ---
    /// Average request time.
    fn get_average_request_time(&self) -> Option<Duration>;
    /// Fastest request time.
    fn get_fastest_request_time(&self) -> Option<Duration>;
    /// Slowest request time.
    fn get_slowest_request_time(&self) -> Option<Duration>;
    /// Shown as `total` on the report's `req time` line.
    fn get_request_time_count(&self) -> usize;
    /// Average parsing time.
    fn get_average_parsing_time(&self) -> Option<Duration>;
    /// Fastest parsing time.
    fn get_fastest_parsing_time(&self) -> Option<Duration>;
    /// Slowest parsing time.
    fn get_slowest_parsing_time(&self) -> Option<Duration>;
    /// Shown as `total` on the report's `parsing` line.
    fn get_parsing_time_count(&self) -> usize;

    // --- Short-window rates (not used by `format_plain_text_metrics`) ---
    /// Recent requests per second.
    fn get_recent_requests_per_second(&self) -> f64;
    /// Recent responses per second.
    fn get_recent_responses_per_second(&self) -> f64;
    /// Recent items per second.
    fn get_recent_items_per_second(&self) -> f64;
    /// Preview text of the current item, printed verbatim in the report.
    fn get_current_item_preview(&self) -> &str;

    // --- Formatting hooks ---
    /// Human-readable elapsed duration.
    fn formatted_duration(&self) -> String;
    /// Formats an optional duration; the report uses it for both request and parsing times.
    fn formatted_request_time(&self, duration: Option<Duration>) -> String;
    /// Human-readable total bytes downloaded.
    fn formatted_bytes(&self) -> String;
}
349
350impl MetricsSnapshotProvider for MetricsSnapshot {
351 fn get_requests_enqueued(&self) -> usize {
352 self.requests_enqueued
353 }
354
355 fn get_requests_sent(&self) -> usize {
356 self.requests_sent
357 }
358
359 fn get_requests_succeeded(&self) -> usize {
360 self.requests_succeeded
361 }
362
363 fn get_requests_failed(&self) -> usize {
364 self.requests_failed
365 }
366
367 fn get_requests_retried(&self) -> usize {
368 self.requests_retried
369 }
370
371 fn get_requests_scheduled_for_retry(&self) -> usize {
372 self.requests_scheduled_for_retry
373 }
374
375 fn get_requests_dropped(&self) -> usize {
376 self.requests_dropped
377 }
378
379 fn get_retry_delay_in_flight_ms(&self) -> u64 {
380 self.retry_delay_in_flight_ms
381 }
382
383 fn get_responses_received(&self) -> usize {
384 self.responses_received
385 }
386
387 fn get_responses_from_cache(&self) -> usize {
388 self.responses_from_cache
389 }
390
391 fn get_total_bytes_downloaded(&self) -> usize {
392 self.total_bytes_downloaded
393 }
394
395 fn get_items_scraped(&self) -> usize {
396 self.items_scraped
397 }
398
399 fn get_items_processed(&self) -> usize {
400 self.items_processed
401 }
402
403 fn get_items_dropped_by_pipeline(&self) -> usize {
404 self.items_dropped_by_pipeline
405 }
406
407 fn get_queue_depth(&self) -> usize {
408 self.queue_depth
409 }
410
411 fn get_parser_backlog(&self) -> usize {
412 self.parser_backlog
413 }
414
415 fn get_pipeline_backlog(&self) -> usize {
416 self.pipeline_backlog
417 }
418
419 fn get_retry_backlog(&self) -> usize {
420 self.retry_backlog
421 }
422
423 fn get_response_status_counts(&self) -> &std::collections::HashMap<u16, usize> {
424 &self.response_status_counts
425 }
426
427 fn get_elapsed_duration(&self) -> Duration {
428 self.elapsed_duration
429 }
430
431 fn get_average_request_time(&self) -> Option<Duration> {
432 self.average_request_time
433 }
434
435 fn get_fastest_request_time(&self) -> Option<Duration> {
436 self.fastest_request_time
437 }
438
439 fn get_slowest_request_time(&self) -> Option<Duration> {
440 self.slowest_request_time
441 }
442
443 fn get_request_time_count(&self) -> usize {
444 self.request_time_count
445 }
446
447 fn get_average_parsing_time(&self) -> Option<Duration> {
448 self.average_parsing_time
449 }
450
451 fn get_fastest_parsing_time(&self) -> Option<Duration> {
452 self.fastest_parsing_time
453 }
454
455 fn get_slowest_parsing_time(&self) -> Option<Duration> {
456 self.slowest_parsing_time
457 }
458
459 fn get_parsing_time_count(&self) -> usize {
460 self.parsing_time_count
461 }
462
463 fn get_recent_requests_per_second(&self) -> f64 {
464 self.recent_requests_per_second
465 }
466
467 fn get_recent_responses_per_second(&self) -> f64 {
468 self.recent_responses_per_second
469 }
470
471 fn get_recent_items_per_second(&self) -> f64 {
472 self.recent_items_per_second
473 }
474
475 fn get_current_item_preview(&self) -> &str {
476 &self.current_item_preview
477 }
478
479 fn formatted_duration(&self) -> String {
480 self.formatted_duration()
481 }
482
483 fn formatted_request_time(&self, duration: Option<Duration>) -> String {
484 self.formatted_request_time(duration)
485 }
486
487 fn formatted_bytes(&self) -> String {
488 self.formatted_bytes()
489 }
490}