use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
/// Point-in-time sample of a process's cumulative CPU time.
///
/// CPU utilisation is computed from the delta between two snapshots:
/// see `Metrics::get_process_stats`.
#[derive(Clone)]
struct CpuSnapshot {
    // Cumulative CPU time in clock ticks (utime + stime from /proc/<pid>/stat).
    total_time: u64,
    // When this sample was taken.
    timestamp: Instant,
}
12#[derive(Debug, Clone, serde::Serialize)]
13pub struct AppMetricsJson {
14 pub requests: u64,
15 pub bytes_received: u64,
16 pub bytes_sent: u64,
17 pub avg_response_time_ms: f64,
18 pub errors: u64,
19 pub memory_rss_bytes: Option<u64>,
20 pub cpu_percent: Option<f64>,
21}
22
23#[derive(Debug, Clone, serde::Serialize)]
24pub struct SlotMetrics {
25 pub memory_rss_bytes: Option<u64>,
26 pub cpu_percent: Option<f64>,
27}
28
29#[derive(Debug, Clone, serde::Serialize)]
30pub struct AppSystemMetrics {
31 pub blue: SlotMetrics,
32 pub green: SlotMetrics,
33}
34
/// Per-app atomic counters.
///
/// Every field is an `Arc`, so cloning an `AppMetrics` shares the
/// underlying counters: clones observe (and contribute to) the same
/// totals. `Default` is derived — `Arc<AtomicU64>` defaults to zero —
/// which is what makes `HashMap::entry(..).or_default()` usable for
/// lazily registering apps.
#[derive(Clone, Default)]
pub struct AppMetrics {
    pub requests_total: Arc<AtomicU64>,
    pub bytes_received: Arc<AtomicU64>,
    pub bytes_sent: Arc<AtomicU64>,
    /// Sum of response times in nanoseconds; divide by
    /// `response_time_count` for the average.
    pub response_time_nanos_sum: Arc<AtomicU64>,
    pub response_time_count: Arc<AtomicU64>,
    pub errors_total: Arc<AtomicU64>,
}

impl AppMetrics {
    /// Creates a counter set with all counters at zero.
    pub fn new() -> Self {
        Self::default()
    }
}
/// Number of status-code buckets. Index 0 corresponds to status 100, so
/// statuses in `100..(100 + STATUS_ARRAY_SIZE)` are tracked individually;
/// anything outside that range is ignored by `record_request`.
const STATUS_ARRAY_SIZE: usize = 512;
67#[derive(Clone)]
68pub struct Metrics {
69 requests_total: Arc<AtomicU64>,
70 requests_in_flight: Arc<AtomicUsize>,
71 bytes_received: Arc<AtomicU64>,
72 bytes_sent: Arc<AtomicU64>,
73 response_time_nanos_sum: Arc<AtomicU64>,
74 response_time_count: Arc<AtomicU64>,
75 status_codes: Arc<[AtomicU64; STATUS_ARRAY_SIZE]>,
76 tls_connections: Arc<AtomicU64>,
77 errors_total: Arc<AtomicU64>,
78 last_request_nanos: Arc<AtomicU64>,
79 epoch_start: Instant,
80 app_metrics: Arc<parking_lot::RwLock<HashMap<String, AppMetrics>>>,
81 cpu_snapshots: Arc<parking_lot::RwLock<HashMap<u32, CpuSnapshot>>>,
82}
83
84impl Default for Metrics {
85 fn default() -> Self {
86 Self::new()
87 }
88}
89
90impl Metrics {
91 pub fn new() -> Self {
92 Self {
93 requests_total: Arc::new(AtomicU64::new(0)),
94 requests_in_flight: Arc::new(AtomicUsize::new(0)),
95 bytes_received: Arc::new(AtomicU64::new(0)),
96 bytes_sent: Arc::new(AtomicU64::new(0)),
97 response_time_nanos_sum: Arc::new(AtomicU64::new(0)),
98 response_time_count: Arc::new(AtomicU64::new(0)),
99 status_codes: Arc::new(std::array::from_fn(|_| AtomicU64::new(0))),
100 tls_connections: Arc::new(AtomicU64::new(0)),
101 errors_total: Arc::new(AtomicU64::new(0)),
102 last_request_nanos: Arc::new(AtomicU64::new(0)),
103 epoch_start: Instant::now(),
104 app_metrics: Arc::new(parking_lot::RwLock::new(HashMap::new())),
105 cpu_snapshots: Arc::new(parking_lot::RwLock::new(HashMap::new())),
106 }
107 }
108
109 pub fn get_app_metrics(&self, app_name: &str) -> Option<AppMetricsJson> {
110 let apps = self.app_metrics.read();
111 apps.get(app_name).map(|m| AppMetricsJson {
112 requests: m.requests_total.load(Ordering::Relaxed),
113 bytes_received: m.bytes_received.load(Ordering::Relaxed),
114 bytes_sent: m.bytes_sent.load(Ordering::Relaxed),
115 avg_response_time_ms: {
116 let count = m.response_time_count.load(Ordering::Relaxed);
117 if count == 0 {
118 0.0
119 } else {
120 let sum = m.response_time_nanos_sum.load(Ordering::Relaxed);
121 (sum as f64) / (count as f64) / 1_000_000.0
122 }
123 },
124 errors: m.errors_total.load(Ordering::Relaxed),
125 memory_rss_bytes: None,
126 cpu_percent: None,
127 })
128 }
129
130 pub fn get_all_app_metrics(&self) -> HashMap<String, AppMetricsJson> {
131 let apps = self.app_metrics.read();
132 apps.iter()
133 .map(|(name, m)| {
134 (
135 name.clone(),
136 AppMetricsJson {
137 requests: m.requests_total.load(Ordering::Relaxed),
138 bytes_received: m.bytes_received.load(Ordering::Relaxed),
139 bytes_sent: m.bytes_sent.load(Ordering::Relaxed),
140 avg_response_time_ms: {
141 let count = m.response_time_count.load(Ordering::Relaxed);
142 if count == 0 {
143 0.0
144 } else {
145 let sum = m.response_time_nanos_sum.load(Ordering::Relaxed);
146 (sum as f64) / (count as f64) / 1_000_000.0
147 }
148 },
149 errors: m.errors_total.load(Ordering::Relaxed),
150 memory_rss_bytes: None,
151 cpu_percent: None,
152 },
153 )
154 })
155 .collect()
156 }
157
158 #[cfg(unix)]
159 pub fn get_process_stats(&self, pid: u32) -> Option<SlotMetrics> {
160 let stat_path = format!("/proc/{}/stat", pid);
161 let status_path = format!("/proc/{}/status", pid);
162
163 let mut memory_bytes: Option<u64> = None;
164 if let Ok(content) = std::fs::read_to_string(&status_path) {
165 for line in content.lines() {
166 if line.starts_with("VmRSS:") {
167 let parts: Vec<&str> = line.split_whitespace().collect();
168 if parts.len() >= 2 {
169 if let Ok(kb) = parts[1].parse::<u64>() {
170 memory_bytes = Some(kb * 1024);
171 }
172 }
173 break;
174 }
175 }
176 }
177
178 let mut cpu_percent: Option<f64> = None;
179 if let Ok(content) = std::fs::read_to_string(&stat_path) {
180 if let Some(idx) = content.find('(') {
181 if let Some(idx2) = content[idx..].find(')') {
182 let parts: Vec<&str> = content[idx + idx2 + 2..].split_whitespace().collect();
183 if parts.len() >= 14 {
184 let utime: u64 = parts[12].parse().unwrap_or(0);
185 let stime: u64 = parts[13].parse().unwrap_or(0);
186 let total_time = utime + stime;
187
188 let now = Instant::now();
189 let prev = self.cpu_snapshots.read().get(&pid).cloned();
190
191 if let Some(prev_snapshot) = prev {
192 let delta_time = now.duration_since(prev_snapshot.timestamp);
193 if delta_time.as_secs_f64() > 0.0 {
194 let delta_cpu = total_time as f64 - prev_snapshot.total_time as f64;
195 let hz = 100.0;
196 cpu_percent =
197 Some((delta_cpu / hz) / delta_time.as_secs_f64() * 100.0);
198 }
199 }
200
201 self.cpu_snapshots.write().insert(
202 pid,
203 CpuSnapshot {
204 total_time,
205 timestamp: now,
206 },
207 );
208 }
209 }
210 }
211 }
212
213 if memory_bytes.is_none() && cpu_percent.is_none() {
214 return None;
215 }
216
217 Some(SlotMetrics {
218 memory_rss_bytes: memory_bytes,
219 cpu_percent,
220 })
221 }
222
223 #[cfg(not(unix))]
224 pub fn get_process_stats(&self, _pid: u32) -> Option<SlotMetrics> {
225 None
226 }
227
228 pub fn record_request(
229 &self,
230 bytes_in: u64,
231 bytes_out: u64,
232 status: u16,
233 duration: std::time::Duration,
234 ) {
235 self.requests_total.fetch_add(1, Ordering::Relaxed);
236 self.bytes_received.fetch_add(bytes_in, Ordering::Relaxed);
237 self.bytes_sent.fetch_add(bytes_out, Ordering::Relaxed);
238
239 self.response_time_nanos_sum
241 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
242 self.response_time_count.fetch_add(1, Ordering::Relaxed);
243
244 if status >= 100 && (status as usize - 100) < STATUS_ARRAY_SIZE {
246 self.status_codes[status as usize - 100].fetch_add(1, Ordering::Relaxed);
247 }
248
249 let nanos = self.epoch_start.elapsed().as_nanos() as u64;
251 self.last_request_nanos.store(nanos, Ordering::Relaxed);
252 }
253
254 pub fn record_app_request(
255 &self,
256 app_name: &str,
257 bytes_in: u64,
258 bytes_out: u64,
259 status: u16,
260 duration: std::time::Duration,
261 ) {
262 let success = (200..400).contains(&status);
263 self.record_app_request_with_success(app_name, bytes_in, bytes_out, duration, success);
264 }
265
266 pub fn record_app_request_with_success(
267 &self,
268 app_name: &str,
269 bytes_in: u64,
270 bytes_out: u64,
271 duration: std::time::Duration,
272 success: bool,
273 ) {
274 let app_name = app_name.to_string();
275 {
276 let mut apps = self.app_metrics.write();
277 apps.entry(app_name.clone()).or_default();
278 }
279
280 let app_metrics = {
281 let apps = self.app_metrics.read();
282 apps.get(&app_name).cloned()
283 };
284
285 if let Some(metrics) = app_metrics {
286 metrics.requests_total.fetch_add(1, Ordering::Relaxed);
287 metrics
288 .bytes_received
289 .fetch_add(bytes_in, Ordering::Relaxed);
290 metrics.bytes_sent.fetch_add(bytes_out, Ordering::Relaxed);
291 metrics
292 .response_time_nanos_sum
293 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
294 metrics.response_time_count.fetch_add(1, Ordering::Relaxed);
295 if !success {
296 metrics.errors_total.fetch_add(1, Ordering::Relaxed);
297 }
298 }
299 }
300
301 pub fn inc_in_flight(&self) {
302 self.requests_in_flight.fetch_add(1, Ordering::Relaxed);
303 }
304
305 pub fn dec_in_flight(&self) {
306 self.requests_in_flight.fetch_sub(1, Ordering::Relaxed);
307 }
308
309 pub fn inc_tls_connections(&self) {
310 self.tls_connections.fetch_add(1, Ordering::Relaxed);
311 }
312
313 pub fn inc_errors(&self) {
314 self.errors_total.fetch_add(1, Ordering::Relaxed);
315 }
316
317 pub fn format_metrics(&self) -> String {
318 let requests = self.requests_total.load(Ordering::Relaxed);
319 let in_flight = self.requests_in_flight.load(Ordering::Relaxed);
320 let bytes_in = self.bytes_received.load(Ordering::Relaxed);
321 let bytes_out = self.bytes_sent.load(Ordering::Relaxed);
322 let tls = self.tls_connections.load(Ordering::Relaxed);
323 let errors = self.errors_total.load(Ordering::Relaxed);
324
325 let avg_response_time = {
326 let count = self.response_time_count.load(Ordering::Relaxed);
327 if count == 0 {
328 0.0
329 } else {
330 let sum = self.response_time_nanos_sum.load(Ordering::Relaxed);
331 (sum as f64) / (count as f64) / 1_000_000_000.0
332 }
333 };
334
335 let mut status_entries: Vec<(u16, u64)> = Vec::new();
337 for i in 0..STATUS_ARRAY_SIZE {
338 let count = self.status_codes[i].load(Ordering::Relaxed);
339 if count > 0 {
340 status_entries.push(((i + 100) as u16, count));
341 }
342 }
343
344 let mut output = String::new();
345 output.push_str("# HELP proxy_requests_total Total number of HTTP requests\n");
346 output.push_str("# TYPE proxy_requests_total counter\n");
347 output.push_str(&format!("proxy_requests_total {}\n", requests));
348
349 output.push_str(
350 "# HELP proxy_requests_in_flight Number of requests currently being processed\n",
351 );
352 output.push_str("# TYPE proxy_requests_in_flight gauge\n");
353 output.push_str(&format!("proxy_requests_in_flight {}\n", in_flight));
354
355 output.push_str("# HELP proxy_bytes_received Total bytes received from clients\n");
356 output.push_str("# TYPE proxy_bytes_received counter\n");
357 output.push_str(&format!("proxy_bytes_received {}\n", bytes_in));
358
359 output.push_str("# HELP proxy_bytes_sent Total bytes sent to clients\n");
360 output.push_str("# TYPE proxy_bytes_sent counter\n");
361 output.push_str(&format!("proxy_bytes_sent {}\n", bytes_out));
362
363 output.push_str("# HELP proxy_response_time_seconds Average response time in seconds\n");
364 output.push_str("# TYPE proxy_response_time_seconds gauge\n");
365 output.push_str(&format!(
366 "proxy_response_time_seconds {}\n",
367 avg_response_time
368 ));
369
370 output.push_str("# HELP proxy_tls_connections_total Total number of TLS connections\n");
371 output.push_str("# TYPE proxy_tls_connections_total counter\n");
372 output.push_str(&format!("proxy_tls_connections_total {}\n", tls));
373
374 output.push_str("# HELP proxy_errors_total Total number of proxy errors\n");
375 output.push_str("# TYPE proxy_errors_total counter\n");
376 output.push_str(&format!("proxy_errors_total {}\n", errors));
377
378 output.push_str("# HELP proxy_response_status_codes_total HTTP response status codes\n");
379 output.push_str("# TYPE proxy_response_status_codes_total counter\n");
380 for (code, count) in status_entries.iter() {
381 output.push_str(&format!(
382 "proxy_response_status_codes_total{{code=\"{}\"}} {}\n",
383 code, count
384 ));
385 }
386
387 output
388 }
389}
390
391pub type SharedMetrics = Arc<Metrics>;
392
393pub fn new_metrics() -> SharedMetrics {
394 Arc::new(Metrics::new())
395}