1use pingora_http::ResponseHeader;
10use sentinel_agent_protocol::v2::{MetricsCollector, UnifiedMetricsAggregator};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
15pub struct MetricsManager {
20 aggregator: Arc<UnifiedMetricsAggregator>,
22 enabled: bool,
24 path: String,
26 allowed_ips: Vec<String>,
28 pool_metrics: RwLock<HashMap<String, Arc<MetricsCollector>>>,
30}
31
32impl MetricsManager {
33 pub fn new(service_name: impl Into<String>, instance_id: impl Into<String>) -> Self {
35 Self {
36 aggregator: Arc::new(UnifiedMetricsAggregator::new(service_name, instance_id)),
37 enabled: true,
38 path: "/metrics".to_string(),
39 allowed_ips: Vec::new(),
40 pool_metrics: RwLock::new(HashMap::new()),
41 }
42 }
43
44 pub fn with_aggregator(aggregator: Arc<UnifiedMetricsAggregator>) -> Self {
46 Self {
47 aggregator,
48 enabled: true,
49 path: "/metrics".to_string(),
50 allowed_ips: Vec::new(),
51 pool_metrics: RwLock::new(HashMap::new()),
52 }
53 }
54
55 pub fn path(mut self, path: impl Into<String>) -> Self {
57 self.path = path.into();
58 self
59 }
60
61 pub fn allowed_ips(mut self, ips: Vec<String>) -> Self {
63 self.allowed_ips = ips;
64 self
65 }
66
67 pub fn disable(mut self) -> Self {
69 self.enabled = false;
70 self
71 }
72
73 pub fn is_enabled(&self) -> bool {
75 self.enabled
76 }
77
78 pub fn metrics_path(&self) -> &str {
80 &self.path
81 }
82
83 pub fn aggregator(&self) -> &UnifiedMetricsAggregator {
85 &self.aggregator
86 }
87
88 pub fn aggregator_arc(&self) -> Arc<UnifiedMetricsAggregator> {
90 Arc::clone(&self.aggregator)
91 }
92
93 pub fn is_ip_allowed(&self, ip: &str) -> bool {
95 if self.allowed_ips.is_empty() {
96 return true;
97 }
98 self.allowed_ips.iter().any(|allowed| allowed == ip)
99 }
100
101 pub async fn register_pool_metrics(
105 &self,
106 agent_id: impl Into<String>,
107 collector: Arc<MetricsCollector>,
108 ) {
109 self.pool_metrics
110 .write()
111 .await
112 .insert(agent_id.into(), collector);
113 }
114
115 pub async fn unregister_pool_metrics(&self, agent_id: &str) {
117 self.pool_metrics.write().await.remove(agent_id);
118 }
119
120 pub fn handle_metrics_request(&self) -> MetricsResponse {
126 if !self.enabled {
127 return MetricsResponse::not_found();
128 }
129
130 let mut body = self.aggregator.export_prometheus();
132
133 if let Ok(pool_metrics) = self.pool_metrics.try_read() {
136 for (agent_id, collector) in pool_metrics.iter() {
137 let pool_output = collector.export_prometheus();
138 if !pool_output.is_empty() {
139 body.push_str(&format!("\n# Agent pool metrics: {}\n", agent_id));
141 body.push_str(&pool_output);
142 }
143 }
144 }
145
146 MetricsResponse::ok(body)
147 }
148
149 pub fn inc_requests_total(&self, method: &str, status: u16, route: &str) {
155 let mut labels = HashMap::new();
156 labels.insert("method".to_string(), method.to_string());
157 labels.insert("status".to_string(), status.to_string());
158 labels.insert("route".to_string(), route.to_string());
159
160 self.aggregator.increment_counter(
161 "sentinel_requests_total",
162 "Total HTTP requests handled by the proxy",
163 labels,
164 1,
165 );
166 }
167
168 pub fn observe_request_duration(&self, method: &str, route: &str, duration_secs: f64) {
170 let mut labels = HashMap::new();
171 labels.insert("method".to_string(), method.to_string());
172 labels.insert("route".to_string(), route.to_string());
173
174 let buckets = vec![
176 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
177 ];
178
179 self.aggregator.observe_histogram(
180 "sentinel_request_duration_seconds",
181 "HTTP request duration in seconds",
182 labels,
183 &buckets,
184 duration_secs,
185 );
186 }
187
188 pub fn set_active_connections(&self, count: f64) {
190 self.aggregator.set_gauge(
191 "sentinel_active_connections",
192 "Number of active client connections",
193 HashMap::new(),
194 count,
195 );
196 }
197
198 pub fn set_active_requests(&self, count: f64) {
200 self.aggregator.set_gauge(
201 "sentinel_active_requests",
202 "Number of requests currently being processed",
203 HashMap::new(),
204 count,
205 );
206 }
207
208 pub fn inc_upstream_requests(&self, upstream: &str, status: u16, success: bool) {
210 let mut labels = HashMap::new();
211 labels.insert("upstream".to_string(), upstream.to_string());
212 labels.insert("status".to_string(), status.to_string());
213 labels.insert("success".to_string(), success.to_string());
214
215 self.aggregator.increment_counter(
216 "sentinel_upstream_requests_total",
217 "Total requests to upstream servers",
218 labels,
219 1,
220 );
221 }
222
223 pub fn observe_upstream_duration(&self, upstream: &str, duration_secs: f64) {
225 let mut labels = HashMap::new();
226 labels.insert("upstream".to_string(), upstream.to_string());
227
228 let buckets = vec![
229 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
230 ];
231
232 self.aggregator.observe_histogram(
233 "sentinel_upstream_duration_seconds",
234 "Time spent waiting for upstream response",
235 labels,
236 &buckets,
237 duration_secs,
238 );
239 }
240
241 pub fn inc_agent_requests(&self, agent: &str, decision: &str) {
243 let mut labels = HashMap::new();
244 labels.insert("agent".to_string(), agent.to_string());
245 labels.insert("decision".to_string(), decision.to_string());
246
247 self.aggregator.increment_counter(
248 "sentinel_agent_requests_total",
249 "Total requests processed by agents",
250 labels,
251 1,
252 );
253 }
254
255 pub fn observe_agent_duration(&self, agent: &str, duration_secs: f64) {
257 let mut labels = HashMap::new();
258 labels.insert("agent".to_string(), agent.to_string());
259
260 let buckets = vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0];
261
262 self.aggregator.observe_histogram(
263 "sentinel_agent_duration_seconds",
264 "Time spent processing request in agent",
265 labels,
266 &buckets,
267 duration_secs,
268 );
269 }
270
271 pub fn inc_circuit_breaker_trips(&self, upstream: &str) {
273 let mut labels = HashMap::new();
274 labels.insert("upstream".to_string(), upstream.to_string());
275
276 self.aggregator.increment_counter(
277 "sentinel_circuit_breaker_trips_total",
278 "Number of times circuit breaker has tripped",
279 labels,
280 1,
281 );
282 }
283
284 pub fn set_circuit_breaker_state(&self, upstream: &str, open: bool) {
286 let mut labels = HashMap::new();
287 labels.insert("upstream".to_string(), upstream.to_string());
288
289 self.aggregator.set_gauge(
290 "sentinel_circuit_breaker_open",
291 "Whether circuit breaker is open (1) or closed (0)",
292 labels,
293 if open { 1.0 } else { 0.0 },
294 );
295 }
296
297 pub fn inc_rate_limited(&self, route: &str) {
299 let mut labels = HashMap::new();
300 labels.insert("route".to_string(), route.to_string());
301
302 self.aggregator.increment_counter(
303 "sentinel_rate_limited_total",
304 "Total requests rate limited",
305 labels,
306 1,
307 );
308 }
309
310 pub fn inc_cache_access(&self, hit: bool) {
312 let mut labels = HashMap::new();
313 labels.insert(
314 "result".to_string(),
315 if hit { "hit" } else { "miss" }.to_string(),
316 );
317
318 self.aggregator.increment_counter(
319 "sentinel_cache_accesses_total",
320 "Total cache accesses",
321 labels,
322 1,
323 );
324 }
325
326 pub fn set_cache_size(&self, size_bytes: f64) {
328 self.aggregator.set_gauge(
329 "sentinel_cache_size_bytes",
330 "Current cache size in bytes",
331 HashMap::new(),
332 size_bytes,
333 );
334 }
335}
336
/// Transport-independent description of the reply to a metrics request.
///
/// All fields are plain owned data, so the type can be cloned and compared,
/// which makes it convenient to assert on in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsResponse {
    /// HTTP status code of the reply.
    pub status: u16,
    /// Value for the `Content-Type` header.
    pub content_type: String,
    /// Response payload; its byte length is used for `Content-Length`.
    pub body: String,
}
347
348impl MetricsResponse {
349 pub fn ok(body: String) -> Self {
351 Self {
352 status: 200,
353 content_type: "text/plain; version=0.0.4; charset=utf-8".to_string(),
354 body,
355 }
356 }
357
358 pub fn not_found() -> Self {
360 Self {
361 status: 404,
362 content_type: "text/plain".to_string(),
363 body: "Metrics not found".to_string(),
364 }
365 }
366
367 pub fn forbidden() -> Self {
369 Self {
370 status: 403,
371 content_type: "text/plain".to_string(),
372 body: "Forbidden".to_string(),
373 }
374 }
375
376 pub fn to_header(&self) -> ResponseHeader {
378 let mut header = ResponseHeader::build(self.status, Some(2)).unwrap();
379 header
380 .append_header("Content-Type", &self.content_type)
381 .ok();
382 header
383 .append_header("Content-Length", self.body.len().to_string())
384 .ok();
385 header
386 }
387}
388
/// Names of the metrics recorded by `MetricsManager`, for callers that need to
/// refer to them (dashboards, tests, alert rules) without repeating literals.
pub mod standard {
    /// Counter recorded by `inc_requests_total`.
    pub const REQUESTS_TOTAL: &str = "sentinel_requests_total";

    /// Histogram recorded by `observe_request_duration`.
    pub const REQUEST_DURATION: &str = "sentinel_request_duration_seconds";

    /// Gauge set by `set_active_connections`.
    pub const ACTIVE_CONNECTIONS: &str = "sentinel_active_connections";

    /// Gauge set by `set_active_requests`.
    pub const ACTIVE_REQUESTS: &str = "sentinel_active_requests";

    /// Counter recorded by `inc_upstream_requests`.
    pub const UPSTREAM_REQUESTS: &str = "sentinel_upstream_requests_total";

    /// Histogram recorded by `observe_upstream_duration`.
    pub const UPSTREAM_DURATION: &str = "sentinel_upstream_duration_seconds";

    /// Counter recorded by `inc_agent_requests`.
    pub const AGENT_REQUESTS: &str = "sentinel_agent_requests_total";

    /// Histogram recorded by `observe_agent_duration`.
    pub const AGENT_DURATION: &str = "sentinel_agent_duration_seconds";

    /// Counter recorded by `inc_circuit_breaker_trips`.
    pub const CIRCUIT_BREAKER_TRIPS: &str = "sentinel_circuit_breaker_trips_total";

    /// Gauge set by `set_circuit_breaker_state` (1 = open, 0 = closed).
    pub const CIRCUIT_BREAKER_OPEN: &str = "sentinel_circuit_breaker_open";

    /// Counter recorded by `inc_rate_limited`.
    pub const RATE_LIMITED: &str = "sentinel_rate_limited_total";

    /// Counter recorded by `inc_cache_access`.
    pub const CACHE_ACCESSES: &str = "sentinel_cache_accesses_total";

    /// Gauge set by `set_cache_size`.
    pub const CACHE_SIZE: &str = "sentinel_cache_size_bytes";
}
418
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every entry of `needles` occurs somewhere in `body`.
    fn assert_body_contains(body: &str, needles: &[&str]) {
        for &needle in needles {
            assert!(
                body.contains(needle),
                "metrics output is missing `{}`",
                needle
            );
        }
    }

    /// A new manager is enabled and serves the default path.
    #[test]
    fn test_metrics_manager_creation() {
        let mgr = MetricsManager::new("test-service", "node-1");

        assert!(mgr.is_enabled());
        assert_eq!(mgr.metrics_path(), "/metrics");
    }

    /// A disabled manager reports itself disabled and answers with 404.
    #[test]
    fn test_metrics_manager_disabled() {
        let mgr = MetricsManager::new("test", "1").disable();
        assert!(!mgr.is_enabled());

        let reply = mgr.handle_metrics_request();
        assert_eq!(reply.status, 404);
    }

    /// With an allowlist configured, only listed addresses are accepted.
    #[test]
    fn test_metrics_manager_ip_filtering() {
        let allowlist = vec!["127.0.0.1".to_string(), "10.0.0.1".to_string()];
        let mgr = MetricsManager::new("test", "1").allowed_ips(allowlist);

        for listed in ["127.0.0.1", "10.0.0.1"] {
            assert!(mgr.is_ip_allowed(listed));
        }
        assert!(!mgr.is_ip_allowed("192.168.1.1"));
    }

    /// Without an allowlist, any caller string is accepted.
    #[test]
    fn test_metrics_manager_all_ips_allowed() {
        let mgr = MetricsManager::new("test", "1");

        for caller in ["127.0.0.1", "192.168.1.1", "any-ip"] {
            assert!(mgr.is_ip_allowed(caller));
        }
    }

    /// Recorded counters and gauges show up in a 200 plain-text response.
    #[test]
    fn test_metrics_response() {
        let mgr = MetricsManager::new("test", "node-1");

        mgr.inc_requests_total("GET", 200, "/api/users");
        mgr.set_active_connections(42.0);

        let reply = mgr.handle_metrics_request();
        assert_eq!(reply.status, 200);
        assert!(reply.content_type.contains("text/plain"));
        // `sentinel_info` is not recorded anywhere in this file; presumably the
        // aggregator emits it on its own.
        assert_body_contains(
            &reply.body,
            &[
                "sentinel_requests_total",
                "sentinel_active_connections",
                "sentinel_info",
            ],
        );
    }

    /// Histogram observations produce bucket, sum and count series.
    #[test]
    fn test_request_duration_histogram() {
        let mgr = MetricsManager::new("test", "1");

        for secs in [0.05, 0.15, 0.5] {
            mgr.observe_request_duration("GET", "/api", secs);
        }

        let reply = mgr.handle_metrics_request();
        assert_body_contains(
            &reply.body,
            &[
                "sentinel_request_duration_seconds_bucket",
                "sentinel_request_duration_seconds_sum",
                "sentinel_request_duration_seconds_count",
            ],
        );
        // Some sample line must end in the value 3 (three observations were made).
        // A line ending in "} 3" also ends in " 3", so one check covers both forms.
        assert!(reply.body.contains(" 3\n"));
    }

    /// The `path` builder overrides the default endpoint path.
    #[test]
    fn test_custom_path() {
        let mgr = MetricsManager::new("test", "1").path("/internal/metrics");

        assert_eq!(mgr.metrics_path(), "/internal/metrics");
    }

    /// Upstream counter and histogram appear in the export.
    #[test]
    fn test_upstream_metrics() {
        let mgr = MetricsManager::new("test", "1");

        mgr.inc_upstream_requests("backend-1", 200, true);
        mgr.observe_upstream_duration("backend-1", 0.1);

        let reply = mgr.handle_metrics_request();
        assert_body_contains(
            &reply.body,
            &[
                "sentinel_upstream_requests_total",
                "sentinel_upstream_duration_seconds",
            ],
        );
    }

    /// Agent counter and histogram appear in the export.
    #[test]
    fn test_agent_metrics() {
        let mgr = MetricsManager::new("test", "1");

        mgr.inc_agent_requests("waf", "allow");
        mgr.observe_agent_duration("waf", 0.005);

        let reply = mgr.handle_metrics_request();
        assert_body_contains(
            &reply.body,
            &[
                "sentinel_agent_requests_total",
                "sentinel_agent_duration_seconds",
            ],
        );
    }
}