sentinel_proxy/
metrics.rs

1//! Prometheus metrics endpoint for Sentinel proxy.
2//!
3//! This module provides:
4//! - An HTTP endpoint for Prometheus to scrape metrics
5//! - Integration with the UnifiedMetricsAggregator
6//! - Standard proxy metrics (requests, latencies, errors)
7//! - Agent pool metrics from v2 agents
8
9use pingora_http::ResponseHeader;
10use sentinel_agent_protocol::v2::{MetricsCollector, UnifiedMetricsAggregator};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
15/// Metrics manager for the proxy.
16///
17/// This manages all proxy metrics and provides a Prometheus-compatible
18/// export endpoint.
19pub struct MetricsManager {
20    /// The unified metrics aggregator
21    aggregator: Arc<UnifiedMetricsAggregator>,
22    /// Whether metrics are enabled
23    enabled: bool,
24    /// Path for the metrics endpoint
25    path: String,
26    /// Allowed IP addresses for metrics access (empty = all allowed)
27    allowed_ips: Vec<String>,
28    /// Pool metrics collectors from v2 agents (agent_id -> collector)
29    pool_metrics: RwLock<HashMap<String, Arc<MetricsCollector>>>,
30}
31
32impl MetricsManager {
33    /// Create a new metrics manager.
34    pub fn new(service_name: impl Into<String>, instance_id: impl Into<String>) -> Self {
35        Self {
36            aggregator: Arc::new(UnifiedMetricsAggregator::new(service_name, instance_id)),
37            enabled: true,
38            path: "/metrics".to_string(),
39            allowed_ips: Vec::new(),
40            pool_metrics: RwLock::new(HashMap::new()),
41        }
42    }
43
44    /// Create from an existing aggregator.
45    pub fn with_aggregator(aggregator: Arc<UnifiedMetricsAggregator>) -> Self {
46        Self {
47            aggregator,
48            enabled: true,
49            path: "/metrics".to_string(),
50            allowed_ips: Vec::new(),
51            pool_metrics: RwLock::new(HashMap::new()),
52        }
53    }
54
55    /// Set the metrics endpoint path.
56    pub fn path(mut self, path: impl Into<String>) -> Self {
57        self.path = path.into();
58        self
59    }
60
61    /// Set allowed IPs for metrics access.
62    pub fn allowed_ips(mut self, ips: Vec<String>) -> Self {
63        self.allowed_ips = ips;
64        self
65    }
66
67    /// Disable metrics collection.
68    pub fn disable(mut self) -> Self {
69        self.enabled = false;
70        self
71    }
72
73    /// Check if metrics are enabled.
74    pub fn is_enabled(&self) -> bool {
75        self.enabled
76    }
77
78    /// Get the metrics path.
79    pub fn metrics_path(&self) -> &str {
80        &self.path
81    }
82
83    /// Get a reference to the aggregator.
84    pub fn aggregator(&self) -> &UnifiedMetricsAggregator {
85        &self.aggregator
86    }
87
88    /// Get an Arc to the aggregator.
89    pub fn aggregator_arc(&self) -> Arc<UnifiedMetricsAggregator> {
90        Arc::clone(&self.aggregator)
91    }
92
93    /// Check if an IP is allowed to access metrics.
94    pub fn is_ip_allowed(&self, ip: &str) -> bool {
95        if self.allowed_ips.is_empty() {
96            return true;
97        }
98        self.allowed_ips.iter().any(|allowed| allowed == ip)
99    }
100
101    /// Register a pool metrics collector for a v2 agent.
102    ///
103    /// Pool metrics will be included in the /metrics output.
104    pub async fn register_pool_metrics(&self, agent_id: impl Into<String>, collector: Arc<MetricsCollector>) {
105        self.pool_metrics.write().await.insert(agent_id.into(), collector);
106    }
107
108    /// Unregister a pool metrics collector.
109    pub async fn unregister_pool_metrics(&self, agent_id: &str) {
110        self.pool_metrics.write().await.remove(agent_id);
111    }
112
113    /// Handle a metrics request.
114    ///
115    /// Returns the Prometheus text format metrics body, including:
116    /// - Proxy metrics from the UnifiedMetricsAggregator
117    /// - Pool metrics from all registered v2 agent pools
118    pub fn handle_metrics_request(&self) -> MetricsResponse {
119        if !self.enabled {
120            return MetricsResponse::not_found();
121        }
122
123        // Export proxy metrics
124        let mut body = self.aggregator.export_prometheus();
125
126        // Append pool metrics from all registered v2 agents
127        // Use try_read to avoid blocking - if lock is held, skip pool metrics this scrape
128        if let Ok(pool_metrics) = self.pool_metrics.try_read() {
129            for (agent_id, collector) in pool_metrics.iter() {
130                let pool_output = collector.export_prometheus();
131                if !pool_output.is_empty() {
132                    // Add a comment separator for clarity
133                    body.push_str(&format!("\n# Agent pool metrics: {}\n", agent_id));
134                    body.push_str(&pool_output);
135                }
136            }
137        }
138
139        MetricsResponse::ok(body)
140    }
141
142    // -------------------------------------------------------------------------
143    // Convenience methods for recording proxy metrics
144    // -------------------------------------------------------------------------
145
146    /// Increment total requests counter.
147    pub fn inc_requests_total(&self, method: &str, status: u16, route: &str) {
148        let mut labels = HashMap::new();
149        labels.insert("method".to_string(), method.to_string());
150        labels.insert("status".to_string(), status.to_string());
151        labels.insert("route".to_string(), route.to_string());
152
153        self.aggregator.increment_counter(
154            "sentinel_requests_total",
155            "Total HTTP requests handled by the proxy",
156            labels,
157            1,
158        );
159    }
160
161    /// Record request duration.
162    pub fn observe_request_duration(&self, method: &str, route: &str, duration_secs: f64) {
163        let mut labels = HashMap::new();
164        labels.insert("method".to_string(), method.to_string());
165        labels.insert("route".to_string(), route.to_string());
166
167        // Standard latency buckets
168        let buckets = vec![
169            0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
170        ];
171
172        self.aggregator.observe_histogram(
173            "sentinel_request_duration_seconds",
174            "HTTP request duration in seconds",
175            labels,
176            &buckets,
177            duration_secs,
178        );
179    }
180
181    /// Set active connections gauge.
182    pub fn set_active_connections(&self, count: f64) {
183        self.aggregator.set_gauge(
184            "sentinel_active_connections",
185            "Number of active client connections",
186            HashMap::new(),
187            count,
188        );
189    }
190
191    /// Set active requests gauge.
192    pub fn set_active_requests(&self, count: f64) {
193        self.aggregator.set_gauge(
194            "sentinel_active_requests",
195            "Number of requests currently being processed",
196            HashMap::new(),
197            count,
198        );
199    }
200
201    /// Increment upstream requests.
202    pub fn inc_upstream_requests(&self, upstream: &str, status: u16, success: bool) {
203        let mut labels = HashMap::new();
204        labels.insert("upstream".to_string(), upstream.to_string());
205        labels.insert("status".to_string(), status.to_string());
206        labels.insert("success".to_string(), success.to_string());
207
208        self.aggregator.increment_counter(
209            "sentinel_upstream_requests_total",
210            "Total requests to upstream servers",
211            labels,
212            1,
213        );
214    }
215
216    /// Record upstream latency.
217    pub fn observe_upstream_duration(&self, upstream: &str, duration_secs: f64) {
218        let mut labels = HashMap::new();
219        labels.insert("upstream".to_string(), upstream.to_string());
220
221        let buckets = vec![
222            0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
223        ];
224
225        self.aggregator.observe_histogram(
226            "sentinel_upstream_duration_seconds",
227            "Time spent waiting for upstream response",
228            labels,
229            &buckets,
230            duration_secs,
231        );
232    }
233
234    /// Increment agent requests.
235    pub fn inc_agent_requests(&self, agent: &str, decision: &str) {
236        let mut labels = HashMap::new();
237        labels.insert("agent".to_string(), agent.to_string());
238        labels.insert("decision".to_string(), decision.to_string());
239
240        self.aggregator.increment_counter(
241            "sentinel_agent_requests_total",
242            "Total requests processed by agents",
243            labels,
244            1,
245        );
246    }
247
248    /// Record agent processing time.
249    pub fn observe_agent_duration(&self, agent: &str, duration_secs: f64) {
250        let mut labels = HashMap::new();
251        labels.insert("agent".to_string(), agent.to_string());
252
253        let buckets = vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0];
254
255        self.aggregator.observe_histogram(
256            "sentinel_agent_duration_seconds",
257            "Time spent processing request in agent",
258            labels,
259            &buckets,
260            duration_secs,
261        );
262    }
263
264    /// Increment circuit breaker trips.
265    pub fn inc_circuit_breaker_trips(&self, upstream: &str) {
266        let mut labels = HashMap::new();
267        labels.insert("upstream".to_string(), upstream.to_string());
268
269        self.aggregator.increment_counter(
270            "sentinel_circuit_breaker_trips_total",
271            "Number of times circuit breaker has tripped",
272            labels,
273            1,
274        );
275    }
276
277    /// Set circuit breaker state.
278    pub fn set_circuit_breaker_state(&self, upstream: &str, open: bool) {
279        let mut labels = HashMap::new();
280        labels.insert("upstream".to_string(), upstream.to_string());
281
282        self.aggregator.set_gauge(
283            "sentinel_circuit_breaker_open",
284            "Whether circuit breaker is open (1) or closed (0)",
285            labels,
286            if open { 1.0 } else { 0.0 },
287        );
288    }
289
290    /// Increment rate limited requests.
291    pub fn inc_rate_limited(&self, route: &str) {
292        let mut labels = HashMap::new();
293        labels.insert("route".to_string(), route.to_string());
294
295        self.aggregator.increment_counter(
296            "sentinel_rate_limited_total",
297            "Total requests rate limited",
298            labels,
299            1,
300        );
301    }
302
303    /// Increment cache hits/misses.
304    pub fn inc_cache_access(&self, hit: bool) {
305        let mut labels = HashMap::new();
306        labels.insert("result".to_string(), if hit { "hit" } else { "miss" }.to_string());
307
308        self.aggregator.increment_counter(
309            "sentinel_cache_accesses_total",
310            "Total cache accesses",
311            labels,
312            1,
313        );
314    }
315
316    /// Set cache size.
317    pub fn set_cache_size(&self, size_bytes: f64) {
318        self.aggregator.set_gauge(
319            "sentinel_cache_size_bytes",
320            "Current cache size in bytes",
321            HashMap::new(),
322            size_bytes,
323        );
324    }
325}
326
/// Transport-agnostic answer to a metrics request.
///
/// Carries the pieces needed to write the HTTP reply: status code,
/// `Content-Type` value and the body text.
#[derive(Debug)]
pub struct MetricsResponse {
    /// HTTP status code of the reply.
    pub status: u16,
    /// Value to send in the `Content-Type` header.
    pub content_type: String,
    /// Body text of the reply.
    pub body: String,
}
337
338impl MetricsResponse {
339    /// Create a successful metrics response.
340    pub fn ok(body: String) -> Self {
341        Self {
342            status: 200,
343            content_type: "text/plain; version=0.0.4; charset=utf-8".to_string(),
344            body,
345        }
346    }
347
348    /// Create a 404 response.
349    pub fn not_found() -> Self {
350        Self {
351            status: 404,
352            content_type: "text/plain".to_string(),
353            body: "Metrics not found".to_string(),
354        }
355    }
356
357    /// Create a 403 response.
358    pub fn forbidden() -> Self {
359        Self {
360            status: 403,
361            content_type: "text/plain".to_string(),
362            body: "Forbidden".to_string(),
363        }
364    }
365
366    /// Convert to HTTP response header.
367    pub fn to_header(&self) -> ResponseHeader {
368        let mut header = ResponseHeader::build(self.status, Some(2)).unwrap();
369        header
370            .append_header("Content-Type", &self.content_type)
371            .ok();
372        header
373            .append_header("Content-Length", self.body.len().to_string())
374            .ok();
375        header
376    }
377}
378
/// Canonical names of the metrics exposed by the Sentinel proxy.
///
/// Use these constants instead of repeating the string literals so that
/// recording code, dashboards and tests agree on the exact names.
pub mod standard {
    /// Counter: HTTP requests handled.
    pub const REQUESTS_TOTAL: &str = "sentinel_requests_total";
    /// Histogram: HTTP request duration, in seconds.
    pub const REQUEST_DURATION: &str = "sentinel_request_duration_seconds";
    /// Gauge: active client connections.
    pub const ACTIVE_CONNECTIONS: &str = "sentinel_active_connections";
    /// Gauge: requests currently being processed.
    pub const ACTIVE_REQUESTS: &str = "sentinel_active_requests";
    /// Counter: requests sent to upstream servers.
    pub const UPSTREAM_REQUESTS: &str = "sentinel_upstream_requests_total";
    /// Histogram: upstream response time, in seconds.
    pub const UPSTREAM_DURATION: &str = "sentinel_upstream_duration_seconds";
    /// Counter: requests processed by agents.
    pub const AGENT_REQUESTS: &str = "sentinel_agent_requests_total";
    /// Histogram: agent processing time, in seconds.
    pub const AGENT_DURATION: &str = "sentinel_agent_duration_seconds";
    /// Counter: circuit breaker trips.
    pub const CIRCUIT_BREAKER_TRIPS: &str = "sentinel_circuit_breaker_trips_total";
    /// Gauge: circuit breaker state (open or closed).
    pub const CIRCUIT_BREAKER_OPEN: &str = "sentinel_circuit_breaker_open";
    /// Counter: requests rejected by rate limiting.
    pub const RATE_LIMITED: &str = "sentinel_rate_limited_total";
    /// Counter: cache accesses.
    pub const CACHE_ACCESSES: &str = "sentinel_cache_accesses_total";
    /// Gauge: cache size, in bytes.
    pub const CACHE_SIZE: &str = "sentinel_cache_size_bytes";
}
408
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built manager is enabled and serves the default path.
    #[test]
    fn test_metrics_manager_creation() {
        let mgr = MetricsManager::new("test-service", "node-1");
        assert!(mgr.is_enabled());
        assert_eq!(mgr.metrics_path(), "/metrics");
    }

    /// A disabled manager reports itself disabled and answers 404.
    #[test]
    fn test_metrics_manager_disabled() {
        let mgr = MetricsManager::new("test", "1").disable();
        assert!(!mgr.is_enabled());

        let reply = mgr.handle_metrics_request();
        assert_eq!(reply.status, 404);
    }

    /// With an allow-list, only listed addresses pass.
    #[test]
    fn test_metrics_manager_ip_filtering() {
        let allow_list = vec!["127.0.0.1".to_string(), "10.0.0.1".to_string()];
        let mgr = MetricsManager::new("test", "1").allowed_ips(allow_list);

        assert!(mgr.is_ip_allowed("127.0.0.1"));
        assert!(mgr.is_ip_allowed("10.0.0.1"));
        assert!(!mgr.is_ip_allowed("192.168.1.1"));
    }

    /// Without an allow-list, every caller passes.
    #[test]
    fn test_metrics_manager_all_ips_allowed() {
        let mgr = MetricsManager::new("test", "1");

        // Empty allowed_ips means all IPs are allowed
        assert!(mgr.is_ip_allowed("127.0.0.1"));
        assert!(mgr.is_ip_allowed("192.168.1.1"));
        assert!(mgr.is_ip_allowed("any-ip"));
    }

    /// Recorded metrics show up in a 200 plain-text response.
    #[test]
    fn test_metrics_response() {
        let mgr = MetricsManager::new("test", "node-1");

        // Record some metrics
        mgr.inc_requests_total("GET", 200, "/api/users");
        mgr.set_active_connections(42.0);

        let reply = mgr.handle_metrics_request();
        assert_eq!(reply.status, 200);
        assert!(reply.content_type.contains("text/plain"));
        assert!(reply.body.contains("sentinel_requests_total"));
        assert!(reply.body.contains("sentinel_active_connections"));
        assert!(reply.body.contains("sentinel_info"));
    }

    /// Three observations yield bucket, sum and count series.
    #[test]
    fn test_request_duration_histogram() {
        let mgr = MetricsManager::new("test", "1");

        for secs in [0.05, 0.15, 0.5] {
            mgr.observe_request_duration("GET", "/api", secs);
        }

        let body = mgr.handle_metrics_request().body;
        assert!(body.contains("sentinel_request_duration_seconds_bucket"));
        assert!(body.contains("sentinel_request_duration_seconds_sum"));
        assert!(body.contains("sentinel_request_duration_seconds_count"));
        // Verify count is 3 (with labels, the format is {labels} 3)
        assert!(body.contains("} 3\n") || body.contains(" 3\n"));
    }

    /// The endpoint path can be overridden through the builder.
    #[test]
    fn test_custom_path() {
        let mgr = MetricsManager::new("test", "1").path("/internal/metrics");
        assert_eq!(mgr.metrics_path(), "/internal/metrics");
    }

    /// Upstream counter and histogram appear in the export.
    #[test]
    fn test_upstream_metrics() {
        let mgr = MetricsManager::new("test", "1");

        mgr.inc_upstream_requests("backend-1", 200, true);
        mgr.observe_upstream_duration("backend-1", 0.1);

        let body = mgr.handle_metrics_request().body;
        assert!(body.contains("sentinel_upstream_requests_total"));
        assert!(body.contains("sentinel_upstream_duration_seconds"));
    }

    /// Agent counter and histogram appear in the export.
    #[test]
    fn test_agent_metrics() {
        let mgr = MetricsManager::new("test", "1");

        mgr.inc_agent_requests("waf", "allow");
        mgr.observe_agent_duration("waf", 0.005);

        let body = mgr.handle_metrics_request().body;
        assert!(body.contains("sentinel_agent_requests_total"));
        assert!(body.contains("sentinel_agent_duration_seconds"));
    }
}
510}