Skip to main content

reddb_server/server/
http_handler_metrics.rs

1//! Prometheus metrics for the HTTP handler-thread pool.
2//!
3//! Slice 4 of issue #573 / parent #569. Exposes four series per
4//! ADR-0017 (Prometheus / Grafana adapters) so operators can observe
5//! saturation arriving before the cap is hit:
6//!
7//! - `http_active_handler_threads{transport}` — gauge, sourced from
8//!   `HttpConnectionLimiter::current()` at scrape time.
9//! - `http_handler_cap{transport}` — static gauge, sourced from the
10//!   limiter's cap. Same value for `http` and `https` since slice 3
11//!   collapsed both transports onto one cap.
12//! - `http_handler_rejected_total{transport, reason}` — counter,
13//!   incremented per 503 emitted by the accept-loop reject path
14//!   (`reason=cap_exhausted`) or per slice-2 deadline exit
15//!   (`reason=handler_timeout`).
16//! - `http_handler_duration_seconds{transport}` — histogram of total
17//!   handler wall-clock time, sampled on every handler exit
18//!   (happy-path and timeout). Buckets are the standard Prometheus
19//!   client default set so existing dashboards render quantiles via
20//!   `histogram_quantile` without configuration.
21//!
22//! Counters and histogram updates are plain `AtomicU64` operations on
23//! the hot path. No registry, no mutex.
24
25use std::fmt::Write;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::Arc;
28
29use super::http_connection_limiter::HttpConnectionLimiter;
30
/// Accept loop a handler was admitted on; rendered as the `transport`
/// label (`http` / `https`) on every per-transport series.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HttpTransport {
    /// Clear-text HTTP accept loop.
    Http,
    /// TLS (HTTPS) accept loop.
    Https,
}
36
37impl HttpTransport {
38    fn label(self) -> &'static str {
39        match self {
40            HttpTransport::Http => "http",
41            HttpTransport::Https => "https",
42        }
43    }
44
45    fn index(self) -> usize {
46        match self {
47            HttpTransport::Http => 0,
48            HttpTransport::Https => 1,
49        }
50    }
51}
52
/// Why a request was turned away; rendered as the `reason` label of
/// `http_handler_rejected_total`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HttpRejectReason {
    /// The accept-loop reject path answered 503 because the handler cap
    /// was exhausted.
    CapExhausted,
    /// A handler exited on its deadline.
    HandlerTimeout,
}
58
59impl HttpRejectReason {
60    fn label(self) -> &'static str {
61        match self {
62            HttpRejectReason::CapExhausted => "cap_exhausted",
63            HttpRejectReason::HandlerTimeout => "handler_timeout",
64        }
65    }
66
67    fn index(self) -> usize {
68        match self {
69            HttpRejectReason::CapExhausted => 0,
70            HttpRejectReason::HandlerTimeout => 1,
71        }
72    }
73}
74
/// Finite upper bounds (`le`), in seconds, of the handler-duration
/// histogram: the Go client's `prometheus.DefBuckets` (5 ms up to 10 s),
/// so stock `histogram_quantile` dashboards work without per-deployment
/// tuning. Keep strictly ascending — buckets are cumulative.
const DURATION_BUCKETS_SECONDS: [f64; 11] = [
    0.005, // 5 ms
    0.01,  // 10 ms
    0.025, // 25 ms
    0.05,  // 50 ms
    0.1,   // 100 ms
    0.25,  // 250 ms
    0.5,   // 500 ms
    1.0,   // 1 s
    2.5,   // 2.5 s
    5.0,   // 5 s
    10.0,  // 10 s
];
81
82#[derive(Debug)]
83struct TransportHistogram {
84    /// One counter per bucket in `DURATION_BUCKETS_SECONDS`, plus a
85    /// trailing `+Inf` slot that holds the total sample count.
86    buckets: [AtomicU64; 11],
87    inf: AtomicU64,
88    /// Sum of observed durations in microseconds (we keep the sum in
89    /// `u64` and convert to seconds at render time).
90    sum_micros: AtomicU64,
91}
92
93impl TransportHistogram {
94    fn new() -> Self {
95        Self {
96            buckets: [
97                AtomicU64::new(0),
98                AtomicU64::new(0),
99                AtomicU64::new(0),
100                AtomicU64::new(0),
101                AtomicU64::new(0),
102                AtomicU64::new(0),
103                AtomicU64::new(0),
104                AtomicU64::new(0),
105                AtomicU64::new(0),
106                AtomicU64::new(0),
107                AtomicU64::new(0),
108            ],
109            inf: AtomicU64::new(0),
110            sum_micros: AtomicU64::new(0),
111        }
112    }
113
114    fn observe_seconds(&self, value: f64) {
115        let micros = (value * 1_000_000.0).round().clamp(0.0, u64::MAX as f64) as u64;
116        self.sum_micros.fetch_add(micros, Ordering::Relaxed);
117        self.inf.fetch_add(1, Ordering::Relaxed);
118        for (i, le) in DURATION_BUCKETS_SECONDS.iter().enumerate() {
119            if value <= *le {
120                self.buckets[i].fetch_add(1, Ordering::Relaxed);
121            }
122        }
123    }
124}
125
126#[derive(Debug)]
127struct Inner {
128    rejected: [[AtomicU64; 2]; 2],
129    duration: [TransportHistogram; 2],
130}
131
132#[derive(Debug, Clone)]
133pub struct HttpHandlerMetrics {
134    inner: Arc<Inner>,
135}
136
137impl HttpHandlerMetrics {
138    pub fn new() -> Self {
139        Self {
140            inner: Arc::new(Inner {
141                rejected: [
142                    [AtomicU64::new(0), AtomicU64::new(0)],
143                    [AtomicU64::new(0), AtomicU64::new(0)],
144                ],
145                duration: [TransportHistogram::new(), TransportHistogram::new()],
146            }),
147        }
148    }
149
150    pub fn record_reject(&self, transport: HttpTransport, reason: HttpRejectReason) {
151        self.inner.rejected[transport.index()][reason.index()].fetch_add(1, Ordering::Relaxed);
152    }
153
154    pub fn record_duration(&self, transport: HttpTransport, seconds: f64) {
155        if !seconds.is_finite() || seconds < 0.0 {
156            return;
157        }
158        self.inner.duration[transport.index()].observe_seconds(seconds);
159    }
160
161    pub fn rejected_count(&self, transport: HttpTransport, reason: HttpRejectReason) -> u64 {
162        self.inner.rejected[transport.index()][reason.index()].load(Ordering::Relaxed)
163    }
164
165    pub fn duration_sample_count(&self, transport: HttpTransport) -> u64 {
166        self.inner.duration[transport.index()]
167            .inf
168            .load(Ordering::Relaxed)
169    }
170
171    /// Render all four series in Prometheus text exposition format,
172    /// appending to `body`. Reads the live `current()` and `cap()` off
173    /// the supplied limiter; the limiter is the source of truth for
174    /// the two gauges so the metrics can't drift from the admission
175    /// path.
176    pub fn render(&self, body: &mut String, limiter: &HttpConnectionLimiter) {
177        let cap = limiter.cap();
178        let current = limiter.current();
179
180        // `http_active_handler_threads{transport}` — gauge.
181        // The clear-text and TLS accept loops share a single limiter
182        // (slice 3), so the live count is duplicated on both labels
183        // for dashboard ergonomics: an operator can graph the per-
184        // transport pane without special-casing the shared cap.
185        let _ = writeln!(
186            body,
187            "# HELP http_active_handler_threads Live HTTP/HTTPS handler threads holding a limiter permit."
188        );
189        let _ = writeln!(body, "# TYPE http_active_handler_threads gauge");
190        let _ = writeln!(
191            body,
192            "http_active_handler_threads{{transport=\"http\"}} {}",
193            current
194        );
195        let _ = writeln!(
196            body,
197            "http_active_handler_threads{{transport=\"https\"}} {}",
198            current
199        );
200
201        // `http_handler_cap{transport}` — static gauge.
202        let _ = writeln!(
203            body,
204            "# HELP http_handler_cap Configured maximum concurrent HTTP/HTTPS handler threads."
205        );
206        let _ = writeln!(body, "# TYPE http_handler_cap gauge");
207        let _ = writeln!(body, "http_handler_cap{{transport=\"http\"}} {}", cap);
208        let _ = writeln!(body, "http_handler_cap{{transport=\"https\"}} {}", cap);
209
210        // `http_handler_rejected_total{transport, reason}` — counter.
211        let _ = writeln!(
212            body,
213            "# HELP http_handler_rejected_total HTTP/HTTPS handler rejections by reason since process start."
214        );
215        let _ = writeln!(body, "# TYPE http_handler_rejected_total counter");
216        for transport in [HttpTransport::Http, HttpTransport::Https] {
217            for reason in [
218                HttpRejectReason::CapExhausted,
219                HttpRejectReason::HandlerTimeout,
220            ] {
221                let _ = writeln!(
222                    body,
223                    "http_handler_rejected_total{{transport=\"{}\",reason=\"{}\"}} {}",
224                    transport.label(),
225                    reason.label(),
226                    self.rejected_count(transport, reason)
227                );
228            }
229        }
230
231        // `http_handler_duration_seconds{transport}` — histogram.
232        let _ = writeln!(
233            body,
234            "# HELP http_handler_duration_seconds Wall-clock handler duration per transport."
235        );
236        let _ = writeln!(body, "# TYPE http_handler_duration_seconds histogram");
237        for transport in [HttpTransport::Http, HttpTransport::Https] {
238            let hist = &self.inner.duration[transport.index()];
239            for (i, le) in DURATION_BUCKETS_SECONDS.iter().enumerate() {
240                let _ = writeln!(
241                    body,
242                    "http_handler_duration_seconds_bucket{{transport=\"{}\",le=\"{}\"}} {}",
243                    transport.label(),
244                    format_bucket_le(*le),
245                    hist.buckets[i].load(Ordering::Relaxed)
246                );
247            }
248            let inf = hist.inf.load(Ordering::Relaxed);
249            let _ = writeln!(
250                body,
251                "http_handler_duration_seconds_bucket{{transport=\"{}\",le=\"+Inf\"}} {}",
252                transport.label(),
253                inf
254            );
255            let sum_secs = (hist.sum_micros.load(Ordering::Relaxed) as f64) / 1_000_000.0;
256            let _ = writeln!(
257                body,
258                "http_handler_duration_seconds_sum{{transport=\"{}\"}} {}",
259                transport.label(),
260                sum_secs
261            );
262            let _ = writeln!(
263                body,
264                "http_handler_duration_seconds_count{{transport=\"{}\"}} {}",
265                transport.label(),
266                inf
267            );
268        }
269    }
270}
271
272impl Default for HttpHandlerMetrics {
273    fn default() -> Self {
274        Self::new()
275    }
276}
277
/// Formats a histogram bucket upper bound for the `le` label.
///
/// Whole-number bounds below 1e16 keep one decimal place (`1.0`, `10.0`),
/// the style the Python Prometheus client uses. Other finite bounds use
/// Rust's shortest round-trip `Display` form (`0.005`, `2.5`). Non-finite
/// bounds use the exposition-format spellings `+Inf`, `-Inf` and `NaN`.
/// Rust's own `Display` would print `inf`, which Prometheus does not parse
/// as infinity.
fn format_bucket_le(le: f64) -> String {
    if le.is_nan() {
        return "NaN".to_owned();
    }
    if le.is_infinite() {
        let spelled = if le > 0.0 { "+Inf" } else { "-Inf" };
        return spelled.to_owned();
    }
    if le == le.trunc() && le.abs() < 1e16 {
        format!("{le:.1}")
    } else {
        format!("{le}")
    }
}
288
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rejected_counters_isolated_by_label() {
        let m = HttpHandlerMetrics::new();
        m.record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
        m.record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
        m.record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
        assert_eq!(
            m.rejected_count(HttpTransport::Http, HttpRejectReason::CapExhausted),
            2
        );
        assert_eq!(
            m.rejected_count(HttpTransport::Http, HttpRejectReason::HandlerTimeout),
            0
        );
        assert_eq!(
            m.rejected_count(HttpTransport::Https, HttpRejectReason::HandlerTimeout),
            1
        );
        assert_eq!(
            m.rejected_count(HttpTransport::Https, HttpRejectReason::CapExhausted),
            0
        );
    }

    #[test]
    fn duration_histogram_buckets_are_cumulative() {
        let m = HttpHandlerMetrics::new();
        m.record_duration(HttpTransport::Http, 0.003);
        m.record_duration(HttpTransport::Http, 0.04);
        m.record_duration(HttpTransport::Http, 3.0);
        assert_eq!(m.duration_sample_count(HttpTransport::Http), 3);

        let limiter = HttpConnectionLimiter::new(4);
        let mut body = String::new();
        m.render(&mut body, &limiter);

        // `le="0.005"` includes only the 3ms sample (cumulative).
        assert!(body.contains(
            "http_handler_duration_seconds_bucket{transport=\"http\",le=\"0.005\"} 1"
        ));
        // `le="0.05"` includes the 3ms + 40ms samples.
        assert!(body.contains(
            "http_handler_duration_seconds_bucket{transport=\"http\",le=\"0.05\"} 2"
        ));
        // `le="+Inf"` sees all 3 samples.
        assert!(body.contains(
            "http_handler_duration_seconds_bucket{transport=\"http\",le=\"+Inf\"} 3"
        ));
        // HTTPS labelset present but empty.
        assert!(body.contains(
            "http_handler_duration_seconds_bucket{transport=\"https\",le=\"+Inf\"} 0"
        ));
    }

    #[test]
    fn bucket_bounds_are_inclusive_and_sum_count_rendered() {
        let m = HttpHandlerMetrics::new();
        m.record_duration(HttpTransport::Https, 0.25);
        m.record_duration(HttpTransport::Https, 0.5);

        let limiter = HttpConnectionLimiter::new(2);
        let mut body = String::new();
        m.render(&mut body, &limiter);

        // `le` means less-than-or-equal: a sample on a bound counts there.
        assert!(body.contains(
            "http_handler_duration_seconds_bucket{transport=\"https\",le=\"0.1\"} 0\n"
        ));
        assert!(body.contains(
            "http_handler_duration_seconds_bucket{transport=\"https\",le=\"0.25\"} 1\n"
        ));
        // Whole-number bounds render with one decimal place.
        assert!(body.contains(
            "http_handler_duration_seconds_bucket{transport=\"https\",le=\"1.0\"} 2\n"
        ));
        assert!(body.contains("http_handler_duration_seconds_sum{transport=\"https\"} 0.75\n"));
        assert!(body.contains("http_handler_duration_seconds_count{transport=\"https\"} 2\n"));
        // The other transport is untouched.
        assert!(body.contains("http_handler_duration_seconds_count{transport=\"http\"} 0\n"));
    }

    #[test]
    fn rendered_histogram_buckets_never_decrease() {
        let m = HttpHandlerMetrics::new();
        for secs in [0.001, 0.02, 0.3, 0.3, 4.0, 60.0] {
            m.record_duration(HttpTransport::Http, secs);
        }
        let limiter = HttpConnectionLimiter::new(1);
        let mut body = String::new();
        m.render(&mut body, &limiter);

        let prefix = "http_handler_duration_seconds_bucket{transport=\"http\",";
        let counts: Vec<u64> = body
            .lines()
            .filter(|line| line.starts_with(prefix))
            .map(|line| line.rsplit(' ').next().unwrap().parse::<u64>().unwrap())
            .collect();
        // Every finite bound plus `+Inf`.
        assert_eq!(counts.len(), DURATION_BUCKETS_SECONDS.len() + 1);
        assert!(counts.windows(2).all(|w| w[0] <= w[1]));
        // The 60s sample only lands in `+Inf`.
        assert_eq!(counts[counts.len() - 2], 5);
        assert_eq!(*counts.last().unwrap(), 6);
    }

    #[test]
    fn render_includes_cap_and_current_from_limiter() {
        let limiter = HttpConnectionLimiter::new(7);
        let _p = limiter.try_acquire().unwrap();
        let m = HttpHandlerMetrics::new();
        let mut body = String::new();
        m.render(&mut body, &limiter);
        assert!(body.contains("http_handler_cap{transport=\"http\"} 7"));
        assert!(body.contains("http_handler_cap{transport=\"https\"} 7"));
        assert!(body.contains("http_active_handler_threads{transport=\"http\"} 1"));
        assert!(body.contains("http_active_handler_threads{transport=\"https\"} 1"));
    }

    #[test]
    fn render_emits_all_four_rejection_labels() {
        let m = HttpHandlerMetrics::new();
        let limiter = HttpConnectionLimiter::new(1);
        let mut body = String::new();
        m.render(&mut body, &limiter);
        for transport in ["http", "https"] {
            for reason in ["cap_exhausted", "handler_timeout"] {
                let expected = format!(
                    "http_handler_rejected_total{{transport=\"{transport}\",reason=\"{reason}\"}} 0"
                );
                assert!(body.contains(&expected), "missing line: {expected}");
            }
        }
    }

    #[test]
    fn negative_or_nan_durations_are_ignored() {
        let m = HttpHandlerMetrics::new();
        m.record_duration(HttpTransport::Http, -1.0);
        m.record_duration(HttpTransport::Http, f64::NAN);
        m.record_duration(HttpTransport::Http, f64::INFINITY);
        assert_eq!(m.duration_sample_count(HttpTransport::Http), 0);
    }
}