// grapsus_proxy/metrics.rs
//! Prometheus metrics endpoint for Grapsus proxy.
//!
//! This module provides:
//! - An HTTP endpoint for Prometheus to scrape metrics
//! - Integration with the `UnifiedMetricsAggregator`
//! - Standard proxy metrics (requests, latencies, errors)
//! - Agent pool metrics from v2 agents

9use pingora_http::ResponseHeader;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13use grapsus_agent_protocol::v2::{MetricsCollector, UnifiedMetricsAggregator};
14
15/// Metrics manager for the proxy.
16///
17/// This manages all proxy metrics and provides a Prometheus-compatible
18/// export endpoint.
19pub struct MetricsManager {
20    /// The unified metrics aggregator
21    aggregator: Arc<UnifiedMetricsAggregator>,
22    /// Whether metrics are enabled
23    enabled: bool,
24    /// Path for the metrics endpoint
25    path: String,
26    /// Allowed IP addresses for metrics access (empty = all allowed)
27    allowed_ips: Vec<String>,
28    /// Pool metrics collectors from v2 agents (agent_id -> collector)
29    pool_metrics: RwLock<HashMap<String, Arc<MetricsCollector>>>,
30}
31
32impl MetricsManager {
33    /// Create a new metrics manager.
34    pub fn new(service_name: impl Into<String>, instance_id: impl Into<String>) -> Self {
35        Self {
36            aggregator: Arc::new(UnifiedMetricsAggregator::new(service_name, instance_id)),
37            enabled: true,
38            path: "/metrics".to_string(),
39            allowed_ips: Vec::new(),
40            pool_metrics: RwLock::new(HashMap::new()),
41        }
42    }
43
44    /// Create from an existing aggregator.
45    pub fn with_aggregator(aggregator: Arc<UnifiedMetricsAggregator>) -> Self {
46        Self {
47            aggregator,
48            enabled: true,
49            path: "/metrics".to_string(),
50            allowed_ips: Vec::new(),
51            pool_metrics: RwLock::new(HashMap::new()),
52        }
53    }
54
55    /// Create from metrics configuration.
56    ///
57    /// Applies `enabled` and `path` from the config.
58    /// The `address` field determines which listener serves the metrics
59    /// endpoint but is handled at the listener level, not here.
60    pub fn from_config(
61        config: &grapsus_config::MetricsConfig,
62        service_name: impl Into<String>,
63        instance_id: impl Into<String>,
64    ) -> Self {
65        let mut manager = Self::new(service_name, instance_id);
66        manager.enabled = config.enabled;
67        manager.path = config.path.clone();
68        manager
69    }
70
71    /// Set the metrics endpoint path.
72    pub fn path(mut self, path: impl Into<String>) -> Self {
73        self.path = path.into();
74        self
75    }
76
77    /// Set allowed IPs for metrics access.
78    pub fn allowed_ips(mut self, ips: Vec<String>) -> Self {
79        self.allowed_ips = ips;
80        self
81    }
82
83    /// Disable metrics collection.
84    pub fn disable(mut self) -> Self {
85        self.enabled = false;
86        self
87    }
88
89    /// Check if metrics are enabled.
90    pub fn is_enabled(&self) -> bool {
91        self.enabled
92    }
93
94    /// Get the metrics path.
95    pub fn metrics_path(&self) -> &str {
96        &self.path
97    }
98
99    /// Get a reference to the aggregator.
100    pub fn aggregator(&self) -> &UnifiedMetricsAggregator {
101        &self.aggregator
102    }
103
104    /// Get an Arc to the aggregator.
105    pub fn aggregator_arc(&self) -> Arc<UnifiedMetricsAggregator> {
106        Arc::clone(&self.aggregator)
107    }
108
109    /// Check if an IP is allowed to access metrics.
110    pub fn is_ip_allowed(&self, ip: &str) -> bool {
111        if self.allowed_ips.is_empty() {
112            return true;
113        }
114        self.allowed_ips.iter().any(|allowed| allowed == ip)
115    }
116
117    /// Register a pool metrics collector for a v2 agent.
118    ///
119    /// Pool metrics will be included in the /metrics output.
120    pub async fn register_pool_metrics(
121        &self,
122        agent_id: impl Into<String>,
123        collector: Arc<MetricsCollector>,
124    ) {
125        self.pool_metrics
126            .write()
127            .await
128            .insert(agent_id.into(), collector);
129    }
130
131    /// Unregister a pool metrics collector.
132    pub async fn unregister_pool_metrics(&self, agent_id: &str) {
133        self.pool_metrics.write().await.remove(agent_id);
134    }
135
136    /// Handle a metrics request.
137    ///
138    /// Returns the Prometheus text format metrics body, including:
139    /// - Proxy metrics from the UnifiedMetricsAggregator
140    /// - Pool metrics from all registered v2 agent pools
141    pub fn handle_metrics_request(&self) -> MetricsResponse {
142        if !self.enabled {
143            return MetricsResponse::not_found();
144        }
145
146        // Export proxy metrics
147        let mut body = self.aggregator.export_prometheus();
148
149        // Append pool metrics from all registered v2 agents
150        // Use try_read to avoid blocking - if lock is held, skip pool metrics this scrape
151        if let Ok(pool_metrics) = self.pool_metrics.try_read() {
152            for (agent_id, collector) in pool_metrics.iter() {
153                let pool_output = collector.export_prometheus();
154                if !pool_output.is_empty() {
155                    // Add a comment separator for clarity
156                    body.push_str(&format!("\n# Agent pool metrics: {}\n", agent_id));
157                    body.push_str(&pool_output);
158                }
159            }
160        }
161
162        MetricsResponse::ok(body)
163    }
164
165    // -------------------------------------------------------------------------
166    // Convenience methods for recording proxy metrics
167    // -------------------------------------------------------------------------
168
169    /// Increment total requests counter.
170    pub fn inc_requests_total(&self, method: &str, status: u16, route: &str) {
171        let mut labels = HashMap::new();
172        labels.insert("method".to_string(), method.to_string());
173        labels.insert("status".to_string(), status.to_string());
174        labels.insert("route".to_string(), route.to_string());
175
176        self.aggregator.increment_counter(
177            "grapsus_requests_total",
178            "Total HTTP requests handled by the proxy",
179            labels,
180            1,
181        );
182    }
183
184    /// Record request duration.
185    pub fn observe_request_duration(&self, method: &str, route: &str, duration_secs: f64) {
186        let mut labels = HashMap::new();
187        labels.insert("method".to_string(), method.to_string());
188        labels.insert("route".to_string(), route.to_string());
189
190        // Standard latency buckets
191        let buckets = vec![
192            0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
193        ];
194
195        self.aggregator.observe_histogram(
196            "grapsus_request_duration_seconds",
197            "HTTP request duration in seconds",
198            labels,
199            &buckets,
200            duration_secs,
201        );
202    }
203
204    /// Set active connections gauge.
205    pub fn set_active_connections(&self, count: f64) {
206        self.aggregator.set_gauge(
207            "grapsus_active_connections",
208            "Number of active client connections",
209            HashMap::new(),
210            count,
211        );
212    }
213
214    /// Set active requests gauge.
215    pub fn set_active_requests(&self, count: f64) {
216        self.aggregator.set_gauge(
217            "grapsus_active_requests",
218            "Number of requests currently being processed",
219            HashMap::new(),
220            count,
221        );
222    }
223
224    /// Increment upstream requests.
225    pub fn inc_upstream_requests(&self, upstream: &str, status: u16, success: bool) {
226        let mut labels = HashMap::new();
227        labels.insert("upstream".to_string(), upstream.to_string());
228        labels.insert("status".to_string(), status.to_string());
229        labels.insert("success".to_string(), success.to_string());
230
231        self.aggregator.increment_counter(
232            "grapsus_upstream_requests_total",
233            "Total requests to upstream servers",
234            labels,
235            1,
236        );
237    }
238
239    /// Record upstream latency.
240    pub fn observe_upstream_duration(&self, upstream: &str, duration_secs: f64) {
241        let mut labels = HashMap::new();
242        labels.insert("upstream".to_string(), upstream.to_string());
243
244        let buckets = vec![
245            0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
246        ];
247
248        self.aggregator.observe_histogram(
249            "grapsus_upstream_duration_seconds",
250            "Time spent waiting for upstream response",
251            labels,
252            &buckets,
253            duration_secs,
254        );
255    }
256
257    /// Increment agent requests.
258    pub fn inc_agent_requests(&self, agent: &str, decision: &str) {
259        let mut labels = HashMap::new();
260        labels.insert("agent".to_string(), agent.to_string());
261        labels.insert("decision".to_string(), decision.to_string());
262
263        self.aggregator.increment_counter(
264            "grapsus_agent_requests_total",
265            "Total requests processed by agents",
266            labels,
267            1,
268        );
269    }
270
271    /// Record agent processing time.
272    pub fn observe_agent_duration(&self, agent: &str, duration_secs: f64) {
273        let mut labels = HashMap::new();
274        labels.insert("agent".to_string(), agent.to_string());
275
276        let buckets = vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0];
277
278        self.aggregator.observe_histogram(
279            "grapsus_agent_duration_seconds",
280            "Time spent processing request in agent",
281            labels,
282            &buckets,
283            duration_secs,
284        );
285    }
286
287    /// Increment circuit breaker trips.
288    pub fn inc_circuit_breaker_trips(&self, upstream: &str) {
289        let mut labels = HashMap::new();
290        labels.insert("upstream".to_string(), upstream.to_string());
291
292        self.aggregator.increment_counter(
293            "grapsus_circuit_breaker_trips_total",
294            "Number of times circuit breaker has tripped",
295            labels,
296            1,
297        );
298    }
299
300    /// Set circuit breaker state.
301    pub fn set_circuit_breaker_state(&self, upstream: &str, open: bool) {
302        let mut labels = HashMap::new();
303        labels.insert("upstream".to_string(), upstream.to_string());
304
305        self.aggregator.set_gauge(
306            "grapsus_circuit_breaker_open",
307            "Whether circuit breaker is open (1) or closed (0)",
308            labels,
309            if open { 1.0 } else { 0.0 },
310        );
311    }
312
313    /// Increment rate limited requests.
314    pub fn inc_rate_limited(&self, route: &str) {
315        let mut labels = HashMap::new();
316        labels.insert("route".to_string(), route.to_string());
317
318        self.aggregator.increment_counter(
319            "grapsus_rate_limited_total",
320            "Total requests rate limited",
321            labels,
322            1,
323        );
324    }
325
326    /// Increment cache hits/misses.
327    pub fn inc_cache_access(&self, hit: bool) {
328        let mut labels = HashMap::new();
329        labels.insert(
330            "result".to_string(),
331            if hit { "hit" } else { "miss" }.to_string(),
332        );
333
334        self.aggregator.increment_counter(
335            "grapsus_cache_accesses_total",
336            "Total cache accesses",
337            labels,
338            1,
339        );
340    }
341
342    /// Set cache size.
343    pub fn set_cache_size(&self, size_bytes: f64) {
344        self.aggregator.set_gauge(
345            "grapsus_cache_size_bytes",
346            "Current cache size in bytes",
347            HashMap::new(),
348            size_bytes,
349        );
350    }
351}
352
/// Response for metrics requests.
///
/// A transport-agnostic (status, content type, body) triple; convert it
/// into a pingora header with [`MetricsResponse::to_header`].
#[derive(Debug)]
pub struct MetricsResponse {
    /// HTTP status code (the constructors in this module produce 200,
    /// 403, or 404)
    pub status: u16,
    /// Content type header value
    pub content_type: String,
    /// Response body (Prometheus text format for successful scrapes)
    pub body: String,
}
363
364impl MetricsResponse {
365    /// Create a successful metrics response.
366    pub fn ok(body: String) -> Self {
367        Self {
368            status: 200,
369            content_type: "text/plain; version=0.0.4; charset=utf-8".to_string(),
370            body,
371        }
372    }
373
374    /// Create a 404 response.
375    pub fn not_found() -> Self {
376        Self {
377            status: 404,
378            content_type: "text/plain".to_string(),
379            body: "Metrics not found".to_string(),
380        }
381    }
382
383    /// Create a 403 response.
384    pub fn forbidden() -> Self {
385        Self {
386            status: 403,
387            content_type: "text/plain".to_string(),
388            body: "Forbidden".to_string(),
389        }
390    }
391
392    /// Convert to HTTP response header.
393    pub fn to_header(&self) -> ResponseHeader {
394        let mut header = ResponseHeader::build(self.status, Some(2)).unwrap();
395        header
396            .append_header("Content-Type", &self.content_type)
397            .ok();
398        header
399            .append_header("Content-Length", self.body.len().to_string())
400            .ok();
401        header
402    }
403}
404
/// Standard metric names for Grapsus proxy.
///
/// These constants mirror the metric names emitted by the
/// `MetricsManager` convenience recording methods in this module, so
/// dashboards and alerting rules can reference them without hard-coding
/// strings.
pub mod standard {
    /// Total HTTP requests (counter)
    pub const REQUESTS_TOTAL: &str = "grapsus_requests_total";
    /// Request duration histogram (seconds)
    pub const REQUEST_DURATION: &str = "grapsus_request_duration_seconds";
    /// Active connections gauge
    pub const ACTIVE_CONNECTIONS: &str = "grapsus_active_connections";
    /// Active requests gauge
    pub const ACTIVE_REQUESTS: &str = "grapsus_active_requests";
    /// Upstream requests total (counter)
    pub const UPSTREAM_REQUESTS: &str = "grapsus_upstream_requests_total";
    /// Upstream duration histogram (seconds)
    pub const UPSTREAM_DURATION: &str = "grapsus_upstream_duration_seconds";
    /// Agent requests total (counter)
    pub const AGENT_REQUESTS: &str = "grapsus_agent_requests_total";
    /// Agent duration histogram (seconds)
    pub const AGENT_DURATION: &str = "grapsus_agent_duration_seconds";
    /// Circuit breaker trips (counter)
    pub const CIRCUIT_BREAKER_TRIPS: &str = "grapsus_circuit_breaker_trips_total";
    /// Circuit breaker state gauge (1 = open, 0 = closed)
    pub const CIRCUIT_BREAKER_OPEN: &str = "grapsus_circuit_breaker_open";
    /// Rate limited requests (counter)
    pub const RATE_LIMITED: &str = "grapsus_rate_limited_total";
    /// Cache accesses (counter, labeled hit/miss)
    pub const CACHE_ACCESSES: &str = "grapsus_cache_accesses_total";
    /// Cache size gauge (bytes)
    pub const CACHE_SIZE: &str = "grapsus_cache_size_bytes";
}
434
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_manager_creation() {
        // A fresh manager is enabled and serves the default path.
        let mgr = MetricsManager::new("test-service", "node-1");
        assert!(mgr.is_enabled());
        assert_eq!(mgr.metrics_path(), "/metrics");
    }

    #[test]
    fn test_metrics_manager_disabled() {
        let mgr = MetricsManager::new("test", "1").disable();
        assert!(!mgr.is_enabled());

        // A disabled manager answers every scrape with 404.
        assert_eq!(mgr.handle_metrics_request().status, 404);
    }

    #[test]
    fn test_metrics_manager_ip_filtering() {
        let allowlist = vec!["127.0.0.1".to_string(), "10.0.0.1".to_string()];
        let mgr = MetricsManager::new("test", "1").allowed_ips(allowlist);

        for ip in ["127.0.0.1", "10.0.0.1"] {
            assert!(mgr.is_ip_allowed(ip));
        }
        assert!(!mgr.is_ip_allowed("192.168.1.1"));
    }

    #[test]
    fn test_metrics_manager_all_ips_allowed() {
        let mgr = MetricsManager::new("test", "1");

        // Empty allowed_ips means all IPs are allowed
        for ip in ["127.0.0.1", "192.168.1.1", "any-ip"] {
            assert!(mgr.is_ip_allowed(ip));
        }
    }

    #[test]
    fn test_metrics_response() {
        let mgr = MetricsManager::new("test", "node-1");

        // Record some metrics
        mgr.inc_requests_total("GET", 200, "/api/users");
        mgr.set_active_connections(42.0);

        let resp = mgr.handle_metrics_request();
        assert_eq!(resp.status, 200);
        assert!(resp.content_type.contains("text/plain"));
        for metric in [
            "grapsus_requests_total",
            "grapsus_active_connections",
            "grapsus_info",
        ] {
            assert!(resp.body.contains(metric));
        }
    }

    #[test]
    fn test_request_duration_histogram() {
        let mgr = MetricsManager::new("test", "1");

        for secs in [0.05, 0.15, 0.5] {
            mgr.observe_request_duration("GET", "/api", secs);
        }

        let body = mgr.handle_metrics_request().body;
        assert!(body.contains("grapsus_request_duration_seconds_bucket"));
        assert!(body.contains("grapsus_request_duration_seconds_sum"));
        assert!(body.contains("grapsus_request_duration_seconds_count"));
        // Verify count is 3 (with labels, the format is {labels} 3)
        assert!(body.contains("} 3\n") || body.contains(" 3\n"));
    }

    #[test]
    fn test_custom_path() {
        let mgr = MetricsManager::new("test", "1").path("/internal/metrics");
        assert_eq!(mgr.metrics_path(), "/internal/metrics");
    }

    #[test]
    fn test_upstream_metrics() {
        let mgr = MetricsManager::new("test", "1");

        mgr.inc_upstream_requests("backend-1", 200, true);
        mgr.observe_upstream_duration("backend-1", 0.1);

        let body = mgr.handle_metrics_request().body;
        assert!(body.contains("grapsus_upstream_requests_total"));
        assert!(body.contains("grapsus_upstream_duration_seconds"));
    }

    #[test]
    fn test_agent_metrics() {
        let mgr = MetricsManager::new("test", "1");

        mgr.inc_agent_requests("waf", "allow");
        mgr.observe_agent_duration("waf", 0.005);

        let body = mgr.handle_metrics_request().body;
        assert!(body.contains("grapsus_agent_requests_total"));
        assert!(body.contains("grapsus_agent_duration_seconds"));
    }
}