Skip to main content

soli_proxy/
metrics.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::sync::Arc;
4use std::time::Instant;
5
/// Serializable point-in-time snapshot of one application's counters.
///
/// Instances are produced from an [`AppMetrics`] entry by
/// `Metrics::get_app_metrics` and `Metrics::get_all_app_metrics`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct AppMetricsJson {
    /// Number of requests recorded for the application.
    pub requests: u64,
    /// Sum of the `bytes_in` values recorded for the application.
    pub bytes_received: u64,
    /// Sum of the `bytes_out` values recorded for the application.
    pub bytes_sent: u64,
    /// Mean response time in milliseconds (nanosecond sum / sample count /
    /// 1e6); `0.0` when no sample has been recorded.
    pub avg_response_time_ms: f64,
    /// Number of requests that were recorded as unsuccessful.
    pub errors: u64,
}
14
/// Counters tracked for a single application.
///
/// Every field is an `Arc`-wrapped atomic, so cloning an `AppMetrics` yields
/// another handle onto the *same* counters rather than an independent copy.
/// The derived `Default` allocates a fresh zeroed counter for each field.
#[derive(Clone, Default)]
pub struct AppMetrics {
    /// Number of requests recorded.
    pub requests_total: Arc<AtomicU64>,
    /// Running total of bytes received.
    pub bytes_received: Arc<AtomicU64>,
    /// Running total of bytes sent.
    pub bytes_sent: Arc<AtomicU64>,
    /// Running sum of response times, in nanoseconds.
    pub response_time_nanos_sum: Arc<AtomicU64>,
    /// Number of response-time samples folded into the sum.
    pub response_time_count: Arc<AtomicU64>,
    /// Number of requests recorded as unsuccessful.
    pub errors_total: Arc<AtomicU64>,
}

impl AppMetrics {
    /// Creates a set of counters that all start at zero.
    ///
    /// Equivalent to `AppMetrics::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
43
/// Number of per-status-code counters. A status code maps to slot
/// `code - 100`, so 512 slots span codes 100..=611 — a superset of the
/// HTTP range 100-599.
const STATUS_ARRAY_SIZE: usize = 512;
46
/// Proxy-wide metrics collector.
///
/// All counters live behind `Arc`s, so a clone of `Metrics` observes and
/// updates the same counters as the value it was cloned from.
#[derive(Clone)]
pub struct Metrics {
    /// Number of requests passed to `record_request`.
    requests_total: Arc<AtomicU64>,
    /// Gauge adjusted by `inc_in_flight` / `dec_in_flight`.
    requests_in_flight: Arc<AtomicUsize>,
    /// Sum of the `bytes_in` values passed to `record_request`.
    bytes_received: Arc<AtomicU64>,
    /// Sum of the `bytes_out` values passed to `record_request`.
    bytes_sent: Arc<AtomicU64>,
    /// Running sum of request durations, in nanoseconds.
    response_time_nanos_sum: Arc<AtomicU64>,
    /// Number of durations folded into `response_time_nanos_sum`.
    response_time_count: Arc<AtomicU64>,
    /// Per-status-code counters; slot `i` counts responses with code `i + 100`.
    status_codes: Arc<[AtomicU64; STATUS_ARRAY_SIZE]>,
    /// Counter bumped by `inc_tls_connections`.
    tls_connections: Arc<AtomicU64>,
    /// Counter bumped by `inc_errors`.
    errors_total: Arc<AtomicU64>,
    /// Nanoseconds elapsed since `epoch_start` at the most recent
    /// `record_request` call (0 until the first call).
    last_request_nanos: Arc<AtomicU64>,
    /// Reference instant captured when the collector was constructed.
    epoch_start: Instant,
    /// Per-application counters keyed by application name.
    app_metrics: Arc<parking_lot::RwLock<HashMap<String, AppMetrics>>>,
}
62
impl Default for Metrics {
    /// Equivalent to [`Metrics::new`].
    fn default() -> Self {
        Self::new()
    }
}
68
69impl Metrics {
70    pub fn new() -> Self {
71        Self {
72            requests_total: Arc::new(AtomicU64::new(0)),
73            requests_in_flight: Arc::new(AtomicUsize::new(0)),
74            bytes_received: Arc::new(AtomicU64::new(0)),
75            bytes_sent: Arc::new(AtomicU64::new(0)),
76            response_time_nanos_sum: Arc::new(AtomicU64::new(0)),
77            response_time_count: Arc::new(AtomicU64::new(0)),
78            status_codes: Arc::new(std::array::from_fn(|_| AtomicU64::new(0))),
79            tls_connections: Arc::new(AtomicU64::new(0)),
80            errors_total: Arc::new(AtomicU64::new(0)),
81            last_request_nanos: Arc::new(AtomicU64::new(0)),
82            epoch_start: Instant::now(),
83            app_metrics: Arc::new(parking_lot::RwLock::new(HashMap::new())),
84        }
85    }
86
87    pub fn get_app_metrics(&self, app_name: &str) -> Option<AppMetricsJson> {
88        let apps = self.app_metrics.read();
89        apps.get(app_name).map(|m| AppMetricsJson {
90            requests: m.requests_total.load(Ordering::Relaxed),
91            bytes_received: m.bytes_received.load(Ordering::Relaxed),
92            bytes_sent: m.bytes_sent.load(Ordering::Relaxed),
93            avg_response_time_ms: {
94                let count = m.response_time_count.load(Ordering::Relaxed);
95                if count == 0 {
96                    0.0
97                } else {
98                    let sum = m.response_time_nanos_sum.load(Ordering::Relaxed);
99                    (sum as f64) / (count as f64) / 1_000_000.0
100                }
101            },
102            errors: m.errors_total.load(Ordering::Relaxed),
103        })
104    }
105
106    pub fn get_all_app_metrics(&self) -> HashMap<String, AppMetricsJson> {
107        let apps = self.app_metrics.read();
108        apps.iter()
109            .map(|(name, m)| {
110                (
111                    name.clone(),
112                    AppMetricsJson {
113                        requests: m.requests_total.load(Ordering::Relaxed),
114                        bytes_received: m.bytes_received.load(Ordering::Relaxed),
115                        bytes_sent: m.bytes_sent.load(Ordering::Relaxed),
116                        avg_response_time_ms: {
117                            let count = m.response_time_count.load(Ordering::Relaxed);
118                            if count == 0 {
119                                0.0
120                            } else {
121                                let sum = m.response_time_nanos_sum.load(Ordering::Relaxed);
122                                (sum as f64) / (count as f64) / 1_000_000.0
123                            }
124                        },
125                        errors: m.errors_total.load(Ordering::Relaxed),
126                    },
127                )
128            })
129            .collect()
130    }
131
132    pub fn record_request(
133        &self,
134        bytes_in: u64,
135        bytes_out: u64,
136        status: u16,
137        duration: std::time::Duration,
138    ) {
139        self.requests_total.fetch_add(1, Ordering::Relaxed);
140        self.bytes_received.fetch_add(bytes_in, Ordering::Relaxed);
141        self.bytes_sent.fetch_add(bytes_out, Ordering::Relaxed);
142
143        // Lock-free EWMA: accumulate nanos sum and count
144        self.response_time_nanos_sum
145            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
146        self.response_time_count.fetch_add(1, Ordering::Relaxed);
147
148        // Lock-free status code: index by (code - 100), bounds-checked
149        if status >= 100 && (status as usize - 100) < STATUS_ARRAY_SIZE {
150            self.status_codes[status as usize - 100].fetch_add(1, Ordering::Relaxed);
151        }
152
153        // Lock-free last request time
154        let nanos = self.epoch_start.elapsed().as_nanos() as u64;
155        self.last_request_nanos.store(nanos, Ordering::Relaxed);
156    }
157
158    pub fn record_app_request(
159        &self,
160        app_name: &str,
161        bytes_in: u64,
162        bytes_out: u64,
163        status: u16,
164        duration: std::time::Duration,
165    ) {
166        let success = (200..400).contains(&status);
167        self.record_app_request_with_success(app_name, bytes_in, bytes_out, duration, success);
168    }
169
170    pub fn record_app_request_with_success(
171        &self,
172        app_name: &str,
173        bytes_in: u64,
174        bytes_out: u64,
175        duration: std::time::Duration,
176        success: bool,
177    ) {
178        let app_name = app_name.to_string();
179        {
180            let mut apps = self.app_metrics.write();
181            apps.entry(app_name.clone()).or_default();
182        }
183
184        let app_metrics = {
185            let apps = self.app_metrics.read();
186            apps.get(&app_name).cloned()
187        };
188
189        if let Some(metrics) = app_metrics {
190            metrics.requests_total.fetch_add(1, Ordering::Relaxed);
191            metrics
192                .bytes_received
193                .fetch_add(bytes_in, Ordering::Relaxed);
194            metrics.bytes_sent.fetch_add(bytes_out, Ordering::Relaxed);
195            metrics
196                .response_time_nanos_sum
197                .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
198            metrics.response_time_count.fetch_add(1, Ordering::Relaxed);
199            if !success {
200                metrics.errors_total.fetch_add(1, Ordering::Relaxed);
201            }
202        }
203    }
204
    /// Increments the in-flight request gauge by one (relaxed atomic add).
    pub fn inc_in_flight(&self) {
        self.requests_in_flight.fetch_add(1, Ordering::Relaxed);
    }
208
    /// Decrements the in-flight request gauge by one (relaxed atomic sub).
    ///
    /// `fetch_sub` wraps on underflow, so a decrement without a matching
    /// `inc_in_flight` leaves the gauge at `usize::MAX` until it is
    /// incremented again.
    pub fn dec_in_flight(&self) {
        self.requests_in_flight.fetch_sub(1, Ordering::Relaxed);
    }
212
    /// Increments the TLS connection counter by one (relaxed atomic add).
    pub fn inc_tls_connections(&self) {
        self.tls_connections.fetch_add(1, Ordering::Relaxed);
    }
216
    /// Increments the proxy-wide error counter by one (relaxed atomic add).
    pub fn inc_errors(&self) {
        self.errors_total.fetch_add(1, Ordering::Relaxed);
    }
220
221    pub fn format_metrics(&self) -> String {
222        let requests = self.requests_total.load(Ordering::Relaxed);
223        let in_flight = self.requests_in_flight.load(Ordering::Relaxed);
224        let bytes_in = self.bytes_received.load(Ordering::Relaxed);
225        let bytes_out = self.bytes_sent.load(Ordering::Relaxed);
226        let tls = self.tls_connections.load(Ordering::Relaxed);
227        let errors = self.errors_total.load(Ordering::Relaxed);
228
229        let avg_response_time = {
230            let count = self.response_time_count.load(Ordering::Relaxed);
231            if count == 0 {
232                0.0
233            } else {
234                let sum = self.response_time_nanos_sum.load(Ordering::Relaxed);
235                (sum as f64) / (count as f64) / 1_000_000_000.0
236            }
237        };
238
239        // Collect non-zero status codes from the array
240        let mut status_entries: Vec<(u16, u64)> = Vec::new();
241        for i in 0..STATUS_ARRAY_SIZE {
242            let count = self.status_codes[i].load(Ordering::Relaxed);
243            if count > 0 {
244                status_entries.push(((i + 100) as u16, count));
245            }
246        }
247
248        let mut output = String::new();
249        output.push_str("# HELP proxy_requests_total Total number of HTTP requests\n");
250        output.push_str("# TYPE proxy_requests_total counter\n");
251        output.push_str(&format!("proxy_requests_total {}\n", requests));
252
253        output.push_str(
254            "# HELP proxy_requests_in_flight Number of requests currently being processed\n",
255        );
256        output.push_str("# TYPE proxy_requests_in_flight gauge\n");
257        output.push_str(&format!("proxy_requests_in_flight {}\n", in_flight));
258
259        output.push_str("# HELP proxy_bytes_received Total bytes received from clients\n");
260        output.push_str("# TYPE proxy_bytes_received counter\n");
261        output.push_str(&format!("proxy_bytes_received {}\n", bytes_in));
262
263        output.push_str("# HELP proxy_bytes_sent Total bytes sent to clients\n");
264        output.push_str("# TYPE proxy_bytes_sent counter\n");
265        output.push_str(&format!("proxy_bytes_sent {}\n", bytes_out));
266
267        output.push_str("# HELP proxy_response_time_seconds Average response time in seconds\n");
268        output.push_str("# TYPE proxy_response_time_seconds gauge\n");
269        output.push_str(&format!(
270            "proxy_response_time_seconds {}\n",
271            avg_response_time
272        ));
273
274        output.push_str("# HELP proxy_tls_connections_total Total number of TLS connections\n");
275        output.push_str("# TYPE proxy_tls_connections_total counter\n");
276        output.push_str(&format!("proxy_tls_connections_total {}\n", tls));
277
278        output.push_str("# HELP proxy_errors_total Total number of proxy errors\n");
279        output.push_str("# TYPE proxy_errors_total counter\n");
280        output.push_str(&format!("proxy_errors_total {}\n", errors));
281
282        output.push_str("# HELP proxy_response_status_codes_total HTTP response status codes\n");
283        output.push_str("# TYPE proxy_response_status_codes_total counter\n");
284        for (code, count) in status_entries.iter() {
285            output.push_str(&format!(
286                "proxy_response_status_codes_total{{code=\"{}\"}} {}\n",
287                code, count
288            ));
289        }
290
291        output
292    }
293}
294
/// Reference-counted handle to a [`Metrics`] collector.
pub type SharedMetrics = Arc<Metrics>;
296
/// Creates a fresh [`Metrics`] collector wrapped in an `Arc`.
pub fn new_metrics() -> SharedMetrics {
    Arc::new(Metrics::new())
}