Skip to main content

edgeguard/
metrics.rs

1//! Prometheus metrics, hand-rolled.
2//!
3//! A full metrics library (`prometheus`, `metrics`) would be a heavy dependency for the
4//! handful of series EdgeGuard exposes, so — in the same spirit as `parse_host_port` being a
5//! small URL parser rather than a full one — this is a minimal text-exposition renderer over
6//! a few atomics. It emits the Prometheus text format (v0.0.4) at `/__edgeguard/metrics`.
7//!
8//! The registry lives in [`crate::proxy::AppState`] *outside* the hot-swappable runtime, so
9//! counters survive a config hot-reload instead of resetting to zero.
10
use std::fmt::Write as _;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
13
/// Request `outcome` label values. These mirror the `outcome` field already emitted on the
/// JSON access log in [`crate::proxy`], so a metric series lines up 1:1 with a log line.
/// Anything not in this list is bucketed under `other` rather than silently dropped, which is
/// why `other` must remain the LAST entry: [`Metrics::record_request`] falls back to the final
/// index.
const OUTCOMES: &[&str] = &[
    "ok",
    "rate_limited",
    "over_quota",
    "limiter_error",
    "unauthorized",
    "forbidden",
    "method_not_allowed",
    "not_found",
    "payload_too_large",
    "header_too_large",
    "bad_gateway",
    "upstream_error",
    "upstream_timeout",
    "upstream_body_too_large",
    "upstream_body_error",
    "other",
];

/// Rate-limit `scope` label values (which limiter rejected the request).
const RL_SCOPES: &[&str] = &["ip", "route", "key"];

/// WAF `rule` label values (which ruleset class matched). Custom `[[waf.rules]]` all roll up
/// under `custom`; the specific rule id is in the log line, not the metric.
const WAF_RULES: &[&str] = &["sqli", "xss", "path_traversal", "custom"];

/// Upper bounds (seconds) for the request-duration histogram, plus an implicit `+Inf`.
/// Listed in ascending order; each bound gets one cumulative counter in [`Metrics`].
const LATENCY_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
];

/// Process-wide metric registry. All methods take `&self` and use relaxed atomics — metrics
/// are monotonic counters/observations where exact inter-thread ordering doesn't matter.
#[derive(Debug)]
pub struct Metrics {
    /// One counter per [`OUTCOMES`] entry (parallel index).
    requests: Vec<AtomicU64>,
    /// One counter per [`RL_SCOPES`] entry (parallel index).
    ratelimit_hits: Vec<AtomicU64>,
    /// One counter per [`WAF_RULES`] entry (parallel index).
    waf_hits: Vec<AtomicU64>,
    /// Cumulative histogram buckets (parallel to [`LATENCY_BUCKETS`]): `bucket[i]` counts
    /// observations with value <= `LATENCY_BUCKETS[i]`.
    latency_buckets: Vec<AtomicU64>,
    /// Sum of all observed latencies, in whole microseconds (rendered as seconds).
    latency_sum_micros: AtomicU64,
    /// Number of latency observations; also rendered as the `+Inf` bucket.
    latency_count: AtomicU64,
    /// Number of CSP violation reports received.
    csp_reports: AtomicU64,
    /// Drainable usage accumulators for managed-mode reporting (requests + bandwidth *since the
    /// last drain*). Kept separate from the monotonic Prometheus counters above precisely because
    /// the usage reporter resets these to zero each period — a Prometheus counter must not decrease.
    usage_requests: AtomicU64,
    usage_ingress_bytes: AtomicU64,
    usage_egress_bytes: AtomicU64,
}

impl Default for Metrics {
    /// Build a registry with every counter at zero and one slot per label / bucket bound.
    fn default() -> Self {
        Metrics {
            requests: zeroed_counters(OUTCOMES.len()),
            ratelimit_hits: zeroed_counters(RL_SCOPES.len()),
            waf_hits: zeroed_counters(WAF_RULES.len()),
            latency_buckets: zeroed_counters(LATENCY_BUCKETS.len()),
            latency_sum_micros: AtomicU64::new(0),
            latency_count: AtomicU64::new(0),
            csp_reports: AtomicU64::new(0),
            usage_requests: AtomicU64::new(0),
            usage_ingress_bytes: AtomicU64::new(0),
            usage_egress_bytes: AtomicU64::new(0),
        }
    }
}

impl Metrics {
    /// Create an empty registry (equivalent to [`Metrics::default`]).
    pub fn new() -> Self {
        Self::default()
    }

    /// Count one finished request under its `outcome` label.
    ///
    /// An `outcome` that is not listed in [`OUTCOMES`] is counted under `other`.
    pub fn record_request(&self, outcome: &str) {
        let idx = OUTCOMES
            .iter()
            .position(|known| *known == outcome)
            .unwrap_or(OUTCOMES.len() - 1); // last entry is "other"
        self.requests[idx].fetch_add(1, Ordering::Relaxed);
    }

    /// Observe a request's end-to-end latency into the histogram.
    ///
    /// Buckets are cumulative, so every bucket whose bound is >= the observation is bumped.
    /// The running sum is kept in whole microseconds.
    pub fn observe_latency(&self, elapsed: Duration) {
        let secs = elapsed.as_secs_f64();
        for (bound, bucket) in LATENCY_BUCKETS.iter().zip(&self.latency_buckets) {
            if secs <= *bound {
                bucket.fetch_add(1, Ordering::Relaxed);
            }
        }
        // `as_micros` is a `u128`; clamp before narrowing so an absurdly large duration
        // saturates instead of silently wrapping to an unrelated small value.
        let micros = elapsed.as_micros().min(u128::from(u64::MAX)) as u64;
        self.latency_sum_micros.fetch_add(micros, Ordering::Relaxed);
        // Bumped last, and read last in `render`, after the per-bound buckets.
        self.latency_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a rate-limit rejection by which limiter scope tripped (`ip`/`route`/`key`).
    ///
    /// A `scope` that is not listed in [`RL_SCOPES`] is ignored (there is no catch-all series).
    pub fn record_ratelimit_hit(&self, scope: &str) {
        if let Some(idx) = RL_SCOPES.iter().position(|known| *known == scope) {
            self.ratelimit_hits[idx].fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Count one WAF rule match by rule class (`sqli`/`xss`/`path_traversal`/`custom`).
    /// Recorded for both report-only and blocking modes — so a report-first rollout is
    /// visible — while a *blocked* request is additionally counted under the `forbidden`
    /// request outcome.
    ///
    /// A `class` that is not listed in [`WAF_RULES`] is ignored.
    pub fn record_waf_hit(&self, class: &str) {
        if let Some(idx) = WAF_RULES.iter().position(|known| *known == class) {
            self.waf_hits[idx].fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Count one received CSP violation report.
    pub fn record_csp_report(&self) {
        self.csp_reports.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one request toward the drainable usage accumulator (managed mode). Called once per
    /// request from the single `finish` exit, so every request — proxied or rejected — counts.
    pub fn add_usage_request(&self) {
        self.usage_requests.fetch_add(1, Ordering::Relaxed);
    }

    /// Add request (ingress) + response (egress) bytes to the drainable usage accumulator. Called
    /// on the proxied path where both bodies are buffered and the counts are known.
    pub fn add_usage_bytes(&self, ingress: usize, egress: usize) {
        self.usage_ingress_bytes
            .fetch_add(ingress as u64, Ordering::Relaxed);
        self.usage_egress_bytes
            .fetch_add(egress as u64, Ordering::Relaxed);
    }

    /// Read-and-zero the usage accumulators, returning `(requests, ingress, egress)` accrued
    /// since the last drain — the delta the usage reporter ships to the control plane.
    ///
    /// Each counter is swapped to zero atomically, but the three swaps are independent
    /// operations: an update racing with the drain may be split between this period and the
    /// next. Nothing is lost or counted twice.
    pub fn drain_usage(&self) -> (u64, u64, u64) {
        let requests = self.usage_requests.swap(0, Ordering::Relaxed);
        let ingress = self.usage_ingress_bytes.swap(0, Ordering::Relaxed);
        let egress = self.usage_egress_bytes.swap(0, Ordering::Relaxed);
        (requests, ingress, egress)
    }

    /// Add a previously-drained delta back, e.g. when a usage report failed to send — so the
    /// next period reships it instead of losing billable usage. (New requests that arrived during
    /// the failed send simply add on top, as intended.)
    pub fn restore_usage(&self, requests: u64, ingress: u64, egress: u64) {
        self.usage_requests.fetch_add(requests, Ordering::Relaxed);
        self.usage_ingress_bytes
            .fetch_add(ingress, Ordering::Relaxed);
        self.usage_egress_bytes.fetch_add(egress, Ordering::Relaxed);
    }

    /// Render the Prometheus text exposition (format version 0.0.4).
    ///
    /// Families are emitted in a fixed order: requests, rate-limit hits, WAF hits, CSP
    /// reports, then the latency histogram. Every known label is rendered even at zero.
    pub fn render(&self) -> String {
        // Formatting straight into `out` avoids a temporary `String` per sample line.
        // `fmt::Write` for `String` cannot fail, so the `writeln!` results are discarded.
        let mut out = String::with_capacity(4096);

        write_labeled_counters(
            &mut out,
            "edgeguard_requests_total",
            "Total proxied requests by outcome.",
            "outcome",
            OUTCOMES,
            &self.requests,
        );
        write_labeled_counters(
            &mut out,
            "edgeguard_ratelimit_hits_total",
            "Requests rejected by a rate limiter, by scope.",
            "scope",
            RL_SCOPES,
            &self.ratelimit_hits,
        );
        write_labeled_counters(
            &mut out,
            "edgeguard_waf_hits_total",
            "WAF rule matches by class (report-only + blocked).",
            "rule",
            WAF_RULES,
            &self.waf_hits,
        );

        write_family_header(
            &mut out,
            "edgeguard_csp_reports_total",
            "CSP violation reports received.",
            "counter",
        );
        let csp = self.csp_reports.load(Ordering::Relaxed);
        let _ = writeln!(out, "edgeguard_csp_reports_total {csp}");

        write_family_header(
            &mut out,
            "edgeguard_request_duration_seconds",
            "Request handling latency in seconds.",
            "histogram",
        );
        for (bound, bucket) in LATENCY_BUCKETS.iter().zip(&self.latency_buckets) {
            let v = bucket.load(Ordering::Relaxed);
            let _ = writeln!(
                out,
                "edgeguard_request_duration_seconds_bucket{{le=\"{bound}\"}} {v}"
            );
        }
        // Loaded after the per-bound buckets, mirroring the write order in `observe_latency`.
        let count = self.latency_count.load(Ordering::Relaxed);
        // The `+Inf` bucket equals the total observation count by definition.
        let _ = writeln!(
            out,
            "edgeguard_request_duration_seconds_bucket{{le=\"+Inf\"}} {count}"
        );
        let sum_secs = self.latency_sum_micros.load(Ordering::Relaxed) as f64 / 1_000_000.0;
        let _ = writeln!(out, "edgeguard_request_duration_seconds_sum {sum_secs}");
        let _ = writeln!(out, "edgeguard_request_duration_seconds_count {count}");

        out
    }
}

/// Build `len` counters, all starting at zero.
fn zeroed_counters(len: usize) -> Vec<AtomicU64> {
    (0..len).map(|_| AtomicU64::new(0)).collect()
}

/// Append the `# HELP` / `# TYPE` header pair for one metric family.
fn write_family_header(out: &mut String, name: &str, help: &str, kind: &str) {
    let _ = writeln!(out, "# HELP {name} {help}");
    let _ = writeln!(out, "# TYPE {name} {kind}");
}

/// Append one counter family with a single label: the header pair, then one sample line per
/// `labels` entry, taking the value from the counter at the same position in `counters`.
fn write_labeled_counters(
    out: &mut String,
    name: &str,
    help: &str,
    label_key: &str,
    labels: &[&str],
    counters: &[AtomicU64],
) {
    write_family_header(out, name, help, "counter");
    for (label, counter) in labels.iter().zip(counters) {
        let v = counter.load(Ordering::Relaxed);
        let _ = writeln!(out, "{name}{{{label_key}=\"{label}\"}} {v}");
    }
}
240
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that the rendered exposition contains `expected` as one complete line.
    ///
    /// A plain substring check is too weak for sample lines: `…} 2` would also match a value
    /// of `20` or `25`, so a miscounting bug could slip through.
    #[track_caller]
    fn assert_has_line(text: &str, expected: &str) {
        assert!(
            text.lines().any(|line| line == expected),
            "missing line `{expected}` in:\n{text}"
        );
    }

    #[test]
    fn records_and_renders_request_outcomes() {
        let m = Metrics::new();
        m.record_request("ok");
        m.record_request("ok");
        m.record_request("rate_limited");
        // An unknown outcome falls into the `other` bucket, not "ok".
        m.record_request("totally_unknown");

        let text = m.render();
        assert_has_line(&text, "edgeguard_requests_total{outcome=\"ok\"} 2");
        assert_has_line(&text, "edgeguard_requests_total{outcome=\"rate_limited\"} 1");
        assert_has_line(&text, "edgeguard_requests_total{outcome=\"other\"} 1");
    }

    #[test]
    fn latency_histogram_is_cumulative() {
        let m = Metrics::new();
        m.observe_latency(Duration::from_millis(3)); // <= 0.005
        m.observe_latency(Duration::from_millis(40)); // <= 0.05
        let text = m.render();
        // 3ms falls under every bucket >= 0.005; 40ms under every bucket >= 0.05.
        assert_has_line(
            &text,
            "edgeguard_request_duration_seconds_bucket{le=\"0.005\"} 1",
        );
        assert_has_line(
            &text,
            "edgeguard_request_duration_seconds_bucket{le=\"0.05\"} 2",
        );
        assert_has_line(
            &text,
            "edgeguard_request_duration_seconds_bucket{le=\"+Inf\"} 2",
        );
        // 3ms + 40ms = 43_000 microseconds, rendered in seconds.
        assert_has_line(&text, "edgeguard_request_duration_seconds_sum 0.043");
        assert_has_line(&text, "edgeguard_request_duration_seconds_count 2");
    }

    #[test]
    fn ratelimit_and_csp_counters() {
        let m = Metrics::new();
        m.record_ratelimit_hit("ip");
        m.record_ratelimit_hit("route");
        m.record_ratelimit_hit("route");
        m.record_csp_report();
        let text = m.render();
        assert_has_line(&text, "edgeguard_ratelimit_hits_total{scope=\"ip\"} 1");
        assert_has_line(&text, "edgeguard_ratelimit_hits_total{scope=\"route\"} 2");
        assert_has_line(&text, "edgeguard_csp_reports_total 1");
    }

    #[test]
    fn usage_accumulates_drains_and_restores() {
        let m = Metrics::new();
        m.add_usage_request();
        m.add_usage_request();
        m.add_usage_bytes(100, 250);
        m.add_usage_bytes(0, 50);
        // Drain returns the accrued delta and zeroes the accumulator.
        assert_eq!(m.drain_usage(), (2, 100, 300));
        assert_eq!(m.drain_usage(), (0, 0, 0));
        // Restore (failed-report path) re-adds it for the next period.
        m.restore_usage(2, 100, 300);
        assert_eq!(m.drain_usage(), (2, 100, 300));
    }

    #[test]
    fn waf_hit_counters_by_class() {
        let m = Metrics::new();
        m.record_waf_hit("sqli");
        m.record_waf_hit("sqli");
        m.record_waf_hit("custom");
        // An unknown class is ignored rather than miscounted.
        m.record_waf_hit("totally_unknown");
        let text = m.render();
        assert_has_line(&text, "edgeguard_waf_hits_total{rule=\"sqli\"} 2");
        assert_has_line(&text, "edgeguard_waf_hits_total{rule=\"custom\"} 1");
        // A class that never fired still renders at 0.
        assert_has_line(&text, "edgeguard_waf_hits_total{rule=\"xss\"} 0");
    }
}
351}