1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::sync::Arc;
4use std::time::Instant;
5
6#[derive(Debug, Clone, serde::Serialize)]
7pub struct AppMetricsJson {
8 pub requests: u64,
9 pub bytes_received: u64,
10 pub bytes_sent: u64,
11 pub avg_response_time_ms: f64,
12 pub errors: u64,
13}
14
15#[derive(Clone)]
16pub struct AppMetrics {
17 pub requests_total: Arc<AtomicU64>,
18 pub bytes_received: Arc<AtomicU64>,
19 pub bytes_sent: Arc<AtomicU64>,
20 pub response_time_nanos_sum: Arc<AtomicU64>,
21 pub response_time_count: Arc<AtomicU64>,
22 pub errors_total: Arc<AtomicU64>,
23}
24
25impl AppMetrics {
26 pub fn new() -> Self {
27 Self {
28 requests_total: Arc::new(AtomicU64::new(0)),
29 bytes_received: Arc::new(AtomicU64::new(0)),
30 bytes_sent: Arc::new(AtomicU64::new(0)),
31 response_time_nanos_sum: Arc::new(AtomicU64::new(0)),
32 response_time_count: Arc::new(AtomicU64::new(0)),
33 errors_total: Arc::new(AtomicU64::new(0)),
34 }
35 }
36}
37
38impl Default for AppMetrics {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44const STATUS_ARRAY_SIZE: usize = 512;
46
47#[derive(Clone)]
48pub struct Metrics {
49 requests_total: Arc<AtomicU64>,
50 requests_in_flight: Arc<AtomicUsize>,
51 bytes_received: Arc<AtomicU64>,
52 bytes_sent: Arc<AtomicU64>,
53 response_time_nanos_sum: Arc<AtomicU64>,
54 response_time_count: Arc<AtomicU64>,
55 status_codes: Arc<[AtomicU64; STATUS_ARRAY_SIZE]>,
56 tls_connections: Arc<AtomicU64>,
57 errors_total: Arc<AtomicU64>,
58 last_request_nanos: Arc<AtomicU64>,
59 epoch_start: Instant,
60 app_metrics: Arc<parking_lot::RwLock<HashMap<String, AppMetrics>>>,
61}
62
63impl Default for Metrics {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69impl Metrics {
70 pub fn new() -> Self {
71 Self {
72 requests_total: Arc::new(AtomicU64::new(0)),
73 requests_in_flight: Arc::new(AtomicUsize::new(0)),
74 bytes_received: Arc::new(AtomicU64::new(0)),
75 bytes_sent: Arc::new(AtomicU64::new(0)),
76 response_time_nanos_sum: Arc::new(AtomicU64::new(0)),
77 response_time_count: Arc::new(AtomicU64::new(0)),
78 status_codes: Arc::new(std::array::from_fn(|_| AtomicU64::new(0))),
79 tls_connections: Arc::new(AtomicU64::new(0)),
80 errors_total: Arc::new(AtomicU64::new(0)),
81 last_request_nanos: Arc::new(AtomicU64::new(0)),
82 epoch_start: Instant::now(),
83 app_metrics: Arc::new(parking_lot::RwLock::new(HashMap::new())),
84 }
85 }
86
87 pub fn get_app_metrics(&self, app_name: &str) -> Option<AppMetricsJson> {
88 let apps = self.app_metrics.read();
89 apps.get(app_name).map(|m| AppMetricsJson {
90 requests: m.requests_total.load(Ordering::Relaxed),
91 bytes_received: m.bytes_received.load(Ordering::Relaxed),
92 bytes_sent: m.bytes_sent.load(Ordering::Relaxed),
93 avg_response_time_ms: {
94 let count = m.response_time_count.load(Ordering::Relaxed);
95 if count == 0 {
96 0.0
97 } else {
98 let sum = m.response_time_nanos_sum.load(Ordering::Relaxed);
99 (sum as f64) / (count as f64) / 1_000_000.0
100 }
101 },
102 errors: m.errors_total.load(Ordering::Relaxed),
103 })
104 }
105
106 pub fn get_all_app_metrics(&self) -> HashMap<String, AppMetricsJson> {
107 let apps = self.app_metrics.read();
108 apps.iter()
109 .map(|(name, m)| {
110 (
111 name.clone(),
112 AppMetricsJson {
113 requests: m.requests_total.load(Ordering::Relaxed),
114 bytes_received: m.bytes_received.load(Ordering::Relaxed),
115 bytes_sent: m.bytes_sent.load(Ordering::Relaxed),
116 avg_response_time_ms: {
117 let count = m.response_time_count.load(Ordering::Relaxed);
118 if count == 0 {
119 0.0
120 } else {
121 let sum = m.response_time_nanos_sum.load(Ordering::Relaxed);
122 (sum as f64) / (count as f64) / 1_000_000.0
123 }
124 },
125 errors: m.errors_total.load(Ordering::Relaxed),
126 },
127 )
128 })
129 .collect()
130 }
131
132 pub fn record_request(
133 &self,
134 bytes_in: u64,
135 bytes_out: u64,
136 status: u16,
137 duration: std::time::Duration,
138 ) {
139 self.requests_total.fetch_add(1, Ordering::Relaxed);
140 self.bytes_received.fetch_add(bytes_in, Ordering::Relaxed);
141 self.bytes_sent.fetch_add(bytes_out, Ordering::Relaxed);
142
143 self.response_time_nanos_sum
145 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
146 self.response_time_count.fetch_add(1, Ordering::Relaxed);
147
148 if status >= 100 && (status as usize - 100) < STATUS_ARRAY_SIZE {
150 self.status_codes[status as usize - 100].fetch_add(1, Ordering::Relaxed);
151 }
152
153 let nanos = self.epoch_start.elapsed().as_nanos() as u64;
155 self.last_request_nanos.store(nanos, Ordering::Relaxed);
156 }
157
158 pub fn record_app_request(
159 &self,
160 app_name: &str,
161 bytes_in: u64,
162 bytes_out: u64,
163 status: u16,
164 duration: std::time::Duration,
165 ) {
166 let success = (200..400).contains(&status);
167 self.record_app_request_with_success(app_name, bytes_in, bytes_out, duration, success);
168 }
169
170 pub fn record_app_request_with_success(
171 &self,
172 app_name: &str,
173 bytes_in: u64,
174 bytes_out: u64,
175 duration: std::time::Duration,
176 success: bool,
177 ) {
178 let app_name = app_name.to_string();
179 {
180 let mut apps = self.app_metrics.write();
181 apps.entry(app_name.clone()).or_default();
182 }
183
184 let app_metrics = {
185 let apps = self.app_metrics.read();
186 apps.get(&app_name).cloned()
187 };
188
189 if let Some(metrics) = app_metrics {
190 metrics.requests_total.fetch_add(1, Ordering::Relaxed);
191 metrics
192 .bytes_received
193 .fetch_add(bytes_in, Ordering::Relaxed);
194 metrics.bytes_sent.fetch_add(bytes_out, Ordering::Relaxed);
195 metrics
196 .response_time_nanos_sum
197 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
198 metrics.response_time_count.fetch_add(1, Ordering::Relaxed);
199 if !success {
200 metrics.errors_total.fetch_add(1, Ordering::Relaxed);
201 }
202 }
203 }
204
205 pub fn inc_in_flight(&self) {
206 self.requests_in_flight.fetch_add(1, Ordering::Relaxed);
207 }
208
209 pub fn dec_in_flight(&self) {
210 self.requests_in_flight.fetch_sub(1, Ordering::Relaxed);
211 }
212
213 pub fn inc_tls_connections(&self) {
214 self.tls_connections.fetch_add(1, Ordering::Relaxed);
215 }
216
217 pub fn inc_errors(&self) {
218 self.errors_total.fetch_add(1, Ordering::Relaxed);
219 }
220
221 pub fn format_metrics(&self) -> String {
222 let requests = self.requests_total.load(Ordering::Relaxed);
223 let in_flight = self.requests_in_flight.load(Ordering::Relaxed);
224 let bytes_in = self.bytes_received.load(Ordering::Relaxed);
225 let bytes_out = self.bytes_sent.load(Ordering::Relaxed);
226 let tls = self.tls_connections.load(Ordering::Relaxed);
227 let errors = self.errors_total.load(Ordering::Relaxed);
228
229 let avg_response_time = {
230 let count = self.response_time_count.load(Ordering::Relaxed);
231 if count == 0 {
232 0.0
233 } else {
234 let sum = self.response_time_nanos_sum.load(Ordering::Relaxed);
235 (sum as f64) / (count as f64) / 1_000_000_000.0
236 }
237 };
238
239 let mut status_entries: Vec<(u16, u64)> = Vec::new();
241 for i in 0..STATUS_ARRAY_SIZE {
242 let count = self.status_codes[i].load(Ordering::Relaxed);
243 if count > 0 {
244 status_entries.push(((i + 100) as u16, count));
245 }
246 }
247
248 let mut output = String::new();
249 output.push_str("# HELP proxy_requests_total Total number of HTTP requests\n");
250 output.push_str("# TYPE proxy_requests_total counter\n");
251 output.push_str(&format!("proxy_requests_total {}\n", requests));
252
253 output.push_str(
254 "# HELP proxy_requests_in_flight Number of requests currently being processed\n",
255 );
256 output.push_str("# TYPE proxy_requests_in_flight gauge\n");
257 output.push_str(&format!("proxy_requests_in_flight {}\n", in_flight));
258
259 output.push_str("# HELP proxy_bytes_received Total bytes received from clients\n");
260 output.push_str("# TYPE proxy_bytes_received counter\n");
261 output.push_str(&format!("proxy_bytes_received {}\n", bytes_in));
262
263 output.push_str("# HELP proxy_bytes_sent Total bytes sent to clients\n");
264 output.push_str("# TYPE proxy_bytes_sent counter\n");
265 output.push_str(&format!("proxy_bytes_sent {}\n", bytes_out));
266
267 output.push_str("# HELP proxy_response_time_seconds Average response time in seconds\n");
268 output.push_str("# TYPE proxy_response_time_seconds gauge\n");
269 output.push_str(&format!(
270 "proxy_response_time_seconds {}\n",
271 avg_response_time
272 ));
273
274 output.push_str("# HELP proxy_tls_connections_total Total number of TLS connections\n");
275 output.push_str("# TYPE proxy_tls_connections_total counter\n");
276 output.push_str(&format!("proxy_tls_connections_total {}\n", tls));
277
278 output.push_str("# HELP proxy_errors_total Total number of proxy errors\n");
279 output.push_str("# TYPE proxy_errors_total counter\n");
280 output.push_str(&format!("proxy_errors_total {}\n", errors));
281
282 output.push_str("# HELP proxy_response_status_codes_total HTTP response status codes\n");
283 output.push_str("# TYPE proxy_response_status_codes_total counter\n");
284 for (code, count) in status_entries.iter() {
285 output.push_str(&format!(
286 "proxy_response_status_codes_total{{code=\"{}\"}} {}\n",
287 code, count
288 ));
289 }
290
291 output
292 }
293}
294
295pub type SharedMetrics = Arc<Metrics>;
296
297pub fn new_metrics() -> SharedMetrics {
298 Arc::new(Metrics::new())
299}