1use std::fmt::Write;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::Arc;
28
29use super::http_connection_limiter::HttpConnectionLimiter;
30
31#[derive(Debug, Clone, Copy)]
32pub enum HttpTransport {
33 Http,
34 Https,
35}
36
37impl HttpTransport {
38 fn label(self) -> &'static str {
39 match self {
40 HttpTransport::Http => "http",
41 HttpTransport::Https => "https",
42 }
43 }
44
45 fn index(self) -> usize {
46 match self {
47 HttpTransport::Http => 0,
48 HttpTransport::Https => 1,
49 }
50 }
51}
52
53#[derive(Debug, Clone, Copy)]
54pub enum HttpRejectReason {
55 CapExhausted,
56 HandlerTimeout,
57}
58
59impl HttpRejectReason {
60 fn label(self) -> &'static str {
61 match self {
62 HttpRejectReason::CapExhausted => "cap_exhausted",
63 HttpRejectReason::HandlerTimeout => "handler_timeout",
64 }
65 }
66
67 fn index(self) -> usize {
68 match self {
69 HttpRejectReason::CapExhausted => 0,
70 HttpRejectReason::HandlerTimeout => 1,
71 }
72 }
73}
74
75const DURATION_BUCKETS_SECONDS: [f64; 11] = [
79 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
80];
81
82#[derive(Debug)]
83struct TransportHistogram {
84 buckets: [AtomicU64; 11],
87 inf: AtomicU64,
88 sum_micros: AtomicU64,
91}
92
93impl TransportHistogram {
94 fn new() -> Self {
95 Self {
96 buckets: [
97 AtomicU64::new(0),
98 AtomicU64::new(0),
99 AtomicU64::new(0),
100 AtomicU64::new(0),
101 AtomicU64::new(0),
102 AtomicU64::new(0),
103 AtomicU64::new(0),
104 AtomicU64::new(0),
105 AtomicU64::new(0),
106 AtomicU64::new(0),
107 AtomicU64::new(0),
108 ],
109 inf: AtomicU64::new(0),
110 sum_micros: AtomicU64::new(0),
111 }
112 }
113
114 fn observe_seconds(&self, value: f64) {
115 let micros = (value * 1_000_000.0).round().clamp(0.0, u64::MAX as f64) as u64;
116 self.sum_micros.fetch_add(micros, Ordering::Relaxed);
117 self.inf.fetch_add(1, Ordering::Relaxed);
118 for (i, le) in DURATION_BUCKETS_SECONDS.iter().enumerate() {
119 if value <= *le {
120 self.buckets[i].fetch_add(1, Ordering::Relaxed);
121 }
122 }
123 }
124}
125
126#[derive(Debug)]
127struct Inner {
128 rejected: [[AtomicU64; 2]; 2],
129 duration: [TransportHistogram; 2],
130}
131
132#[derive(Debug, Clone)]
133pub struct HttpHandlerMetrics {
134 inner: Arc<Inner>,
135}
136
137impl HttpHandlerMetrics {
138 pub fn new() -> Self {
139 Self {
140 inner: Arc::new(Inner {
141 rejected: [
142 [AtomicU64::new(0), AtomicU64::new(0)],
143 [AtomicU64::new(0), AtomicU64::new(0)],
144 ],
145 duration: [TransportHistogram::new(), TransportHistogram::new()],
146 }),
147 }
148 }
149
150 pub fn record_reject(&self, transport: HttpTransport, reason: HttpRejectReason) {
151 self.inner.rejected[transport.index()][reason.index()].fetch_add(1, Ordering::Relaxed);
152 }
153
154 pub fn record_duration(&self, transport: HttpTransport, seconds: f64) {
155 if !seconds.is_finite() || seconds < 0.0 {
156 return;
157 }
158 self.inner.duration[transport.index()].observe_seconds(seconds);
159 }
160
161 pub fn rejected_count(&self, transport: HttpTransport, reason: HttpRejectReason) -> u64 {
162 self.inner.rejected[transport.index()][reason.index()].load(Ordering::Relaxed)
163 }
164
165 pub fn duration_sample_count(&self, transport: HttpTransport) -> u64 {
166 self.inner.duration[transport.index()]
167 .inf
168 .load(Ordering::Relaxed)
169 }
170
171 pub fn render(&self, body: &mut String, limiter: &HttpConnectionLimiter) {
177 let cap = limiter.cap();
178 let current = limiter.current();
179
180 let _ = writeln!(
186 body,
187 "# HELP http_active_handler_threads Live HTTP/HTTPS handler threads holding a limiter permit."
188 );
189 let _ = writeln!(body, "# TYPE http_active_handler_threads gauge");
190 let _ = writeln!(
191 body,
192 "http_active_handler_threads{{transport=\"http\"}} {}",
193 current
194 );
195 let _ = writeln!(
196 body,
197 "http_active_handler_threads{{transport=\"https\"}} {}",
198 current
199 );
200
201 let _ = writeln!(
203 body,
204 "# HELP http_handler_cap Configured maximum concurrent HTTP/HTTPS handler threads."
205 );
206 let _ = writeln!(body, "# TYPE http_handler_cap gauge");
207 let _ = writeln!(body, "http_handler_cap{{transport=\"http\"}} {}", cap);
208 let _ = writeln!(body, "http_handler_cap{{transport=\"https\"}} {}", cap);
209
210 let _ = writeln!(
212 body,
213 "# HELP http_handler_rejected_total HTTP/HTTPS handler rejections by reason since process start."
214 );
215 let _ = writeln!(body, "# TYPE http_handler_rejected_total counter");
216 for transport in [HttpTransport::Http, HttpTransport::Https] {
217 for reason in [
218 HttpRejectReason::CapExhausted,
219 HttpRejectReason::HandlerTimeout,
220 ] {
221 let _ = writeln!(
222 body,
223 "http_handler_rejected_total{{transport=\"{}\",reason=\"{}\"}} {}",
224 transport.label(),
225 reason.label(),
226 self.rejected_count(transport, reason)
227 );
228 }
229 }
230
231 let _ = writeln!(
233 body,
234 "# HELP http_handler_duration_seconds Wall-clock handler duration per transport."
235 );
236 let _ = writeln!(body, "# TYPE http_handler_duration_seconds histogram");
237 for transport in [HttpTransport::Http, HttpTransport::Https] {
238 let hist = &self.inner.duration[transport.index()];
239 for (i, le) in DURATION_BUCKETS_SECONDS.iter().enumerate() {
240 let _ = writeln!(
241 body,
242 "http_handler_duration_seconds_bucket{{transport=\"{}\",le=\"{}\"}} {}",
243 transport.label(),
244 format_bucket_le(*le),
245 hist.buckets[i].load(Ordering::Relaxed)
246 );
247 }
248 let inf = hist.inf.load(Ordering::Relaxed);
249 let _ = writeln!(
250 body,
251 "http_handler_duration_seconds_bucket{{transport=\"{}\",le=\"+Inf\"}} {}",
252 transport.label(),
253 inf
254 );
255 let sum_secs = (hist.sum_micros.load(Ordering::Relaxed) as f64) / 1_000_000.0;
256 let _ = writeln!(
257 body,
258 "http_handler_duration_seconds_sum{{transport=\"{}\"}} {}",
259 transport.label(),
260 sum_secs
261 );
262 let _ = writeln!(
263 body,
264 "http_handler_duration_seconds_count{{transport=\"{}\"}} {}",
265 transport.label(),
266 inf
267 );
268 }
269 }
270}
271
272impl Default for HttpHandlerMetrics {
273 fn default() -> Self {
274 Self::new()
275 }
276}
277
278fn format_bucket_le(le: f64) -> String {
279 if le == le.trunc() && le.abs() < 1e16 {
283 format!("{le:.1}")
284 } else {
285 format!("{le}")
286 }
287}
288
289#[cfg(test)]
290mod tests {
291 use super::*;
292
293 #[test]
294 fn rejected_counters_isolated_by_label() {
295 let m = HttpHandlerMetrics::new();
296 m.record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
297 m.record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
298 m.record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
299 assert_eq!(
300 m.rejected_count(HttpTransport::Http, HttpRejectReason::CapExhausted),
301 2
302 );
303 assert_eq!(
304 m.rejected_count(HttpTransport::Http, HttpRejectReason::HandlerTimeout),
305 0
306 );
307 assert_eq!(
308 m.rejected_count(HttpTransport::Https, HttpRejectReason::HandlerTimeout),
309 1
310 );
311 assert_eq!(
312 m.rejected_count(HttpTransport::Https, HttpRejectReason::CapExhausted),
313 0
314 );
315 }
316
317 #[test]
318 fn duration_histogram_buckets_are_cumulative() {
319 let m = HttpHandlerMetrics::new();
320 m.record_duration(HttpTransport::Http, 0.003);
321 m.record_duration(HttpTransport::Http, 0.04);
322 m.record_duration(HttpTransport::Http, 3.0);
323 assert_eq!(m.duration_sample_count(HttpTransport::Http), 3);
324
325 let limiter = HttpConnectionLimiter::new(4);
326 let mut body = String::new();
327 m.render(&mut body, &limiter);
328
329 assert!(body.contains(
331 "http_handler_duration_seconds_bucket{transport=\"http\",le=\"0.005\"} 1"
332 ));
333 assert!(body.contains(
335 "http_handler_duration_seconds_bucket{transport=\"http\",le=\"0.05\"} 2"
336 ));
337 assert!(body.contains(
339 "http_handler_duration_seconds_bucket{transport=\"http\",le=\"+Inf\"} 3"
340 ));
341 assert!(body.contains(
343 "http_handler_duration_seconds_bucket{transport=\"https\",le=\"+Inf\"} 0"
344 ));
345 }
346
347 #[test]
348 fn render_includes_cap_and_current_from_limiter() {
349 let limiter = HttpConnectionLimiter::new(7);
350 let _p = limiter.try_acquire().unwrap();
351 let m = HttpHandlerMetrics::new();
352 let mut body = String::new();
353 m.render(&mut body, &limiter);
354 assert!(body.contains("http_handler_cap{transport=\"http\"} 7"));
355 assert!(body.contains("http_handler_cap{transport=\"https\"} 7"));
356 assert!(body.contains("http_active_handler_threads{transport=\"http\"} 1"));
357 assert!(body.contains("http_active_handler_threads{transport=\"https\"} 1"));
358 }
359
360 #[test]
361 fn render_emits_all_four_rejection_labels() {
362 let m = HttpHandlerMetrics::new();
363 let limiter = HttpConnectionLimiter::new(1);
364 let mut body = String::new();
365 m.render(&mut body, &limiter);
366 for transport in ["http", "https"] {
367 for reason in ["cap_exhausted", "handler_timeout"] {
368 let expected = format!(
369 "http_handler_rejected_total{{transport=\"{transport}\",reason=\"{reason}\"}} 0"
370 );
371 assert!(body.contains(&expected), "missing line: {expected}");
372 }
373 }
374 }
375
376 #[test]
377 fn negative_or_nan_durations_are_ignored() {
378 let m = HttpHandlerMetrics::new();
379 m.record_duration(HttpTransport::Http, -1.0);
380 m.record_duration(HttpTransport::Http, f64::NAN);
381 m.record_duration(HttpTransport::Http, f64::INFINITY);
382 assert_eq!(m.duration_sample_count(HttpTransport::Http), 0);
383 }
384}