1use parking_lot::RwLock;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9pub use crate::formatters::{
11 ByteFormatter, DefaultByteFormatter, DefaultDurationFormatter, DefaultRateCalculator,
12 DurationFormatter, RateCalculator,
13};
14
15#[derive(Debug)]
17pub struct ExpMovingAverage {
18 alpha: f64,
19 rate: Arc<RwLock<f64>>,
20 last_update: Arc<RwLock<Instant>>,
21 event_count: Arc<RwLock<usize>>,
22}
23
24impl ExpMovingAverage {
25 pub fn new(alpha: f64) -> Self {
29 ExpMovingAverage {
30 alpha,
31 rate: Arc::new(RwLock::new(0.0)),
32 last_update: Arc::new(RwLock::new(Instant::now())),
33 event_count: Arc::new(RwLock::new(0)),
34 }
35 }
36
37 pub fn update(&self, count: usize) {
39 let now = Instant::now();
40 let mut last_update = self.last_update.write();
41 let mut event_count = self.event_count.write();
42
43 *event_count += count;
44 let time_delta = now.duration_since(*last_update).as_secs_f64();
45
46 if time_delta >= 1.0 {
47 let current_rate = *event_count as f64 / time_delta;
48 let mut rate = self.rate.write();
49 *rate = self.alpha * current_rate + (1.0 - self.alpha) * (*rate);
50
51 *event_count = 0;
52 *last_update = now;
53 }
54 }
55
56 pub fn get_rate(&self) -> f64 {
58 *self.rate.read()
59 }
60}
61
62#[derive(Debug, Clone, serde::Serialize)]
64pub struct MetricsSnapshot {
65 pub requests_enqueued: usize,
66 pub requests_sent: usize,
67 pub requests_succeeded: usize,
68 pub requests_failed: usize,
69 pub requests_retried: usize,
70 pub requests_dropped: usize,
71 pub responses_received: usize,
72 pub responses_from_cache: usize,
73 pub total_bytes_downloaded: usize,
74 pub items_scraped: usize,
75 pub items_processed: usize,
76 pub items_dropped_by_pipeline: usize,
77 pub response_status_counts: std::collections::HashMap<u16, usize>,
78 pub elapsed_duration: Duration,
79 pub average_request_time: Option<Duration>,
80 pub fastest_request_time: Option<Duration>,
81 pub slowest_request_time: Option<Duration>,
82 pub request_time_count: usize,
83 pub average_parsing_time: Option<Duration>,
84 pub fastest_parsing_time: Option<Duration>,
85 pub slowest_parsing_time: Option<Duration>,
86 pub parsing_time_count: usize,
87 pub recent_requests_per_second: f64,
88 pub recent_responses_per_second: f64,
89 pub recent_items_per_second: f64,
90}
91
92impl MetricsSnapshot {
93 pub fn formatted_duration(&self) -> String {
95 DefaultDurationFormatter.formatted_duration(self.elapsed_duration)
96 }
97
98 pub fn formatted_request_time(&self, duration: Option<Duration>) -> String {
100 DefaultDurationFormatter.formatted_request_time(duration)
101 }
102
103 pub fn requests_per_second(&self) -> f64 {
105 DefaultRateCalculator.calculate_rate(self.requests_sent, self.elapsed_duration)
106 }
107
108 pub fn responses_per_second(&self) -> f64 {
110 DefaultRateCalculator.calculate_rate(self.responses_received, self.elapsed_duration)
111 }
112
113 pub fn items_per_second(&self) -> f64 {
115 DefaultRateCalculator.calculate_rate(self.items_scraped, self.elapsed_duration)
116 }
117
118 pub fn formatted_bytes(&self) -> String {
120 DefaultByteFormatter.formatted_bytes(self.total_bytes_downloaded)
121 }
122}
123
/// A source of metrics that can hand out an owned, cloneable snapshot of its state.
pub trait SnapshotProvider {
    /// The snapshot value produced; must be `Clone` so callers can keep copies.
    type Snapshot: Clone;

    /// Builds a snapshot from the provider's current state.
    fn create_snapshot(&self) -> Self::Snapshot;
}
132
133pub trait MetricsExporter<T> {
135 fn to_json_string(&self) -> Result<String, crate::error::SpiderError>;
141
142 fn to_json_string_pretty(&self) -> Result<String, crate::error::SpiderError>;
148
149 fn to_markdown_string(&self) -> String;
151
152 fn to_display_string(&self) -> String;
154}
155
/// Renders any [`MetricsSnapshotProvider`] as the multi-line "Crawl Statistics" text report.
pub struct MetricsDisplayFormatter;

impl MetricsDisplayFormatter {
    /// Formats `snapshot` as a human-readable, multi-line statistics report.
    ///
    /// - Durations, timings and byte totals go through the provider's own
    ///   `formatted_*` methods.
    /// - Recent rates are printed with two decimals.
    /// - Response status counts are listed in ascending status-code order, or as
    ///   `none` when there are none, so the report is stable between calls.
    ///
    /// `T` may be unsized, so a `&dyn MetricsSnapshotProvider` is accepted as well.
    pub fn format_metrics<T: MetricsSnapshotProvider + ?Sized>(&self, snapshot: &T) -> String {
        format!(
            "\nCrawl Statistics\n----------------\nduration : {}\n speed : req/s: {:.2}, resp/s: {:.2}, item/s: {:.2}\nrequests : enqueued: {}, sent: {}, ok: {}, fail: {}, retry: {}, drop: {}\nresponse : received: {}, from_cache: {}, downloaded: {}\nitems : scraped: {}, processed: {}, dropped: {}\nreq time : avg: {}, fastest: {}, slowest: {}, total: {}\nparsing : avg: {}, fastest: {}, slowest: {}, total: {}\nstatus : {}\n",
            snapshot.formatted_duration(),
            snapshot.get_recent_requests_per_second(),
            snapshot.get_recent_responses_per_second(),
            snapshot.get_recent_items_per_second(),
            snapshot.get_requests_enqueued(),
            snapshot.get_requests_sent(),
            snapshot.get_requests_succeeded(),
            snapshot.get_requests_failed(),
            snapshot.get_requests_retried(),
            snapshot.get_requests_dropped(),
            snapshot.get_responses_received(),
            snapshot.get_responses_from_cache(),
            snapshot.formatted_bytes(),
            snapshot.get_items_scraped(),
            snapshot.get_items_processed(),
            snapshot.get_items_dropped_by_pipeline(),
            snapshot.formatted_request_time(snapshot.get_average_request_time()),
            snapshot.formatted_request_time(snapshot.get_fastest_request_time()),
            snapshot.formatted_request_time(snapshot.get_slowest_request_time()),
            snapshot.get_request_time_count(),
            snapshot.formatted_request_time(snapshot.get_average_parsing_time()),
            snapshot.formatted_request_time(snapshot.get_fastest_parsing_time()),
            snapshot.formatted_request_time(snapshot.get_slowest_parsing_time()),
            snapshot.get_parsing_time_count(),
            Self::format_status_counts(snapshot.get_response_status_counts()),
        )
    }

    /// Renders status counts as `code: count` pairs joined by `", "`, sorted by
    /// status code, or `"none"` when the map is empty.
    ///
    /// Sorting matters because `HashMap` iteration order is unspecified; without
    /// it the report's status line could change order from one call to the next.
    fn format_status_counts(counts: &std::collections::HashMap<u16, usize>) -> String {
        if counts.is_empty() {
            return "none".to_string();
        }
        let mut entries: Vec<(u16, usize)> =
            counts.iter().map(|(&code, &count)| (code, count)).collect();
        // Keys of a map are unique, so an unstable sort gives a fully deterministic order.
        entries.sort_unstable_by_key(|&(code, _)| code);
        entries
            .iter()
            .map(|(code, count)| format!("{}: {}", code, count))
            .collect::<Vec<String>>()
            .join(", ")
    }
}

/// Read-only view over a metrics snapshot, consumed by [`MetricsDisplayFormatter`].
///
/// The `get_*` methods expose raw values. The `formatted_*` methods let the
/// implementor decide how durations, timings and byte totals are rendered. All
/// methods take `&self` and have no generic parameters, so the trait can be used
/// as `dyn MetricsSnapshotProvider`.
pub trait MetricsSnapshotProvider {
    fn get_requests_enqueued(&self) -> usize;
    fn get_requests_sent(&self) -> usize;
    fn get_requests_succeeded(&self) -> usize;
    fn get_requests_failed(&self) -> usize;
    fn get_requests_retried(&self) -> usize;
    fn get_requests_dropped(&self) -> usize;
    fn get_responses_received(&self) -> usize;
    fn get_responses_from_cache(&self) -> usize;
    fn get_total_bytes_downloaded(&self) -> usize;
    fn get_items_scraped(&self) -> usize;
    fn get_items_processed(&self) -> usize;
    fn get_items_dropped_by_pipeline(&self) -> usize;
    /// Responses seen per HTTP status code.
    fn get_response_status_counts(&self) -> &std::collections::HashMap<u16, usize>;
    fn get_elapsed_duration(&self) -> Duration;
    fn get_average_request_time(&self) -> Option<Duration>;
    fn get_fastest_request_time(&self) -> Option<Duration>;
    fn get_slowest_request_time(&self) -> Option<Duration>;
    fn get_request_time_count(&self) -> usize;
    fn get_average_parsing_time(&self) -> Option<Duration>;
    fn get_fastest_parsing_time(&self) -> Option<Duration>;
    fn get_slowest_parsing_time(&self) -> Option<Duration>;
    fn get_parsing_time_count(&self) -> usize;
    fn get_recent_requests_per_second(&self) -> f64;
    fn get_recent_responses_per_second(&self) -> f64;
    fn get_recent_items_per_second(&self) -> f64;
    /// Human-readable elapsed duration, used on the report's `duration` line.
    fn formatted_duration(&self) -> String;
    /// Human-readable rendering of an optional timing value.
    fn formatted_request_time(&self, duration: Option<Duration>) -> String;
    /// Human-readable total of downloaded bytes.
    fn formatted_bytes(&self) -> String;
}