Skip to main content

soli_proxy/
metrics.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::sync::Arc;
4use std::time::Instant;
5
/// One CPU-time sample for a process, kept so the next sample can be turned
/// into a usage percentage over the elapsed interval.
#[derive(Clone)]
struct CpuSnapshot {
    /// Cumulative CPU tick count parsed from `/proc/<pid>/stat` at sample time.
    total_time: u64,
    /// Monotonic instant at which the sample was taken.
    timestamp: Instant,
}
11
/// Serializable point-in-time view of one application's counters.
#[derive(Debug, Clone, serde::Serialize)]
pub struct AppMetricsJson {
    /// Total requests recorded for the application.
    pub requests: u64,
    /// Total bytes recorded as received.
    pub bytes_received: u64,
    /// Total bytes recorded as sent.
    pub bytes_sent: u64,
    /// Mean response time in milliseconds (`0.0` when nothing has been recorded).
    pub avg_response_time_ms: f64,
    /// Number of requests recorded as unsuccessful.
    pub errors: u64,
    /// Resident set size in bytes. The snapshot methods in this file always
    /// leave this as `None`; presumably callers fill it in — TODO confirm.
    pub memory_rss_bytes: Option<u64>,
    /// CPU usage in percent. The snapshot methods in this file always leave
    /// this as `None`; presumably callers fill it in — TODO confirm.
    pub cpu_percent: Option<f64>,
}
22
/// Serializable memory and CPU reading for a single process.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SlotMetrics {
    /// Resident set size in bytes, or `None` when it could not be read.
    pub memory_rss_bytes: Option<u64>,
    /// CPU usage in percent, or `None` when no value could be computed.
    pub cpu_percent: Option<f64>,
}
28
/// Serializable pair of process readings, one per slot.
///
/// NOTE(review): the field names suggest blue/green deployment slots — confirm
/// against the code that builds this value.
#[derive(Debug, Clone, serde::Serialize)]
pub struct AppSystemMetrics {
    /// Reading for the "blue" slot.
    pub blue: SlotMetrics,
    /// Reading for the "green" slot.
    pub green: SlotMetrics,
}
34
/// Per-application traffic counters.
///
/// Every field is an `Arc`-wrapped atomic, so cloning an `AppMetrics` yields
/// another handle onto the same counters rather than an independent copy.
/// The derived `Default` allocates a fresh zeroed counter for each field.
#[derive(Clone, Default)]
pub struct AppMetrics {
    /// Number of requests recorded.
    pub requests_total: Arc<AtomicU64>,
    /// Bytes recorded as received.
    pub bytes_received: Arc<AtomicU64>,
    /// Bytes recorded as sent.
    pub bytes_sent: Arc<AtomicU64>,
    /// Sum of recorded response times, in nanoseconds.
    pub response_time_nanos_sum: Arc<AtomicU64>,
    /// Number of response-time samples contributing to the sum.
    pub response_time_count: Arc<AtomicU64>,
    /// Number of requests recorded as unsuccessful.
    pub errors_total: Arc<AtomicU64>,
}

impl AppMetrics {
    /// Creates a new set of counters, all starting at zero.
    pub fn new() -> Self {
        Self::default()
    }
}
63
/// Number of slots in the per-status-code counter array.
///
/// Slot `i` counts responses with status `i + 100`, so 512 slots span codes
/// 100..=611 — a superset of the standard HTTP range 100-599.
const STATUS_ARRAY_SIZE: usize = 512;
66
/// Proxy-wide metrics registry.
///
/// All mutable state sits behind `Arc`, so a clone is another handle onto the
/// same counters and maps.
#[derive(Clone)]
pub struct Metrics {
    /// Total requests recorded via `record_request`.
    requests_total: Arc<AtomicU64>,
    /// Requests currently in progress (see `inc_in_flight` / `dec_in_flight`).
    requests_in_flight: Arc<AtomicUsize>,
    /// Total bytes recorded as received.
    bytes_received: Arc<AtomicU64>,
    /// Total bytes recorded as sent.
    bytes_sent: Arc<AtomicU64>,
    /// Sum of recorded response times, in nanoseconds.
    response_time_nanos_sum: Arc<AtomicU64>,
    /// Number of response-time samples contributing to the sum.
    response_time_count: Arc<AtomicU64>,
    /// Per-status-code counters; index `i` holds the count for status `i + 100`.
    status_codes: Arc<[AtomicU64; STATUS_ARRAY_SIZE]>,
    /// Total TLS connections counted via `inc_tls_connections`.
    tls_connections: Arc<AtomicU64>,
    /// Total errors counted via `inc_errors`.
    errors_total: Arc<AtomicU64>,
    /// Nanoseconds between `epoch_start` and the most recent `record_request` call.
    last_request_nanos: Arc<AtomicU64>,
    /// Instant at which this registry was created; reference point for `last_request_nanos`.
    epoch_start: Instant,
    /// Per-application counters, keyed by application name.
    app_metrics: Arc<parking_lot::RwLock<HashMap<String, AppMetrics>>>,
    /// Most recent CPU sample per process id, used to compute CPU usage deltas.
    cpu_snapshots: Arc<parking_lot::RwLock<HashMap<u32, CpuSnapshot>>>,
}
83
/// `Metrics::default()` is equivalent to [`Metrics::new`].
impl Default for Metrics {
    fn default() -> Self {
        Self::new()
    }
}
89
90impl Metrics {
91    pub fn new() -> Self {
92        Self {
93            requests_total: Arc::new(AtomicU64::new(0)),
94            requests_in_flight: Arc::new(AtomicUsize::new(0)),
95            bytes_received: Arc::new(AtomicU64::new(0)),
96            bytes_sent: Arc::new(AtomicU64::new(0)),
97            response_time_nanos_sum: Arc::new(AtomicU64::new(0)),
98            response_time_count: Arc::new(AtomicU64::new(0)),
99            status_codes: Arc::new(std::array::from_fn(|_| AtomicU64::new(0))),
100            tls_connections: Arc::new(AtomicU64::new(0)),
101            errors_total: Arc::new(AtomicU64::new(0)),
102            last_request_nanos: Arc::new(AtomicU64::new(0)),
103            epoch_start: Instant::now(),
104            app_metrics: Arc::new(parking_lot::RwLock::new(HashMap::new())),
105            cpu_snapshots: Arc::new(parking_lot::RwLock::new(HashMap::new())),
106        }
107    }
108
109    pub fn get_app_metrics(&self, app_name: &str) -> Option<AppMetricsJson> {
110        let apps = self.app_metrics.read();
111        apps.get(app_name).map(|m| AppMetricsJson {
112            requests: m.requests_total.load(Ordering::Relaxed),
113            bytes_received: m.bytes_received.load(Ordering::Relaxed),
114            bytes_sent: m.bytes_sent.load(Ordering::Relaxed),
115            avg_response_time_ms: {
116                let count = m.response_time_count.load(Ordering::Relaxed);
117                if count == 0 {
118                    0.0
119                } else {
120                    let sum = m.response_time_nanos_sum.load(Ordering::Relaxed);
121                    (sum as f64) / (count as f64) / 1_000_000.0
122                }
123            },
124            errors: m.errors_total.load(Ordering::Relaxed),
125            memory_rss_bytes: None,
126            cpu_percent: None,
127        })
128    }
129
130    pub fn get_all_app_metrics(&self) -> HashMap<String, AppMetricsJson> {
131        let apps = self.app_metrics.read();
132        apps.iter()
133            .map(|(name, m)| {
134                (
135                    name.clone(),
136                    AppMetricsJson {
137                        requests: m.requests_total.load(Ordering::Relaxed),
138                        bytes_received: m.bytes_received.load(Ordering::Relaxed),
139                        bytes_sent: m.bytes_sent.load(Ordering::Relaxed),
140                        avg_response_time_ms: {
141                            let count = m.response_time_count.load(Ordering::Relaxed);
142                            if count == 0 {
143                                0.0
144                            } else {
145                                let sum = m.response_time_nanos_sum.load(Ordering::Relaxed);
146                                (sum as f64) / (count as f64) / 1_000_000.0
147                            }
148                        },
149                        errors: m.errors_total.load(Ordering::Relaxed),
150                        memory_rss_bytes: None,
151                        cpu_percent: None,
152                    },
153                )
154            })
155            .collect()
156    }
157
158    #[cfg(unix)]
159    pub fn get_process_stats(&self, pid: u32) -> Option<SlotMetrics> {
160        let stat_path = format!("/proc/{}/stat", pid);
161        let status_path = format!("/proc/{}/status", pid);
162
163        let mut memory_bytes: Option<u64> = None;
164        if let Ok(content) = std::fs::read_to_string(&status_path) {
165            for line in content.lines() {
166                if line.starts_with("VmRSS:") {
167                    let parts: Vec<&str> = line.split_whitespace().collect();
168                    if parts.len() >= 2 {
169                        if let Ok(kb) = parts[1].parse::<u64>() {
170                            memory_bytes = Some(kb * 1024);
171                        }
172                    }
173                    break;
174                }
175            }
176        }
177
178        let mut cpu_percent: Option<f64> = None;
179        if let Ok(content) = std::fs::read_to_string(&stat_path) {
180            if let Some(idx) = content.find('(') {
181                if let Some(idx2) = content[idx..].find(')') {
182                    let parts: Vec<&str> = content[idx + idx2 + 2..].split_whitespace().collect();
183                    if parts.len() >= 14 {
184                        let utime: u64 = parts[12].parse().unwrap_or(0);
185                        let stime: u64 = parts[13].parse().unwrap_or(0);
186                        let total_time = utime + stime;
187
188                        let now = Instant::now();
189                        let prev = self.cpu_snapshots.read().get(&pid).cloned();
190
191                        if let Some(prev_snapshot) = prev {
192                            let delta_time = now.duration_since(prev_snapshot.timestamp);
193                            if delta_time.as_secs_f64() > 0.0 {
194                                let delta_cpu = total_time as f64 - prev_snapshot.total_time as f64;
195                                let hz = 100.0;
196                                cpu_percent =
197                                    Some((delta_cpu / hz) / delta_time.as_secs_f64() * 100.0);
198                            }
199                        }
200
201                        self.cpu_snapshots.write().insert(
202                            pid,
203                            CpuSnapshot {
204                                total_time,
205                                timestamp: now,
206                            },
207                        );
208                    }
209                }
210            }
211        }
212
213        if memory_bytes.is_none() && cpu_percent.is_none() {
214            return None;
215        }
216
217        Some(SlotMetrics {
218            memory_rss_bytes: memory_bytes,
219            cpu_percent,
220        })
221    }
222
223    #[cfg(not(unix))]
224    pub fn get_process_stats(&self, _pid: u32) -> Option<SlotMetrics> {
225        None
226    }
227
228    pub fn record_request(
229        &self,
230        bytes_in: u64,
231        bytes_out: u64,
232        status: u16,
233        duration: std::time::Duration,
234    ) {
235        self.requests_total.fetch_add(1, Ordering::Relaxed);
236        self.bytes_received.fetch_add(bytes_in, Ordering::Relaxed);
237        self.bytes_sent.fetch_add(bytes_out, Ordering::Relaxed);
238
239        // Lock-free EWMA: accumulate nanos sum and count
240        self.response_time_nanos_sum
241            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
242        self.response_time_count.fetch_add(1, Ordering::Relaxed);
243
244        // Lock-free status code: index by (code - 100), bounds-checked
245        if status >= 100 && (status as usize - 100) < STATUS_ARRAY_SIZE {
246            self.status_codes[status as usize - 100].fetch_add(1, Ordering::Relaxed);
247        }
248
249        // Lock-free last request time
250        let nanos = self.epoch_start.elapsed().as_nanos() as u64;
251        self.last_request_nanos.store(nanos, Ordering::Relaxed);
252    }
253
254    pub fn record_app_request(
255        &self,
256        app_name: &str,
257        bytes_in: u64,
258        bytes_out: u64,
259        status: u16,
260        duration: std::time::Duration,
261    ) {
262        let success = (200..400).contains(&status);
263        self.record_app_request_with_success(app_name, bytes_in, bytes_out, duration, success);
264    }
265
266    pub fn record_app_request_with_success(
267        &self,
268        app_name: &str,
269        bytes_in: u64,
270        bytes_out: u64,
271        duration: std::time::Duration,
272        success: bool,
273    ) {
274        let app_name = app_name.to_string();
275        {
276            let mut apps = self.app_metrics.write();
277            apps.entry(app_name.clone()).or_default();
278        }
279
280        let app_metrics = {
281            let apps = self.app_metrics.read();
282            apps.get(&app_name).cloned()
283        };
284
285        if let Some(metrics) = app_metrics {
286            metrics.requests_total.fetch_add(1, Ordering::Relaxed);
287            metrics
288                .bytes_received
289                .fetch_add(bytes_in, Ordering::Relaxed);
290            metrics.bytes_sent.fetch_add(bytes_out, Ordering::Relaxed);
291            metrics
292                .response_time_nanos_sum
293                .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
294            metrics.response_time_count.fetch_add(1, Ordering::Relaxed);
295            if !success {
296                metrics.errors_total.fetch_add(1, Ordering::Relaxed);
297            }
298        }
299    }
300
    /// Increments the count of requests currently being processed.
    pub fn inc_in_flight(&self) {
        self.requests_in_flight.fetch_add(1, Ordering::Relaxed);
    }
304
305    pub fn dec_in_flight(&self) {
306        self.requests_in_flight.fetch_sub(1, Ordering::Relaxed);
307    }
308
    /// Increments the total number of TLS connections counted.
    pub fn inc_tls_connections(&self) {
        self.tls_connections.fetch_add(1, Ordering::Relaxed);
    }
312
    /// Increments the proxy-wide error counter.
    pub fn inc_errors(&self) {
        self.errors_total.fetch_add(1, Ordering::Relaxed);
    }
316
317    pub fn format_metrics(&self) -> String {
318        let requests = self.requests_total.load(Ordering::Relaxed);
319        let in_flight = self.requests_in_flight.load(Ordering::Relaxed);
320        let bytes_in = self.bytes_received.load(Ordering::Relaxed);
321        let bytes_out = self.bytes_sent.load(Ordering::Relaxed);
322        let tls = self.tls_connections.load(Ordering::Relaxed);
323        let errors = self.errors_total.load(Ordering::Relaxed);
324
325        let avg_response_time = {
326            let count = self.response_time_count.load(Ordering::Relaxed);
327            if count == 0 {
328                0.0
329            } else {
330                let sum = self.response_time_nanos_sum.load(Ordering::Relaxed);
331                (sum as f64) / (count as f64) / 1_000_000_000.0
332            }
333        };
334
335        // Collect non-zero status codes from the array
336        let mut status_entries: Vec<(u16, u64)> = Vec::new();
337        for i in 0..STATUS_ARRAY_SIZE {
338            let count = self.status_codes[i].load(Ordering::Relaxed);
339            if count > 0 {
340                status_entries.push(((i + 100) as u16, count));
341            }
342        }
343
344        let mut output = String::new();
345        output.push_str("# HELP proxy_requests_total Total number of HTTP requests\n");
346        output.push_str("# TYPE proxy_requests_total counter\n");
347        output.push_str(&format!("proxy_requests_total {}\n", requests));
348
349        output.push_str(
350            "# HELP proxy_requests_in_flight Number of requests currently being processed\n",
351        );
352        output.push_str("# TYPE proxy_requests_in_flight gauge\n");
353        output.push_str(&format!("proxy_requests_in_flight {}\n", in_flight));
354
355        output.push_str("# HELP proxy_bytes_received Total bytes received from clients\n");
356        output.push_str("# TYPE proxy_bytes_received counter\n");
357        output.push_str(&format!("proxy_bytes_received {}\n", bytes_in));
358
359        output.push_str("# HELP proxy_bytes_sent Total bytes sent to clients\n");
360        output.push_str("# TYPE proxy_bytes_sent counter\n");
361        output.push_str(&format!("proxy_bytes_sent {}\n", bytes_out));
362
363        output.push_str("# HELP proxy_response_time_seconds Average response time in seconds\n");
364        output.push_str("# TYPE proxy_response_time_seconds gauge\n");
365        output.push_str(&format!(
366            "proxy_response_time_seconds {}\n",
367            avg_response_time
368        ));
369
370        output.push_str("# HELP proxy_tls_connections_total Total number of TLS connections\n");
371        output.push_str("# TYPE proxy_tls_connections_total counter\n");
372        output.push_str(&format!("proxy_tls_connections_total {}\n", tls));
373
374        output.push_str("# HELP proxy_errors_total Total number of proxy errors\n");
375        output.push_str("# TYPE proxy_errors_total counter\n");
376        output.push_str(&format!("proxy_errors_total {}\n", errors));
377
378        output.push_str("# HELP proxy_response_status_codes_total HTTP response status codes\n");
379        output.push_str("# TYPE proxy_response_status_codes_total counter\n");
380        for (code, count) in status_entries.iter() {
381            output.push_str(&format!(
382                "proxy_response_status_codes_total{{code=\"{}\"}} {}\n",
383                code, count
384            ));
385        }
386
387        output
388    }
389}
390
/// Reference-counted handle to a [`Metrics`] registry.
pub type SharedMetrics = Arc<Metrics>;
392
/// Creates an empty [`Metrics`] registry wrapped in an `Arc`.
pub fn new_metrics() -> SharedMetrics {
    Arc::new(Metrics::new())
}