1use std::fmt::Write;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::Arc;
28
29use super::http_connection_limiter::HttpConnectionLimiter;
30
/// Transport a handler was serving, used as the `transport` label on every metric.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HttpTransport {
    /// Rendered with the label value `http`.
    Http,
    /// Rendered with the label value `https`.
    Https,
}

impl HttpTransport {
    /// Label value emitted for this transport in the rendered metrics.
    fn label(self) -> &'static str {
        match self {
            Self::Http => "http",
            Self::Https => "https",
        }
    }

    /// Position of this transport in the per-transport counter arrays.
    fn index(self) -> usize {
        match self {
            Self::Http => 0,
            Self::Https => 1,
        }
    }
}
52
/// Why a handler request was rejected; used as the `reason` label on
/// `http_handler_rejected_total`.
///
/// Derives `PartialEq`/`Eq` like [`HttpTransport`] so reasons can be compared
/// directly by callers and in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HttpRejectReason {
    /// Rendered with the label value `cap_exhausted`.
    CapExhausted,
    /// Rendered with the label value `handler_timeout`.
    HandlerTimeout,
    /// Rendered with the label value `principal_cap_exhausted`.
    PrincipalCapExhausted,
}

/// Number of [`HttpRejectReason`] variants; sizes each per-transport counter row.
///
/// Must be kept in step with the enum, `index()` and `all()` when a variant is added.
const REJECT_REASONS: usize = 3;

impl HttpRejectReason {
    /// Label value emitted for this reason in the rendered metrics.
    fn label(self) -> &'static str {
        match self {
            Self::CapExhausted => "cap_exhausted",
            Self::HandlerTimeout => "handler_timeout",
            Self::PrincipalCapExhausted => "principal_cap_exhausted",
        }
    }

    /// Position of this reason inside a per-transport counter row.
    fn index(self) -> usize {
        match self {
            Self::CapExhausted => 0,
            Self::HandlerTimeout => 1,
            Self::PrincipalCapExhausted => 2,
        }
    }

    /// Every reason, listed in `index()` order, so rendering can emit a line
    /// for each one even when its counter is still zero.
    fn all() -> [HttpRejectReason; REJECT_REASONS] {
        [
            Self::CapExhausted,
            Self::HandlerTimeout,
            Self::PrincipalCapExhausted,
        ]
    }
}
92
/// Upper bounds (`le`, in seconds) of the finite duration histogram buckets,
/// in ascending order. The `+Inf` bucket is tracked separately.
const DURATION_BUCKETS_SECONDS: [f64; 11] = [
    0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
];

/// Cumulative duration histogram for a single transport.
///
/// Each counter is updated with its own relaxed atomic add, so a reader running
/// concurrently with `observe_seconds` may see a sample reflected in some
/// counters before others.
#[derive(Debug)]
struct TransportHistogram {
    /// `buckets[i]` counts samples `<= DURATION_BUCKETS_SECONDS[i]`. The array
    /// is sized from the bounds table so the two cannot drift apart.
    buckets: [AtomicU64; DURATION_BUCKETS_SECONDS.len()],
    /// Total number of samples; doubles as the `+Inf` bucket.
    inf: AtomicU64,
    /// Sum of all samples, stored as whole microseconds.
    sum_micros: AtomicU64,
}

impl TransportHistogram {
    /// Creates a histogram with every counter at zero.
    fn new() -> Self {
        Self {
            buckets: std::array::from_fn(|_| AtomicU64::new(0)),
            inf: AtomicU64::new(0),
            sum_micros: AtomicU64::new(0),
        }
    }

    /// Records one sample of `value` seconds.
    ///
    /// The sample is added to the running sum (rounded to the nearest
    /// microsecond and clamped into the `u64` range), to the total count, and
    /// to every bucket whose upper bound is at least `value`.
    fn observe_seconds(&self, value: f64) {
        let rounded_micros = (value * 1_000_000.0).round();
        let micros = rounded_micros.clamp(0.0, u64::MAX as f64) as u64;
        self.sum_micros.fetch_add(micros, Ordering::Relaxed);
        self.inf.fetch_add(1, Ordering::Relaxed);

        // Buckets are cumulative: a sample lands in every bucket that bounds it.
        DURATION_BUCKETS_SECONDS
            .iter()
            .zip(self.buckets.iter())
            .filter(|(upper_bound, _)| value <= **upper_bound)
            .for_each(|(_, counter)| {
                counter.fetch_add(1, Ordering::Relaxed);
            });
    }
}
143
144#[derive(Debug)]
145struct Inner {
146 rejected: [[AtomicU64; REJECT_REASONS]; 2],
147 duration: [TransportHistogram; 2],
148}
149
150fn zeroed_reject_row() -> [AtomicU64; REJECT_REASONS] {
152 std::array::from_fn(|_| AtomicU64::new(0))
153}
154
155#[derive(Debug, Clone)]
156pub struct HttpHandlerMetrics {
157 inner: Arc<Inner>,
158}
159
160impl HttpHandlerMetrics {
161 pub fn new() -> Self {
162 Self {
163 inner: Arc::new(Inner {
164 rejected: [zeroed_reject_row(), zeroed_reject_row()],
165 duration: [TransportHistogram::new(), TransportHistogram::new()],
166 }),
167 }
168 }
169
170 pub fn record_reject(&self, transport: HttpTransport, reason: HttpRejectReason) {
171 self.inner.rejected[transport.index()][reason.index()].fetch_add(1, Ordering::Relaxed);
172 }
173
174 pub fn record_duration(&self, transport: HttpTransport, seconds: f64) {
175 if !seconds.is_finite() || seconds < 0.0 {
176 return;
177 }
178 self.inner.duration[transport.index()].observe_seconds(seconds);
179 }
180
181 pub fn rejected_count(&self, transport: HttpTransport, reason: HttpRejectReason) -> u64 {
182 self.inner.rejected[transport.index()][reason.index()].load(Ordering::Relaxed)
183 }
184
185 pub fn duration_sample_count(&self, transport: HttpTransport) -> u64 {
186 self.inner.duration[transport.index()]
187 .inf
188 .load(Ordering::Relaxed)
189 }
190
191 pub fn render(&self, body: &mut String, limiter: &HttpConnectionLimiter) {
197 let cap = limiter.cap();
198 let current = limiter.current();
199
200 let _ = writeln!(
206 body,
207 "# HELP http_active_handler_threads Live HTTP/HTTPS handler threads holding a limiter permit."
208 );
209 let _ = writeln!(body, "# TYPE http_active_handler_threads gauge");
210 let _ = writeln!(
211 body,
212 "http_active_handler_threads{{transport=\"http\"}} {}",
213 current
214 );
215 let _ = writeln!(
216 body,
217 "http_active_handler_threads{{transport=\"https\"}} {}",
218 current
219 );
220
221 let _ = writeln!(
223 body,
224 "# HELP http_handler_cap Configured maximum concurrent HTTP/HTTPS handler threads."
225 );
226 let _ = writeln!(body, "# TYPE http_handler_cap gauge");
227 let _ = writeln!(body, "http_handler_cap{{transport=\"http\"}} {}", cap);
228 let _ = writeln!(body, "http_handler_cap{{transport=\"https\"}} {}", cap);
229
230 let _ = writeln!(
232 body,
233 "# HELP http_handler_rejected_total HTTP/HTTPS handler rejections by reason since process start."
234 );
235 let _ = writeln!(body, "# TYPE http_handler_rejected_total counter");
236 for transport in [HttpTransport::Http, HttpTransport::Https] {
237 for reason in HttpRejectReason::all() {
238 let _ = writeln!(
239 body,
240 "http_handler_rejected_total{{transport=\"{}\",reason=\"{}\"}} {}",
241 transport.label(),
242 reason.label(),
243 self.rejected_count(transport, reason)
244 );
245 }
246 }
247
248 let _ = writeln!(
250 body,
251 "# HELP http_handler_duration_seconds Wall-clock handler duration per transport."
252 );
253 let _ = writeln!(body, "# TYPE http_handler_duration_seconds histogram");
254 for transport in [HttpTransport::Http, HttpTransport::Https] {
255 let hist = &self.inner.duration[transport.index()];
256 for (i, le) in DURATION_BUCKETS_SECONDS.iter().enumerate() {
257 let _ = writeln!(
258 body,
259 "http_handler_duration_seconds_bucket{{transport=\"{}\",le=\"{}\"}} {}",
260 transport.label(),
261 format_bucket_le(*le),
262 hist.buckets[i].load(Ordering::Relaxed)
263 );
264 }
265 let inf = hist.inf.load(Ordering::Relaxed);
266 let _ = writeln!(
267 body,
268 "http_handler_duration_seconds_bucket{{transport=\"{}\",le=\"+Inf\"}} {}",
269 transport.label(),
270 inf
271 );
272 let sum_secs = (hist.sum_micros.load(Ordering::Relaxed) as f64) / 1_000_000.0;
273 let _ = writeln!(
274 body,
275 "http_handler_duration_seconds_sum{{transport=\"{}\"}} {}",
276 transport.label(),
277 sum_secs
278 );
279 let _ = writeln!(
280 body,
281 "http_handler_duration_seconds_count{{transport=\"{}\"}} {}",
282 transport.label(),
283 inf
284 );
285 }
286 }
287}
288
289impl Default for HttpHandlerMetrics {
290 fn default() -> Self {
291 Self::new()
292 }
293}
294
/// Formats a bucket upper bound for use as an `le` label value.
///
/// Whole numbers below `1e16` in magnitude keep one decimal place (`1.0`,
/// `10.0`); every other value uses the default `f64` display form (`0.005`).
fn format_bucket_le(le: f64) -> String {
    let is_whole_number = le.trunc() == le;
    let is_below_limit = le.abs() < 1e16;
    if !(is_whole_number && is_below_limit) {
        return le.to_string();
    }
    format!("{le:.1}")
}
305
#[cfg(test)]
mod tests {
    use super::*;

    /// Renders `metrics` against `limiter` into a fresh string.
    fn rendered(metrics: &HttpHandlerMetrics, limiter: &HttpConnectionLimiter) -> String {
        let mut body = String::new();
        metrics.render(&mut body, limiter);
        body
    }

    /// A rejection recorded under one (transport, reason) pair must not show
    /// up under any other pair.
    #[test]
    fn rejected_counters_isolated_by_label() {
        let m = HttpHandlerMetrics::new();
        m.record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
        m.record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
        m.record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
        assert_eq!(
            m.rejected_count(HttpTransport::Http, HttpRejectReason::CapExhausted),
            2
        );
        assert_eq!(
            m.rejected_count(HttpTransport::Http, HttpRejectReason::HandlerTimeout),
            0
        );
        assert_eq!(
            m.rejected_count(HttpTransport::Https, HttpRejectReason::HandlerTimeout),
            1
        );
        assert_eq!(
            m.rejected_count(HttpTransport::Https, HttpRejectReason::CapExhausted),
            0
        );
    }

    /// Each bucket counts every sample at or below its bound, and transports
    /// keep separate histograms.
    #[test]
    fn duration_histogram_buckets_are_cumulative() {
        let m = HttpHandlerMetrics::new();
        m.record_duration(HttpTransport::Http, 0.003);
        m.record_duration(HttpTransport::Http, 0.04);
        m.record_duration(HttpTransport::Http, 3.0);
        assert_eq!(m.duration_sample_count(HttpTransport::Http), 3);

        let limiter = HttpConnectionLimiter::new(4);
        let body = rendered(&m, &limiter);

        assert!(body
            .contains("http_handler_duration_seconds_bucket{transport=\"http\",le=\"0.005\"} 1"));
        assert!(
            body.contains("http_handler_duration_seconds_bucket{transport=\"http\",le=\"0.05\"} 2")
        );
        assert!(
            body.contains("http_handler_duration_seconds_bucket{transport=\"http\",le=\"+Inf\"} 3")
        );
        assert!(body
            .contains("http_handler_duration_seconds_bucket{transport=\"https\",le=\"+Inf\"} 0"));
    }

    /// The cap and active-thread gauges are taken from the limiter and written
    /// under both transport labels.
    #[test]
    fn render_includes_cap_and_current_from_limiter() {
        let limiter = HttpConnectionLimiter::new(7);
        let _p = limiter.try_acquire().unwrap();
        let m = HttpHandlerMetrics::new();
        let body = rendered(&m, &limiter);
        assert!(body.contains("http_handler_cap{transport=\"http\"} 7"));
        assert!(body.contains("http_handler_cap{transport=\"https\"} 7"));
        assert!(body.contains("http_active_handler_threads{transport=\"http\"} 1"));
        assert!(body.contains("http_active_handler_threads{transport=\"https\"} 1"));
    }

    /// Every (transport, reason) pair is rendered even at zero, and nothing
    /// beyond the pairs listed here is emitted.
    #[test]
    fn render_emits_all_rejection_labels() {
        let m = HttpHandlerMetrics::new();
        let limiter = HttpConnectionLimiter::new(1);
        let body = rendered(&m, &limiter);

        let transports = ["http", "https"];
        let reasons = ["cap_exhausted", "handler_timeout", "principal_cap_exhausted"];
        for transport in transports {
            for reason in reasons {
                let expected = format!(
                    "http_handler_rejected_total{{transport=\"{transport}\",reason=\"{reason}\"}} 0"
                );
                assert!(body.contains(&expected), "missing line: {expected}");
            }
        }

        // Guards the "all" in the test name: a newly added reason that is not
        // listed above changes the sample count and fails here.
        let emitted = body
            .lines()
            .filter(|line| line.starts_with("http_handler_rejected_total{"))
            .count();
        assert_eq!(
            emitted,
            transports.len() * reasons.len(),
            "unexpected number of http_handler_rejected_total samples"
        );
    }

    /// Negative, NaN and infinite samples are dropped.
    #[test]
    fn negative_or_nan_durations_are_ignored() {
        let m = HttpHandlerMetrics::new();
        m.record_duration(HttpTransport::Http, -1.0);
        m.record_duration(HttpTransport::Http, f64::NAN);
        m.record_duration(HttpTransport::Http, f64::INFINITY);
        assert_eq!(m.duration_sample_count(HttpTransport::Http), 0);
    }
}