1use pingora_http::ResponseHeader;
10use sentinel_agent_protocol::v2::{MetricsCollector, UnifiedMetricsAggregator};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
15pub struct MetricsManager {
20 aggregator: Arc<UnifiedMetricsAggregator>,
22 enabled: bool,
24 path: String,
26 allowed_ips: Vec<String>,
28 pool_metrics: RwLock<HashMap<String, Arc<MetricsCollector>>>,
30}
31
32impl MetricsManager {
33 pub fn new(service_name: impl Into<String>, instance_id: impl Into<String>) -> Self {
35 Self {
36 aggregator: Arc::new(UnifiedMetricsAggregator::new(service_name, instance_id)),
37 enabled: true,
38 path: "/metrics".to_string(),
39 allowed_ips: Vec::new(),
40 pool_metrics: RwLock::new(HashMap::new()),
41 }
42 }
43
44 pub fn with_aggregator(aggregator: Arc<UnifiedMetricsAggregator>) -> Self {
46 Self {
47 aggregator,
48 enabled: true,
49 path: "/metrics".to_string(),
50 allowed_ips: Vec::new(),
51 pool_metrics: RwLock::new(HashMap::new()),
52 }
53 }
54
55 pub fn path(mut self, path: impl Into<String>) -> Self {
57 self.path = path.into();
58 self
59 }
60
61 pub fn allowed_ips(mut self, ips: Vec<String>) -> Self {
63 self.allowed_ips = ips;
64 self
65 }
66
67 pub fn disable(mut self) -> Self {
69 self.enabled = false;
70 self
71 }
72
73 pub fn is_enabled(&self) -> bool {
75 self.enabled
76 }
77
78 pub fn metrics_path(&self) -> &str {
80 &self.path
81 }
82
83 pub fn aggregator(&self) -> &UnifiedMetricsAggregator {
85 &self.aggregator
86 }
87
88 pub fn aggregator_arc(&self) -> Arc<UnifiedMetricsAggregator> {
90 Arc::clone(&self.aggregator)
91 }
92
93 pub fn is_ip_allowed(&self, ip: &str) -> bool {
95 if self.allowed_ips.is_empty() {
96 return true;
97 }
98 self.allowed_ips.iter().any(|allowed| allowed == ip)
99 }
100
101 pub async fn register_pool_metrics(&self, agent_id: impl Into<String>, collector: Arc<MetricsCollector>) {
105 self.pool_metrics.write().await.insert(agent_id.into(), collector);
106 }
107
108 pub async fn unregister_pool_metrics(&self, agent_id: &str) {
110 self.pool_metrics.write().await.remove(agent_id);
111 }
112
113 pub fn handle_metrics_request(&self) -> MetricsResponse {
119 if !self.enabled {
120 return MetricsResponse::not_found();
121 }
122
123 let mut body = self.aggregator.export_prometheus();
125
126 if let Ok(pool_metrics) = self.pool_metrics.try_read() {
129 for (agent_id, collector) in pool_metrics.iter() {
130 let pool_output = collector.export_prometheus();
131 if !pool_output.is_empty() {
132 body.push_str(&format!("\n# Agent pool metrics: {}\n", agent_id));
134 body.push_str(&pool_output);
135 }
136 }
137 }
138
139 MetricsResponse::ok(body)
140 }
141
142 pub fn inc_requests_total(&self, method: &str, status: u16, route: &str) {
148 let mut labels = HashMap::new();
149 labels.insert("method".to_string(), method.to_string());
150 labels.insert("status".to_string(), status.to_string());
151 labels.insert("route".to_string(), route.to_string());
152
153 self.aggregator.increment_counter(
154 "sentinel_requests_total",
155 "Total HTTP requests handled by the proxy",
156 labels,
157 1,
158 );
159 }
160
161 pub fn observe_request_duration(&self, method: &str, route: &str, duration_secs: f64) {
163 let mut labels = HashMap::new();
164 labels.insert("method".to_string(), method.to_string());
165 labels.insert("route".to_string(), route.to_string());
166
167 let buckets = vec![
169 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
170 ];
171
172 self.aggregator.observe_histogram(
173 "sentinel_request_duration_seconds",
174 "HTTP request duration in seconds",
175 labels,
176 &buckets,
177 duration_secs,
178 );
179 }
180
181 pub fn set_active_connections(&self, count: f64) {
183 self.aggregator.set_gauge(
184 "sentinel_active_connections",
185 "Number of active client connections",
186 HashMap::new(),
187 count,
188 );
189 }
190
191 pub fn set_active_requests(&self, count: f64) {
193 self.aggregator.set_gauge(
194 "sentinel_active_requests",
195 "Number of requests currently being processed",
196 HashMap::new(),
197 count,
198 );
199 }
200
201 pub fn inc_upstream_requests(&self, upstream: &str, status: u16, success: bool) {
203 let mut labels = HashMap::new();
204 labels.insert("upstream".to_string(), upstream.to_string());
205 labels.insert("status".to_string(), status.to_string());
206 labels.insert("success".to_string(), success.to_string());
207
208 self.aggregator.increment_counter(
209 "sentinel_upstream_requests_total",
210 "Total requests to upstream servers",
211 labels,
212 1,
213 );
214 }
215
216 pub fn observe_upstream_duration(&self, upstream: &str, duration_secs: f64) {
218 let mut labels = HashMap::new();
219 labels.insert("upstream".to_string(), upstream.to_string());
220
221 let buckets = vec![
222 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
223 ];
224
225 self.aggregator.observe_histogram(
226 "sentinel_upstream_duration_seconds",
227 "Time spent waiting for upstream response",
228 labels,
229 &buckets,
230 duration_secs,
231 );
232 }
233
234 pub fn inc_agent_requests(&self, agent: &str, decision: &str) {
236 let mut labels = HashMap::new();
237 labels.insert("agent".to_string(), agent.to_string());
238 labels.insert("decision".to_string(), decision.to_string());
239
240 self.aggregator.increment_counter(
241 "sentinel_agent_requests_total",
242 "Total requests processed by agents",
243 labels,
244 1,
245 );
246 }
247
248 pub fn observe_agent_duration(&self, agent: &str, duration_secs: f64) {
250 let mut labels = HashMap::new();
251 labels.insert("agent".to_string(), agent.to_string());
252
253 let buckets = vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0];
254
255 self.aggregator.observe_histogram(
256 "sentinel_agent_duration_seconds",
257 "Time spent processing request in agent",
258 labels,
259 &buckets,
260 duration_secs,
261 );
262 }
263
264 pub fn inc_circuit_breaker_trips(&self, upstream: &str) {
266 let mut labels = HashMap::new();
267 labels.insert("upstream".to_string(), upstream.to_string());
268
269 self.aggregator.increment_counter(
270 "sentinel_circuit_breaker_trips_total",
271 "Number of times circuit breaker has tripped",
272 labels,
273 1,
274 );
275 }
276
277 pub fn set_circuit_breaker_state(&self, upstream: &str, open: bool) {
279 let mut labels = HashMap::new();
280 labels.insert("upstream".to_string(), upstream.to_string());
281
282 self.aggregator.set_gauge(
283 "sentinel_circuit_breaker_open",
284 "Whether circuit breaker is open (1) or closed (0)",
285 labels,
286 if open { 1.0 } else { 0.0 },
287 );
288 }
289
290 pub fn inc_rate_limited(&self, route: &str) {
292 let mut labels = HashMap::new();
293 labels.insert("route".to_string(), route.to_string());
294
295 self.aggregator.increment_counter(
296 "sentinel_rate_limited_total",
297 "Total requests rate limited",
298 labels,
299 1,
300 );
301 }
302
303 pub fn inc_cache_access(&self, hit: bool) {
305 let mut labels = HashMap::new();
306 labels.insert("result".to_string(), if hit { "hit" } else { "miss" }.to_string());
307
308 self.aggregator.increment_counter(
309 "sentinel_cache_accesses_total",
310 "Total cache accesses",
311 labels,
312 1,
313 );
314 }
315
316 pub fn set_cache_size(&self, size_bytes: f64) {
318 self.aggregator.set_gauge(
319 "sentinel_cache_size_bytes",
320 "Current cache size in bytes",
321 HashMap::new(),
322 size_bytes,
323 );
324 }
325}
326
/// HTTP response produced by the metrics endpoint, independent of the server type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsResponse {
    /// HTTP status code.
    pub status: u16,
    /// Value for the `Content-Type` header.
    pub content_type: String,
    /// Response body. Its byte length is used for `Content-Length`.
    pub body: String,
}
337
338impl MetricsResponse {
339 pub fn ok(body: String) -> Self {
341 Self {
342 status: 200,
343 content_type: "text/plain; version=0.0.4; charset=utf-8".to_string(),
344 body,
345 }
346 }
347
348 pub fn not_found() -> Self {
350 Self {
351 status: 404,
352 content_type: "text/plain".to_string(),
353 body: "Metrics not found".to_string(),
354 }
355 }
356
357 pub fn forbidden() -> Self {
359 Self {
360 status: 403,
361 content_type: "text/plain".to_string(),
362 body: "Forbidden".to_string(),
363 }
364 }
365
366 pub fn to_header(&self) -> ResponseHeader {
368 let mut header = ResponseHeader::build(self.status, Some(2)).unwrap();
369 header
370 .append_header("Content-Type", &self.content_type)
371 .ok();
372 header
373 .append_header("Content-Length", self.body.len().to_string())
374 .ok();
375 header
376 }
377}
378
/// Canonical names of the metrics recorded by `MetricsManager`.
pub mod standard {
    // --- Request-level metrics ---

    /// Counter of HTTP requests handled by the proxy.
    pub const REQUESTS_TOTAL: &str = "sentinel_requests_total";
    /// Histogram of HTTP request durations, in seconds.
    pub const REQUEST_DURATION: &str = "sentinel_request_duration_seconds";
    /// Gauge of active client connections.
    pub const ACTIVE_CONNECTIONS: &str = "sentinel_active_connections";
    /// Gauge of requests currently being processed.
    pub const ACTIVE_REQUESTS: &str = "sentinel_active_requests";

    // --- Upstream metrics ---

    /// Counter of requests sent to upstream servers.
    pub const UPSTREAM_REQUESTS: &str = "sentinel_upstream_requests_total";
    /// Histogram of time spent waiting for upstream responses, in seconds.
    pub const UPSTREAM_DURATION: &str = "sentinel_upstream_duration_seconds";

    // --- Agent metrics ---

    /// Counter of requests processed by agents.
    pub const AGENT_REQUESTS: &str = "sentinel_agent_requests_total";
    /// Histogram of agent processing time, in seconds.
    pub const AGENT_DURATION: &str = "sentinel_agent_duration_seconds";

    // --- Circuit breaker metrics ---

    /// Counter of circuit-breaker trips.
    pub const CIRCUIT_BREAKER_TRIPS: &str = "sentinel_circuit_breaker_trips_total";
    /// Gauge of circuit-breaker state (1 = open, 0 = closed).
    pub const CIRCUIT_BREAKER_OPEN: &str = "sentinel_circuit_breaker_open";

    // --- Rate limiting and cache metrics ---

    /// Counter of rate-limited requests.
    pub const RATE_LIMITED: &str = "sentinel_rate_limited_total";
    /// Counter of cache accesses, split by hit/miss.
    pub const CACHE_ACCESSES: &str = "sentinel_cache_accesses_total";
    /// Gauge of cache size, in bytes.
    pub const CACHE_SIZE: &str = "sentinel_cache_size_bytes";
}
408
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_manager_creation() {
        let mgr = MetricsManager::new("test-service", "node-1");
        assert!(mgr.is_enabled());
        assert_eq!(mgr.metrics_path(), "/metrics");
    }

    #[test]
    fn test_metrics_manager_disabled() {
        let mgr = MetricsManager::new("test", "1").disable();
        assert!(!mgr.is_enabled());
        assert_eq!(mgr.handle_metrics_request().status, 404);
    }

    #[test]
    fn test_metrics_manager_ip_filtering() {
        let allowlist = vec!["127.0.0.1".to_string(), "10.0.0.1".to_string()];
        let mgr = MetricsManager::new("test", "1").allowed_ips(allowlist);

        for permitted in ["127.0.0.1", "10.0.0.1"] {
            assert!(mgr.is_ip_allowed(permitted));
        }
        assert!(!mgr.is_ip_allowed("192.168.1.1"));
    }

    #[test]
    fn test_metrics_manager_all_ips_allowed() {
        // No allowlist configured: every caller is accepted, even non-IP strings.
        let mgr = MetricsManager::new("test", "1");
        for caller in ["127.0.0.1", "192.168.1.1", "any-ip"] {
            assert!(mgr.is_ip_allowed(caller));
        }
    }

    #[test]
    fn test_metrics_response() {
        let mgr = MetricsManager::new("test", "node-1");
        mgr.inc_requests_total("GET", 200, "/api/users");
        mgr.set_active_connections(42.0);

        let resp = mgr.handle_metrics_request();
        assert_eq!(resp.status, 200);
        assert!(resp.content_type.contains("text/plain"));
        for expected in [
            "sentinel_requests_total",
            "sentinel_active_connections",
            "sentinel_info",
        ] {
            assert!(resp.body.contains(expected));
        }
    }

    #[test]
    fn test_request_duration_histogram() {
        let mgr = MetricsManager::new("test", "1");
        for secs in [0.05, 0.15, 0.5] {
            mgr.observe_request_duration("GET", "/api", secs);
        }

        let body = mgr.handle_metrics_request().body;
        for suffix in ["_bucket", "_sum", "_count"] {
            let series = format!("sentinel_request_duration_seconds{}", suffix);
            assert!(body.contains(&series));
        }
        // Three observations should surface as a sample value of 3.
        assert!(body.contains("} 3\n") || body.contains(" 3\n"));
    }

    #[test]
    fn test_custom_path() {
        let mgr = MetricsManager::new("test", "1").path("/internal/metrics");
        assert_eq!(mgr.metrics_path(), "/internal/metrics");
    }

    #[test]
    fn test_upstream_metrics() {
        let mgr = MetricsManager::new("test", "1");
        mgr.inc_upstream_requests("backend-1", 200, true);
        mgr.observe_upstream_duration("backend-1", 0.1);

        let body = mgr.handle_metrics_request().body;
        assert!(body.contains("sentinel_upstream_requests_total"));
        assert!(body.contains("sentinel_upstream_duration_seconds"));
    }

    #[test]
    fn test_agent_metrics() {
        let mgr = MetricsManager::new("test", "1");
        mgr.inc_agent_requests("waf", "allow");
        mgr.observe_agent_duration("waf", 0.005);

        let body = mgr.handle_metrics_request().body;
        assert!(body.contains("sentinel_agent_requests_total"));
        assert!(body.contains("sentinel_agent_duration_seconds"));
    }
}