Skip to main content

reddb_server/server/
http_handler_metrics.rs

1//! Prometheus metrics for the HTTP handler-thread pool.
2//!
3//! Slice 4 of issue #573 / parent #569. Exposes four series per
4//! ADR-0017 (Prometheus / Grafana adapters) so operators can observe
5//! saturation arriving before the cap is hit:
6//!
7//! - `http_active_handler_threads{transport}` — gauge, sourced from
8//!   `HttpConnectionLimiter::current()` at scrape time.
9//! - `http_handler_cap{transport}` — static gauge, sourced from the
10//!   limiter's cap. Same value for `http` and `https` since slice 3
11//!   collapsed both transports onto one cap.
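//! - `http_handler_rejected_total{transport, reason}` — counter,
//!   incremented per 503 emitted by the accept-loop reject path
//!   (`reason=cap_exhausted`), per slice-2 deadline exit
//!   (`reason=handler_timeout`), or when a principal hits its
//!   per-principal concurrent in-flight cap
//!   (`reason=principal_cap_exhausted`, issue #934).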
16//! - `http_handler_duration_seconds{transport}` — histogram of total
17//!   handler wall-clock time, sampled on every handler exit
18//!   (happy-path and timeout). Buckets are the standard Prometheus
19//!   client default set so existing dashboards render quantiles via
20//!   `histogram_quantile` without configuration.
21//!
22//! Counters and histogram updates are plain `AtomicU64` operations on
23//! the hot path. No registry, no mutex.
24
25use std::fmt::Write;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::Arc;
28
29use super::http_connection_limiter::HttpConnectionLimiter;
30
/// Transport a handler thread is serving. Supplies the `transport`
/// label value and the first index into the per-transport tables.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HttpTransport {
    /// Clear-text accept loop.
    Http,
    /// TLS accept loop.
    Https,
}

impl HttpTransport {
    /// Slot of this transport in the two-element per-transport arrays.
    fn index(self) -> usize {
        match self {
            Self::Http => 0,
            Self::Https => 1,
        }
    }

    /// Text emitted as the `transport` label value.
    fn label(self) -> &'static str {
        match self {
            Self::Https => "https",
            Self::Http => "http",
        }
    }
}
52
/// Why a request was rejected; emitted as the `reason` label of
/// `http_handler_rejected_total` and used as the second index into the
/// per-transport rejection counters.
///
/// Derives `PartialEq`/`Eq` like [`HttpTransport`] so reasons can be
/// compared directly (for example with `assert_eq!`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HttpRejectReason {
    /// Rejected on the accept-loop reject path (`reason=cap_exhausted`).
    CapExhausted,
    /// Handler left through the deadline exit (`reason=handler_timeout`).
    HandlerTimeout,
    /// Issue #934 — a principal hit its per-principal concurrent in-flight
    /// cap (the global cap still had room; this caller did not).
    PrincipalCapExhausted,
}

/// Number of distinct [`HttpRejectReason`] variants — array width for the
/// per-transport rejection counters. The return type of
/// [`HttpRejectReason::all`] is sized by this constant, so the two cannot
/// disagree without a compile error.
const REJECT_REASONS: usize = 3;

impl HttpRejectReason {
    /// Text emitted as the `reason` label value.
    fn label(self) -> &'static str {
        match self {
            HttpRejectReason::CapExhausted => "cap_exhausted",
            HttpRejectReason::HandlerTimeout => "handler_timeout",
            HttpRejectReason::PrincipalCapExhausted => "principal_cap_exhausted",
        }
    }

    /// Slot of this reason inside a rejection-counter row. Every value is
    /// below [`REJECT_REASONS`] and matches the variant's position in
    /// [`HttpRejectReason::all`].
    fn index(self) -> usize {
        match self {
            HttpRejectReason::CapExhausted => 0,
            HttpRejectReason::HandlerTimeout => 1,
            HttpRejectReason::PrincipalCapExhausted => 2,
        }
    }

    /// All variants, for render-time iteration.
    fn all() -> [HttpRejectReason; REJECT_REASONS] {
        [
            HttpRejectReason::CapExhausted,
            HttpRejectReason::HandlerTimeout,
            HttpRejectReason::PrincipalCapExhausted,
        ]
    }
}
92
/// Prometheus client default histogram buckets, in seconds. Aligned
/// with `prometheus.DefBuckets` so operator dashboards render
/// `histogram_quantile` without per-deployment tuning.
///
/// Must stay sorted ascending: `TransportHistogram::observe_seconds`
/// walks the bounds from the largest down and stops at the first bound
/// that is below the sample.
const DURATION_BUCKETS_SECONDS: [f64; 11] = [
    0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
];

/// Number of finite buckets, derived from the bounds table so the
/// counter array can never drift from it.
const DURATION_BUCKET_COUNT: usize = DURATION_BUCKETS_SECONDS.len();

/// Lock-free cumulative histogram for one transport.
#[derive(Debug)]
struct TransportHistogram {
    /// One cumulative counter per bound in `DURATION_BUCKETS_SECONDS`:
    /// slot `i` counts every sample `<=` bound `i`.
    buckets: [AtomicU64; DURATION_BUCKET_COUNT],
    /// The `+Inf` bucket, i.e. the total sample count.
    inf: AtomicU64,
    /// Sum of observed durations in microseconds (we keep the sum in
    /// `u64` and convert to seconds at render time).
    sum_micros: AtomicU64,
}

impl TransportHistogram {
    /// Creates a histogram with every counter at zero.
    fn new() -> Self {
        Self {
            buckets: std::array::from_fn(|_| AtomicU64::new(0)),
            inf: AtomicU64::new(0),
            sum_micros: AtomicU64::new(0),
        }
    }

    /// Records one sample of `value` seconds.
    ///
    /// The sum is accumulated in whole microseconds (rounded, clamped to
    /// the `u64` range). The total count is bumped first and the finite
    /// buckets are then updated from the widest bound down to the
    /// narrowest matching one, so a reader that loads the buckets in
    /// ascending order is less likely to see a narrow bucket ahead of a
    /// wider one. With `Ordering::Relaxed` this is best-effort only, not
    /// a guarantee.
    fn observe_seconds(&self, value: f64) {
        let micros = (value * 1_000_000.0).round().clamp(0.0, u64::MAX as f64) as u64;
        self.sum_micros.fetch_add(micros, Ordering::Relaxed);
        self.inf.fetch_add(1, Ordering::Relaxed);

        // Bounds are ascending, so walking downward the matching buckets
        // form a prefix; `value <= le` is false for NaN, which therefore
        // touches no finite bucket.
        let matching = self
            .buckets
            .iter()
            .zip(DURATION_BUCKETS_SECONDS.iter())
            .rev()
            .take_while(|(_, le)| value <= **le);
        for (slot, _) in matching {
            slot.fetch_add(1, Ordering::Relaxed);
        }
    }
}
143
144#[derive(Debug)]
145struct Inner {
146    rejected: [[AtomicU64; REJECT_REASONS]; 2],
147    duration: [TransportHistogram; 2],
148}
149
150/// One fresh zeroed rejection-counter row (`REJECT_REASONS` wide).
151fn zeroed_reject_row() -> [AtomicU64; REJECT_REASONS] {
152    std::array::from_fn(|_| AtomicU64::new(0))
153}
154
155#[derive(Debug, Clone)]
156pub struct HttpHandlerMetrics {
157    inner: Arc<Inner>,
158}
159
160impl HttpHandlerMetrics {
161    pub fn new() -> Self {
162        Self {
163            inner: Arc::new(Inner {
164                rejected: [zeroed_reject_row(), zeroed_reject_row()],
165                duration: [TransportHistogram::new(), TransportHistogram::new()],
166            }),
167        }
168    }
169
170    pub fn record_reject(&self, transport: HttpTransport, reason: HttpRejectReason) {
171        self.inner.rejected[transport.index()][reason.index()].fetch_add(1, Ordering::Relaxed);
172    }
173
174    pub fn record_duration(&self, transport: HttpTransport, seconds: f64) {
175        if !seconds.is_finite() || seconds < 0.0 {
176            return;
177        }
178        self.inner.duration[transport.index()].observe_seconds(seconds);
179    }
180
181    pub fn rejected_count(&self, transport: HttpTransport, reason: HttpRejectReason) -> u64 {
182        self.inner.rejected[transport.index()][reason.index()].load(Ordering::Relaxed)
183    }
184
185    pub fn duration_sample_count(&self, transport: HttpTransport) -> u64 {
186        self.inner.duration[transport.index()]
187            .inf
188            .load(Ordering::Relaxed)
189    }
190
191    /// Render all four series in Prometheus text exposition format,
192    /// appending to `body`. Reads the live `current()` and `cap()` off
193    /// the supplied limiter; the limiter is the source of truth for
194    /// the two gauges so the metrics can't drift from the admission
195    /// path.
196    pub fn render(&self, body: &mut String, limiter: &HttpConnectionLimiter) {
197        let cap = limiter.cap();
198        let current = limiter.current();
199
200        // `http_active_handler_threads{transport}` — gauge.
201        // The clear-text and TLS accept loops share a single limiter
202        // (slice 3), so the live count is duplicated on both labels
203        // for dashboard ergonomics: an operator can graph the per-
204        // transport pane without special-casing the shared cap.
205        let _ = writeln!(
206            body,
207            "# HELP http_active_handler_threads Live HTTP/HTTPS handler threads holding a limiter permit."
208        );
209        let _ = writeln!(body, "# TYPE http_active_handler_threads gauge");
210        let _ = writeln!(
211            body,
212            "http_active_handler_threads{{transport=\"http\"}} {}",
213            current
214        );
215        let _ = writeln!(
216            body,
217            "http_active_handler_threads{{transport=\"https\"}} {}",
218            current
219        );
220
221        // `http_handler_cap{transport}` — static gauge.
222        let _ = writeln!(
223            body,
224            "# HELP http_handler_cap Configured maximum concurrent HTTP/HTTPS handler threads."
225        );
226        let _ = writeln!(body, "# TYPE http_handler_cap gauge");
227        let _ = writeln!(body, "http_handler_cap{{transport=\"http\"}} {}", cap);
228        let _ = writeln!(body, "http_handler_cap{{transport=\"https\"}} {}", cap);
229
230        // `http_handler_rejected_total{transport, reason}` — counter.
231        let _ = writeln!(
232            body,
233            "# HELP http_handler_rejected_total HTTP/HTTPS handler rejections by reason since process start."
234        );
235        let _ = writeln!(body, "# TYPE http_handler_rejected_total counter");
236        for transport in [HttpTransport::Http, HttpTransport::Https] {
237            for reason in HttpRejectReason::all() {
238                let _ = writeln!(
239                    body,
240                    "http_handler_rejected_total{{transport=\"{}\",reason=\"{}\"}} {}",
241                    transport.label(),
242                    reason.label(),
243                    self.rejected_count(transport, reason)
244                );
245            }
246        }
247
248        // `http_handler_duration_seconds{transport}` — histogram.
249        let _ = writeln!(
250            body,
251            "# HELP http_handler_duration_seconds Wall-clock handler duration per transport."
252        );
253        let _ = writeln!(body, "# TYPE http_handler_duration_seconds histogram");
254        for transport in [HttpTransport::Http, HttpTransport::Https] {
255            let hist = &self.inner.duration[transport.index()];
256            for (i, le) in DURATION_BUCKETS_SECONDS.iter().enumerate() {
257                let _ = writeln!(
258                    body,
259                    "http_handler_duration_seconds_bucket{{transport=\"{}\",le=\"{}\"}} {}",
260                    transport.label(),
261                    format_bucket_le(*le),
262                    hist.buckets[i].load(Ordering::Relaxed)
263                );
264            }
265            let inf = hist.inf.load(Ordering::Relaxed);
266            let _ = writeln!(
267                body,
268                "http_handler_duration_seconds_bucket{{transport=\"{}\",le=\"+Inf\"}} {}",
269                transport.label(),
270                inf
271            );
272            let sum_secs = (hist.sum_micros.load(Ordering::Relaxed) as f64) / 1_000_000.0;
273            let _ = writeln!(
274                body,
275                "http_handler_duration_seconds_sum{{transport=\"{}\"}} {}",
276                transport.label(),
277                sum_secs
278            );
279            let _ = writeln!(
280                body,
281                "http_handler_duration_seconds_count{{transport=\"{}\"}} {}",
282                transport.label(),
283                inf
284            );
285        }
286    }
287}
288
289impl Default for HttpHandlerMetrics {
290    fn default() -> Self {
291        Self::new()
292    }
293}
294
/// Formats a bucket upper bound for the `le` label.
///
/// Whole-number bounds below `1e16` in magnitude keep one trailing
/// decimal (`1.0`, `10.0`); everything else uses the default `f64`
/// display form (`0.005`, `2.5`).
fn format_bucket_le(le: f64) -> String {
    // Trailing zeros are significant for canonical bucket labels, so
    // integral bounds are written with an explicit `.0`.
    let is_whole = le.abs() < 1e16 && le.fract() == 0.0;
    if is_whole {
        format!("{:.1}", le)
    } else {
        le.to_string()
    }
}
305
#[cfg(test)]
mod tests {
    use super::*;

    /// True when `body` contains `expected` as one complete line.
    ///
    /// A plain `contains` on a sample line is only a prefix match on the
    /// value (`"… 1"` also matches `"… 10"`), so the assertions below
    /// compare whole lines instead.
    fn has_line(body: &str, expected: &str) -> bool {
        body.lines().any(|line| line == expected)
    }

    #[test]
    fn rejected_counters_isolated_by_label() {
        let m = HttpHandlerMetrics::new();
        m.record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
        m.record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
        m.record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
        m.record_reject(
            HttpTransport::Https,
            HttpRejectReason::PrincipalCapExhausted,
        );
        assert_eq!(
            m.rejected_count(HttpTransport::Http, HttpRejectReason::CapExhausted),
            2
        );
        assert_eq!(
            m.rejected_count(HttpTransport::Http, HttpRejectReason::HandlerTimeout),
            0
        );
        assert_eq!(
            m.rejected_count(HttpTransport::Https, HttpRejectReason::HandlerTimeout),
            1
        );
        assert_eq!(
            m.rejected_count(HttpTransport::Https, HttpRejectReason::CapExhausted),
            0
        );
        assert_eq!(
            m.rejected_count(
                HttpTransport::Https,
                HttpRejectReason::PrincipalCapExhausted
            ),
            1
        );
        assert_eq!(
            m.rejected_count(
                HttpTransport::Http,
                HttpRejectReason::PrincipalCapExhausted
            ),
            0
        );
    }

    #[test]
    fn duration_histogram_buckets_are_cumulative() {
        let m = HttpHandlerMetrics::new();
        m.record_duration(HttpTransport::Http, 0.003);
        m.record_duration(HttpTransport::Http, 0.04);
        m.record_duration(HttpTransport::Http, 3.0);
        assert_eq!(m.duration_sample_count(HttpTransport::Http), 3);

        let limiter = HttpConnectionLimiter::new(4);
        let mut body = String::new();
        m.render(&mut body, &limiter);

        // `le="0.005"` includes only the 3ms sample (cumulative).
        assert!(has_line(
            &body,
            "http_handler_duration_seconds_bucket{transport=\"http\",le=\"0.005\"} 1"
        ));
        // `le="0.05"` includes the 3ms + 40ms samples.
        assert!(has_line(
            &body,
            "http_handler_duration_seconds_bucket{transport=\"http\",le=\"0.05\"} 2"
        ));
        // `le="+Inf"` sees all 3 samples.
        assert!(has_line(
            &body,
            "http_handler_duration_seconds_bucket{transport=\"http\",le=\"+Inf\"} 3"
        ));
        // HTTPS labelset present but empty.
        assert!(has_line(
            &body,
            "http_handler_duration_seconds_bucket{transport=\"https\",le=\"+Inf\"} 0"
        ));
    }

    #[test]
    fn render_includes_cap_and_current_from_limiter() {
        let limiter = HttpConnectionLimiter::new(7);
        let _p = limiter.try_acquire().unwrap();
        let m = HttpHandlerMetrics::new();
        let mut body = String::new();
        m.render(&mut body, &limiter);
        assert!(has_line(&body, "http_handler_cap{transport=\"http\"} 7"));
        assert!(has_line(&body, "http_handler_cap{transport=\"https\"} 7"));
        assert!(has_line(
            &body,
            "http_active_handler_threads{transport=\"http\"} 1"
        ));
        assert!(has_line(
            &body,
            "http_active_handler_threads{transport=\"https\"} 1"
        ));
    }

    /// Every transport / reason pair is rendered, even at zero
    /// (2 transports × 3 reasons = 6 sample lines).
    #[test]
    fn render_emits_every_rejection_label() {
        let m = HttpHandlerMetrics::new();
        let limiter = HttpConnectionLimiter::new(1);
        let mut body = String::new();
        m.render(&mut body, &limiter);
        for transport in ["http", "https"] {
            for reason in [
                "cap_exhausted",
                "handler_timeout",
                "principal_cap_exhausted",
            ] {
                let expected = format!(
                    "http_handler_rejected_total{{transport=\"{transport}\",reason=\"{reason}\"}} 0"
                );
                assert!(has_line(&body, &expected), "missing line: {expected}");
            }
        }
    }

    #[test]
    fn negative_or_nan_durations_are_ignored() {
        let m = HttpHandlerMetrics::new();
        m.record_duration(HttpTransport::Http, -1.0);
        m.record_duration(HttpTransport::Http, f64::NAN);
        m.record_duration(HttpTransport::Http, f64::INFINITY);
        assert_eq!(m.duration_sample_count(HttpTransport::Http), 0);
    }
}