Skip to main content

sentinel_proxy/
metrics.rs

1//! Prometheus metrics endpoint for Sentinel proxy.
2//!
3//! This module provides:
4//! - An HTTP endpoint for Prometheus to scrape metrics
5//! - Integration with the UnifiedMetricsAggregator
6//! - Standard proxy metrics (requests, latencies, errors)
7//! - Agent pool metrics from v2 agents
8
9use pingora_http::ResponseHeader;
10use sentinel_agent_protocol::v2::{MetricsCollector, UnifiedMetricsAggregator};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
15/// Metrics manager for the proxy.
16///
17/// This manages all proxy metrics and provides a Prometheus-compatible
18/// export endpoint.
19pub struct MetricsManager {
20    /// The unified metrics aggregator
21    aggregator: Arc<UnifiedMetricsAggregator>,
22    /// Whether metrics are enabled
23    enabled: bool,
24    /// Path for the metrics endpoint
25    path: String,
26    /// Allowed IP addresses for metrics access (empty = all allowed)
27    allowed_ips: Vec<String>,
28    /// Pool metrics collectors from v2 agents (agent_id -> collector)
29    pool_metrics: RwLock<HashMap<String, Arc<MetricsCollector>>>,
30}
31
32impl MetricsManager {
33    /// Create a new metrics manager.
34    pub fn new(service_name: impl Into<String>, instance_id: impl Into<String>) -> Self {
35        Self {
36            aggregator: Arc::new(UnifiedMetricsAggregator::new(service_name, instance_id)),
37            enabled: true,
38            path: "/metrics".to_string(),
39            allowed_ips: Vec::new(),
40            pool_metrics: RwLock::new(HashMap::new()),
41        }
42    }
43
44    /// Create from an existing aggregator.
45    pub fn with_aggregator(aggregator: Arc<UnifiedMetricsAggregator>) -> Self {
46        Self {
47            aggregator,
48            enabled: true,
49            path: "/metrics".to_string(),
50            allowed_ips: Vec::new(),
51            pool_metrics: RwLock::new(HashMap::new()),
52        }
53    }
54
55    /// Set the metrics endpoint path.
56    pub fn path(mut self, path: impl Into<String>) -> Self {
57        self.path = path.into();
58        self
59    }
60
61    /// Set allowed IPs for metrics access.
62    pub fn allowed_ips(mut self, ips: Vec<String>) -> Self {
63        self.allowed_ips = ips;
64        self
65    }
66
67    /// Disable metrics collection.
68    pub fn disable(mut self) -> Self {
69        self.enabled = false;
70        self
71    }
72
73    /// Check if metrics are enabled.
74    pub fn is_enabled(&self) -> bool {
75        self.enabled
76    }
77
78    /// Get the metrics path.
79    pub fn metrics_path(&self) -> &str {
80        &self.path
81    }
82
83    /// Get a reference to the aggregator.
84    pub fn aggregator(&self) -> &UnifiedMetricsAggregator {
85        &self.aggregator
86    }
87
88    /// Get an Arc to the aggregator.
89    pub fn aggregator_arc(&self) -> Arc<UnifiedMetricsAggregator> {
90        Arc::clone(&self.aggregator)
91    }
92
93    /// Check if an IP is allowed to access metrics.
94    pub fn is_ip_allowed(&self, ip: &str) -> bool {
95        if self.allowed_ips.is_empty() {
96            return true;
97        }
98        self.allowed_ips.iter().any(|allowed| allowed == ip)
99    }
100
101    /// Register a pool metrics collector for a v2 agent.
102    ///
103    /// Pool metrics will be included in the /metrics output.
104    pub async fn register_pool_metrics(
105        &self,
106        agent_id: impl Into<String>,
107        collector: Arc<MetricsCollector>,
108    ) {
109        self.pool_metrics
110            .write()
111            .await
112            .insert(agent_id.into(), collector);
113    }
114
115    /// Unregister a pool metrics collector.
116    pub async fn unregister_pool_metrics(&self, agent_id: &str) {
117        self.pool_metrics.write().await.remove(agent_id);
118    }
119
120    /// Handle a metrics request.
121    ///
122    /// Returns the Prometheus text format metrics body, including:
123    /// - Proxy metrics from the UnifiedMetricsAggregator
124    /// - Pool metrics from all registered v2 agent pools
125    pub fn handle_metrics_request(&self) -> MetricsResponse {
126        if !self.enabled {
127            return MetricsResponse::not_found();
128        }
129
130        // Export proxy metrics
131        let mut body = self.aggregator.export_prometheus();
132
133        // Append pool metrics from all registered v2 agents
134        // Use try_read to avoid blocking - if lock is held, skip pool metrics this scrape
135        if let Ok(pool_metrics) = self.pool_metrics.try_read() {
136            for (agent_id, collector) in pool_metrics.iter() {
137                let pool_output = collector.export_prometheus();
138                if !pool_output.is_empty() {
139                    // Add a comment separator for clarity
140                    body.push_str(&format!("\n# Agent pool metrics: {}\n", agent_id));
141                    body.push_str(&pool_output);
142                }
143            }
144        }
145
146        MetricsResponse::ok(body)
147    }
148
149    // -------------------------------------------------------------------------
150    // Convenience methods for recording proxy metrics
151    // -------------------------------------------------------------------------
152
153    /// Increment total requests counter.
154    pub fn inc_requests_total(&self, method: &str, status: u16, route: &str) {
155        let mut labels = HashMap::new();
156        labels.insert("method".to_string(), method.to_string());
157        labels.insert("status".to_string(), status.to_string());
158        labels.insert("route".to_string(), route.to_string());
159
160        self.aggregator.increment_counter(
161            "sentinel_requests_total",
162            "Total HTTP requests handled by the proxy",
163            labels,
164            1,
165        );
166    }
167
168    /// Record request duration.
169    pub fn observe_request_duration(&self, method: &str, route: &str, duration_secs: f64) {
170        let mut labels = HashMap::new();
171        labels.insert("method".to_string(), method.to_string());
172        labels.insert("route".to_string(), route.to_string());
173
174        // Standard latency buckets
175        let buckets = vec![
176            0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
177        ];
178
179        self.aggregator.observe_histogram(
180            "sentinel_request_duration_seconds",
181            "HTTP request duration in seconds",
182            labels,
183            &buckets,
184            duration_secs,
185        );
186    }
187
188    /// Set active connections gauge.
189    pub fn set_active_connections(&self, count: f64) {
190        self.aggregator.set_gauge(
191            "sentinel_active_connections",
192            "Number of active client connections",
193            HashMap::new(),
194            count,
195        );
196    }
197
198    /// Set active requests gauge.
199    pub fn set_active_requests(&self, count: f64) {
200        self.aggregator.set_gauge(
201            "sentinel_active_requests",
202            "Number of requests currently being processed",
203            HashMap::new(),
204            count,
205        );
206    }
207
208    /// Increment upstream requests.
209    pub fn inc_upstream_requests(&self, upstream: &str, status: u16, success: bool) {
210        let mut labels = HashMap::new();
211        labels.insert("upstream".to_string(), upstream.to_string());
212        labels.insert("status".to_string(), status.to_string());
213        labels.insert("success".to_string(), success.to_string());
214
215        self.aggregator.increment_counter(
216            "sentinel_upstream_requests_total",
217            "Total requests to upstream servers",
218            labels,
219            1,
220        );
221    }
222
223    /// Record upstream latency.
224    pub fn observe_upstream_duration(&self, upstream: &str, duration_secs: f64) {
225        let mut labels = HashMap::new();
226        labels.insert("upstream".to_string(), upstream.to_string());
227
228        let buckets = vec![
229            0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
230        ];
231
232        self.aggregator.observe_histogram(
233            "sentinel_upstream_duration_seconds",
234            "Time spent waiting for upstream response",
235            labels,
236            &buckets,
237            duration_secs,
238        );
239    }
240
241    /// Increment agent requests.
242    pub fn inc_agent_requests(&self, agent: &str, decision: &str) {
243        let mut labels = HashMap::new();
244        labels.insert("agent".to_string(), agent.to_string());
245        labels.insert("decision".to_string(), decision.to_string());
246
247        self.aggregator.increment_counter(
248            "sentinel_agent_requests_total",
249            "Total requests processed by agents",
250            labels,
251            1,
252        );
253    }
254
255    /// Record agent processing time.
256    pub fn observe_agent_duration(&self, agent: &str, duration_secs: f64) {
257        let mut labels = HashMap::new();
258        labels.insert("agent".to_string(), agent.to_string());
259
260        let buckets = vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0];
261
262        self.aggregator.observe_histogram(
263            "sentinel_agent_duration_seconds",
264            "Time spent processing request in agent",
265            labels,
266            &buckets,
267            duration_secs,
268        );
269    }
270
271    /// Increment circuit breaker trips.
272    pub fn inc_circuit_breaker_trips(&self, upstream: &str) {
273        let mut labels = HashMap::new();
274        labels.insert("upstream".to_string(), upstream.to_string());
275
276        self.aggregator.increment_counter(
277            "sentinel_circuit_breaker_trips_total",
278            "Number of times circuit breaker has tripped",
279            labels,
280            1,
281        );
282    }
283
284    /// Set circuit breaker state.
285    pub fn set_circuit_breaker_state(&self, upstream: &str, open: bool) {
286        let mut labels = HashMap::new();
287        labels.insert("upstream".to_string(), upstream.to_string());
288
289        self.aggregator.set_gauge(
290            "sentinel_circuit_breaker_open",
291            "Whether circuit breaker is open (1) or closed (0)",
292            labels,
293            if open { 1.0 } else { 0.0 },
294        );
295    }
296
297    /// Increment rate limited requests.
298    pub fn inc_rate_limited(&self, route: &str) {
299        let mut labels = HashMap::new();
300        labels.insert("route".to_string(), route.to_string());
301
302        self.aggregator.increment_counter(
303            "sentinel_rate_limited_total",
304            "Total requests rate limited",
305            labels,
306            1,
307        );
308    }
309
310    /// Increment cache hits/misses.
311    pub fn inc_cache_access(&self, hit: bool) {
312        let mut labels = HashMap::new();
313        labels.insert(
314            "result".to_string(),
315            if hit { "hit" } else { "miss" }.to_string(),
316        );
317
318        self.aggregator.increment_counter(
319            "sentinel_cache_accesses_total",
320            "Total cache accesses",
321            labels,
322            1,
323        );
324    }
325
326    /// Set cache size.
327    pub fn set_cache_size(&self, size_bytes: f64) {
328        self.aggregator.set_gauge(
329            "sentinel_cache_size_bytes",
330            "Current cache size in bytes",
331            HashMap::new(),
332            size_bytes,
333        );
334    }
335}
336
/// Response produced for a metrics request.
///
/// This is plain data: callers read the fields to build the HTTP response.
/// It can be cloned and compared, which keeps assertions on whole responses
/// simple.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsResponse {
    /// HTTP status code
    pub status: u16,
    /// Value for the `Content-Type` header
    pub content_type: String,
    /// Response body
    pub body: String,
}
347
348impl MetricsResponse {
349    /// Create a successful metrics response.
350    pub fn ok(body: String) -> Self {
351        Self {
352            status: 200,
353            content_type: "text/plain; version=0.0.4; charset=utf-8".to_string(),
354            body,
355        }
356    }
357
358    /// Create a 404 response.
359    pub fn not_found() -> Self {
360        Self {
361            status: 404,
362            content_type: "text/plain".to_string(),
363            body: "Metrics not found".to_string(),
364        }
365    }
366
367    /// Create a 403 response.
368    pub fn forbidden() -> Self {
369        Self {
370            status: 403,
371            content_type: "text/plain".to_string(),
372            body: "Forbidden".to_string(),
373        }
374    }
375
376    /// Convert to HTTP response header.
377    pub fn to_header(&self) -> ResponseHeader {
378        let mut header = ResponseHeader::build(self.status, Some(2)).unwrap();
379        header
380            .append_header("Content-Type", &self.content_type)
381            .ok();
382        header
383            .append_header("Content-Length", self.body.len().to_string())
384            .ok();
385        header
386    }
387}
388
/// Names of the metrics that the Sentinel proxy records.
pub mod standard {
    /// Counter: HTTP requests handled.
    pub const REQUESTS_TOTAL: &str = "sentinel_requests_total";
    /// Histogram: HTTP request duration.
    pub const REQUEST_DURATION: &str = "sentinel_request_duration_seconds";
    /// Gauge: active connections.
    pub const ACTIVE_CONNECTIONS: &str = "sentinel_active_connections";
    /// Gauge: active requests.
    pub const ACTIVE_REQUESTS: &str = "sentinel_active_requests";
    /// Counter: requests sent to upstreams.
    pub const UPSTREAM_REQUESTS: &str = "sentinel_upstream_requests_total";
    /// Histogram: upstream duration.
    pub const UPSTREAM_DURATION: &str = "sentinel_upstream_duration_seconds";
    /// Counter: requests processed by agents.
    pub const AGENT_REQUESTS: &str = "sentinel_agent_requests_total";
    /// Histogram: agent duration.
    pub const AGENT_DURATION: &str = "sentinel_agent_duration_seconds";
    /// Counter: circuit breaker trips.
    pub const CIRCUIT_BREAKER_TRIPS: &str = "sentinel_circuit_breaker_trips_total";
    /// Gauge: circuit breaker state.
    pub const CIRCUIT_BREAKER_OPEN: &str = "sentinel_circuit_breaker_open";
    /// Counter: rate limited requests.
    pub const RATE_LIMITED: &str = "sentinel_rate_limited_total";
    /// Counter: cache accesses.
    pub const CACHE_ACCESSES: &str = "sentinel_cache_accesses_total";
    /// Gauge: cache size in bytes.
    pub const CACHE_SIZE: &str = "sentinel_cache_size_bytes";
}
418
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built manager is enabled and serves on the default path.
    #[test]
    fn test_metrics_manager_creation() {
        let mgr = MetricsManager::new("test-service", "node-1");

        assert!(mgr.is_enabled());
        assert_eq!(mgr.metrics_path(), "/metrics");
    }

    /// A disabled manager reports itself as such and answers scrapes with 404.
    #[test]
    fn test_metrics_manager_disabled() {
        let mgr = MetricsManager::new("test", "1").disable();
        assert!(!mgr.is_enabled());

        let resp = mgr.handle_metrics_request();
        assert_eq!(resp.status, 404);
    }

    /// Only listed addresses pass once an allow-list is configured.
    #[test]
    fn test_metrics_manager_ip_filtering() {
        let allow_list = vec!["127.0.0.1".to_string(), "10.0.0.1".to_string()];
        let mgr = MetricsManager::new("test", "1").allowed_ips(allow_list);

        assert!(mgr.is_ip_allowed("127.0.0.1"));
        assert!(mgr.is_ip_allowed("10.0.0.1"));
        assert!(!mgr.is_ip_allowed("192.168.1.1"));
    }

    /// Without an allow-list every caller is accepted.
    #[test]
    fn test_metrics_manager_all_ips_allowed() {
        let mgr = MetricsManager::new("test", "1");

        for candidate in ["127.0.0.1", "192.168.1.1", "any-ip"] {
            assert!(mgr.is_ip_allowed(candidate));
        }
    }

    /// Recorded metrics show up in a successful scrape response.
    #[test]
    fn test_metrics_response() {
        let mgr = MetricsManager::new("test", "node-1");

        mgr.inc_requests_total("GET", 200, "/api/users");
        mgr.set_active_connections(42.0);

        let resp = mgr.handle_metrics_request();
        assert_eq!(resp.status, 200);
        assert!(resp.content_type.contains("text/plain"));
        assert!(resp.body.contains("sentinel_requests_total"));
        assert!(resp.body.contains("sentinel_active_connections"));
        assert!(resp.body.contains("sentinel_info"));
    }

    /// Histogram observations produce bucket, sum and count series.
    #[test]
    fn test_request_duration_histogram() {
        let mgr = MetricsManager::new("test", "1");

        for secs in [0.05, 0.15, 0.5] {
            mgr.observe_request_duration("GET", "/api", secs);
        }

        let body = mgr.handle_metrics_request().body;
        assert!(body.contains("sentinel_request_duration_seconds_bucket"));
        assert!(body.contains("sentinel_request_duration_seconds_sum"));
        assert!(body.contains("sentinel_request_duration_seconds_count"));
        // Some series must end in the value 3 (labelled lines end in "} 3",
        // which this pattern also matches).
        assert!(body.contains(" 3\n"));
    }

    /// The builder can override the endpoint path.
    #[test]
    fn test_custom_path() {
        let mgr = MetricsManager::new("test", "1").path("/internal/metrics");
        assert_eq!(mgr.metrics_path(), "/internal/metrics");
    }

    /// Upstream counter and histogram are exported after being recorded.
    #[test]
    fn test_upstream_metrics() {
        let mgr = MetricsManager::new("test", "1");

        mgr.inc_upstream_requests("backend-1", 200, true);
        mgr.observe_upstream_duration("backend-1", 0.1);

        let body = mgr.handle_metrics_request().body;
        assert!(body.contains("sentinel_upstream_requests_total"));
        assert!(body.contains("sentinel_upstream_duration_seconds"));
    }

    /// Agent counter and histogram are exported after being recorded.
    #[test]
    fn test_agent_metrics() {
        let mgr = MetricsManager::new("test", "1");

        mgr.inc_agent_requests("waf", "allow");
        mgr.observe_agent_duration("waf", 0.005);

        let body = mgr.handle_metrics_request().body;
        assert!(body.contains("sentinel_agent_requests_total"));
        assert!(body.contains("sentinel_agent_duration_seconds"));
    }
}
526}