1use pingora_http::ResponseHeader;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13use grapsus_agent_protocol::v2::{MetricsCollector, UnifiedMetricsAggregator};
14
15pub struct MetricsManager {
20 aggregator: Arc<UnifiedMetricsAggregator>,
22 enabled: bool,
24 path: String,
26 allowed_ips: Vec<String>,
28 pool_metrics: RwLock<HashMap<String, Arc<MetricsCollector>>>,
30}
31
32impl MetricsManager {
33 pub fn new(service_name: impl Into<String>, instance_id: impl Into<String>) -> Self {
35 Self {
36 aggregator: Arc::new(UnifiedMetricsAggregator::new(service_name, instance_id)),
37 enabled: true,
38 path: "/metrics".to_string(),
39 allowed_ips: Vec::new(),
40 pool_metrics: RwLock::new(HashMap::new()),
41 }
42 }
43
44 pub fn with_aggregator(aggregator: Arc<UnifiedMetricsAggregator>) -> Self {
46 Self {
47 aggregator,
48 enabled: true,
49 path: "/metrics".to_string(),
50 allowed_ips: Vec::new(),
51 pool_metrics: RwLock::new(HashMap::new()),
52 }
53 }
54
55 pub fn from_config(
61 config: &grapsus_config::MetricsConfig,
62 service_name: impl Into<String>,
63 instance_id: impl Into<String>,
64 ) -> Self {
65 let mut manager = Self::new(service_name, instance_id);
66 manager.enabled = config.enabled;
67 manager.path = config.path.clone();
68 manager
69 }
70
71 pub fn path(mut self, path: impl Into<String>) -> Self {
73 self.path = path.into();
74 self
75 }
76
77 pub fn allowed_ips(mut self, ips: Vec<String>) -> Self {
79 self.allowed_ips = ips;
80 self
81 }
82
83 pub fn disable(mut self) -> Self {
85 self.enabled = false;
86 self
87 }
88
89 pub fn is_enabled(&self) -> bool {
91 self.enabled
92 }
93
94 pub fn metrics_path(&self) -> &str {
96 &self.path
97 }
98
99 pub fn aggregator(&self) -> &UnifiedMetricsAggregator {
101 &self.aggregator
102 }
103
104 pub fn aggregator_arc(&self) -> Arc<UnifiedMetricsAggregator> {
106 Arc::clone(&self.aggregator)
107 }
108
109 pub fn is_ip_allowed(&self, ip: &str) -> bool {
111 if self.allowed_ips.is_empty() {
112 return true;
113 }
114 self.allowed_ips.iter().any(|allowed| allowed == ip)
115 }
116
117 pub async fn register_pool_metrics(
121 &self,
122 agent_id: impl Into<String>,
123 collector: Arc<MetricsCollector>,
124 ) {
125 self.pool_metrics
126 .write()
127 .await
128 .insert(agent_id.into(), collector);
129 }
130
131 pub async fn unregister_pool_metrics(&self, agent_id: &str) {
133 self.pool_metrics.write().await.remove(agent_id);
134 }
135
136 pub fn handle_metrics_request(&self) -> MetricsResponse {
142 if !self.enabled {
143 return MetricsResponse::not_found();
144 }
145
146 let mut body = self.aggregator.export_prometheus();
148
149 if let Ok(pool_metrics) = self.pool_metrics.try_read() {
152 for (agent_id, collector) in pool_metrics.iter() {
153 let pool_output = collector.export_prometheus();
154 if !pool_output.is_empty() {
155 body.push_str(&format!("\n# Agent pool metrics: {}\n", agent_id));
157 body.push_str(&pool_output);
158 }
159 }
160 }
161
162 MetricsResponse::ok(body)
163 }
164
165 pub fn inc_requests_total(&self, method: &str, status: u16, route: &str) {
171 let mut labels = HashMap::new();
172 labels.insert("method".to_string(), method.to_string());
173 labels.insert("status".to_string(), status.to_string());
174 labels.insert("route".to_string(), route.to_string());
175
176 self.aggregator.increment_counter(
177 "grapsus_requests_total",
178 "Total HTTP requests handled by the proxy",
179 labels,
180 1,
181 );
182 }
183
184 pub fn observe_request_duration(&self, method: &str, route: &str, duration_secs: f64) {
186 let mut labels = HashMap::new();
187 labels.insert("method".to_string(), method.to_string());
188 labels.insert("route".to_string(), route.to_string());
189
190 let buckets = vec![
192 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
193 ];
194
195 self.aggregator.observe_histogram(
196 "grapsus_request_duration_seconds",
197 "HTTP request duration in seconds",
198 labels,
199 &buckets,
200 duration_secs,
201 );
202 }
203
204 pub fn set_active_connections(&self, count: f64) {
206 self.aggregator.set_gauge(
207 "grapsus_active_connections",
208 "Number of active client connections",
209 HashMap::new(),
210 count,
211 );
212 }
213
214 pub fn set_active_requests(&self, count: f64) {
216 self.aggregator.set_gauge(
217 "grapsus_active_requests",
218 "Number of requests currently being processed",
219 HashMap::new(),
220 count,
221 );
222 }
223
224 pub fn inc_upstream_requests(&self, upstream: &str, status: u16, success: bool) {
226 let mut labels = HashMap::new();
227 labels.insert("upstream".to_string(), upstream.to_string());
228 labels.insert("status".to_string(), status.to_string());
229 labels.insert("success".to_string(), success.to_string());
230
231 self.aggregator.increment_counter(
232 "grapsus_upstream_requests_total",
233 "Total requests to upstream servers",
234 labels,
235 1,
236 );
237 }
238
239 pub fn observe_upstream_duration(&self, upstream: &str, duration_secs: f64) {
241 let mut labels = HashMap::new();
242 labels.insert("upstream".to_string(), upstream.to_string());
243
244 let buckets = vec![
245 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
246 ];
247
248 self.aggregator.observe_histogram(
249 "grapsus_upstream_duration_seconds",
250 "Time spent waiting for upstream response",
251 labels,
252 &buckets,
253 duration_secs,
254 );
255 }
256
257 pub fn inc_agent_requests(&self, agent: &str, decision: &str) {
259 let mut labels = HashMap::new();
260 labels.insert("agent".to_string(), agent.to_string());
261 labels.insert("decision".to_string(), decision.to_string());
262
263 self.aggregator.increment_counter(
264 "grapsus_agent_requests_total",
265 "Total requests processed by agents",
266 labels,
267 1,
268 );
269 }
270
271 pub fn observe_agent_duration(&self, agent: &str, duration_secs: f64) {
273 let mut labels = HashMap::new();
274 labels.insert("agent".to_string(), agent.to_string());
275
276 let buckets = vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0];
277
278 self.aggregator.observe_histogram(
279 "grapsus_agent_duration_seconds",
280 "Time spent processing request in agent",
281 labels,
282 &buckets,
283 duration_secs,
284 );
285 }
286
287 pub fn inc_circuit_breaker_trips(&self, upstream: &str) {
289 let mut labels = HashMap::new();
290 labels.insert("upstream".to_string(), upstream.to_string());
291
292 self.aggregator.increment_counter(
293 "grapsus_circuit_breaker_trips_total",
294 "Number of times circuit breaker has tripped",
295 labels,
296 1,
297 );
298 }
299
300 pub fn set_circuit_breaker_state(&self, upstream: &str, open: bool) {
302 let mut labels = HashMap::new();
303 labels.insert("upstream".to_string(), upstream.to_string());
304
305 self.aggregator.set_gauge(
306 "grapsus_circuit_breaker_open",
307 "Whether circuit breaker is open (1) or closed (0)",
308 labels,
309 if open { 1.0 } else { 0.0 },
310 );
311 }
312
313 pub fn inc_rate_limited(&self, route: &str) {
315 let mut labels = HashMap::new();
316 labels.insert("route".to_string(), route.to_string());
317
318 self.aggregator.increment_counter(
319 "grapsus_rate_limited_total",
320 "Total requests rate limited",
321 labels,
322 1,
323 );
324 }
325
326 pub fn inc_cache_access(&self, hit: bool) {
328 let mut labels = HashMap::new();
329 labels.insert(
330 "result".to_string(),
331 if hit { "hit" } else { "miss" }.to_string(),
332 );
333
334 self.aggregator.increment_counter(
335 "grapsus_cache_accesses_total",
336 "Total cache accesses",
337 labels,
338 1,
339 );
340 }
341
342 pub fn set_cache_size(&self, size_bytes: f64) {
344 self.aggregator.set_gauge(
345 "grapsus_cache_size_bytes",
346 "Current cache size in bytes",
347 HashMap::new(),
348 size_bytes,
349 );
350 }
351}
352
/// Transport-agnostic description of the reply to a metrics request.
///
/// Holds plain data only, so it can be cloned and compared freely (for
/// example in tests) before being turned into real response headers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsResponse {
    /// HTTP status code of the reply.
    pub status: u16,

    /// Value for the `Content-Type` header.
    pub content_type: String,

    /// Response body text.
    pub body: String,
}
363
364impl MetricsResponse {
365 pub fn ok(body: String) -> Self {
367 Self {
368 status: 200,
369 content_type: "text/plain; version=0.0.4; charset=utf-8".to_string(),
370 body,
371 }
372 }
373
374 pub fn not_found() -> Self {
376 Self {
377 status: 404,
378 content_type: "text/plain".to_string(),
379 body: "Metrics not found".to_string(),
380 }
381 }
382
383 pub fn forbidden() -> Self {
385 Self {
386 status: 403,
387 content_type: "text/plain".to_string(),
388 body: "Forbidden".to_string(),
389 }
390 }
391
392 pub fn to_header(&self) -> ResponseHeader {
394 let mut header = ResponseHeader::build(self.status, Some(2)).unwrap();
395 header
396 .append_header("Content-Type", &self.content_type)
397 .ok();
398 header
399 .append_header("Content-Length", self.body.len().to_string())
400 .ok();
401 header
402 }
403}
404
/// Names of the metrics this file records, exposed as constants so other code
/// can refer to them without repeating string literals.
pub mod standard {
    /// Counter: total HTTP requests handled.
    pub const REQUESTS_TOTAL: &str = "grapsus_requests_total";

    /// Histogram: HTTP request duration in seconds.
    pub const REQUEST_DURATION: &str = "grapsus_request_duration_seconds";

    /// Gauge: active client connections.
    pub const ACTIVE_CONNECTIONS: &str = "grapsus_active_connections";

    /// Gauge: requests currently being processed.
    pub const ACTIVE_REQUESTS: &str = "grapsus_active_requests";

    /// Counter: total requests sent to upstream servers.
    pub const UPSTREAM_REQUESTS: &str = "grapsus_upstream_requests_total";

    /// Histogram: time spent waiting for upstream responses, in seconds.
    pub const UPSTREAM_DURATION: &str = "grapsus_upstream_duration_seconds";

    /// Counter: total requests processed by agents.
    pub const AGENT_REQUESTS: &str = "grapsus_agent_requests_total";

    /// Histogram: time spent processing a request in an agent, in seconds.
    pub const AGENT_DURATION: &str = "grapsus_agent_duration_seconds";

    /// Counter: number of circuit breaker trips.
    pub const CIRCUIT_BREAKER_TRIPS: &str = "grapsus_circuit_breaker_trips_total";

    /// Gauge: `1` while a circuit breaker is open, `0` while closed.
    pub const CIRCUIT_BREAKER_OPEN: &str = "grapsus_circuit_breaker_open";

    /// Counter: total rate-limited requests.
    pub const RATE_LIMITED: &str = "grapsus_rate_limited_total";

    /// Counter: total cache accesses.
    pub const CACHE_ACCESSES: &str = "grapsus_cache_accesses_total";

    /// Gauge: current cache size in bytes.
    pub const CACHE_SIZE: &str = "grapsus_cache_size_bytes";
}
434
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `body` mentions every metric name in `names`.
    fn assert_body_mentions(body: &str, names: &[&str]) {
        for name in names {
            assert!(body.contains(name));
        }
    }

    /// A fresh manager is enabled and served at the default path.
    #[test]
    fn test_metrics_manager_creation() {
        let manager = MetricsManager::new("test-service", "node-1");

        assert!(manager.is_enabled());
        assert_eq!(manager.metrics_path(), "/metrics");
    }

    /// A disabled manager reports itself as such and answers with 404.
    #[test]
    fn test_metrics_manager_disabled() {
        let manager = MetricsManager::new("test", "1").disable();
        assert!(!manager.is_enabled());

        let status = manager.handle_metrics_request().status;
        assert_eq!(status, 404);
    }

    /// With an allowlist, listed addresses pass and others are rejected.
    #[test]
    fn test_metrics_manager_ip_filtering() {
        let allowlist = vec!["127.0.0.1".to_string(), "10.0.0.1".to_string()];
        let manager = MetricsManager::new("test", "1").allowed_ips(allowlist);

        for listed in ["127.0.0.1", "10.0.0.1"].iter() {
            assert!(manager.is_ip_allowed(listed));
        }
        assert!(!manager.is_ip_allowed("192.168.1.1"));
    }

    /// Without an allowlist, any caller string is accepted.
    #[test]
    fn test_metrics_manager_all_ips_allowed() {
        let manager = MetricsManager::new("test", "1");

        for caller in ["127.0.0.1", "192.168.1.1", "any-ip"].iter() {
            assert!(manager.is_ip_allowed(caller));
        }
    }

    /// Recorded metrics show up in a 200 text/plain response.
    #[test]
    fn test_metrics_response() {
        let manager = MetricsManager::new("test", "node-1");

        manager.inc_requests_total("GET", 200, "/api/users");
        manager.set_active_connections(42.0);

        let response = manager.handle_metrics_request();
        assert_eq!(response.status, 200);
        assert!(response.content_type.contains("text/plain"));
        assert_body_mentions(
            &response.body,
            &[
                "grapsus_requests_total",
                "grapsus_active_connections",
                "grapsus_info",
            ],
        );
    }

    /// Three observations produce bucket, sum and count series with count 3.
    #[test]
    fn test_request_duration_histogram() {
        let manager = MetricsManager::new("test", "1");

        for seconds in [0.05, 0.15, 0.5].iter() {
            manager.observe_request_duration("GET", "/api", *seconds);
        }

        let response = manager.handle_metrics_request();
        assert_body_mentions(
            &response.body,
            &[
                "grapsus_request_duration_seconds_bucket",
                "grapsus_request_duration_seconds_sum",
                "grapsus_request_duration_seconds_count",
            ],
        );
        // Any line ending in "} 3" also ends in " 3", so one check covers both
        // the labelled and unlabelled form.
        assert!(response.body.contains(" 3\n"));
    }

    /// The builder overrides the endpoint path.
    #[test]
    fn test_custom_path() {
        let manager = MetricsManager::new("test", "1").path("/internal/metrics");

        assert_eq!(manager.metrics_path(), "/internal/metrics");
    }

    /// Upstream counter and histogram both appear in the export.
    #[test]
    fn test_upstream_metrics() {
        let manager = MetricsManager::new("test", "1");

        manager.inc_upstream_requests("backend-1", 200, true);
        manager.observe_upstream_duration("backend-1", 0.1);

        let response = manager.handle_metrics_request();
        assert_body_mentions(
            &response.body,
            &[
                "grapsus_upstream_requests_total",
                "grapsus_upstream_duration_seconds",
            ],
        );
    }

    /// Agent counter and histogram both appear in the export.
    #[test]
    fn test_agent_metrics() {
        let manager = MetricsManager::new("test", "1");

        manager.inc_agent_requests("waf", "allow");
        manager.observe_agent_duration("waf", 0.005);

        let response = manager.handle_metrics_request();
        assert_body_mentions(
            &response.body,
            &[
                "grapsus_agent_requests_total",
                "grapsus_agent_duration_seconds",
            ],
        );
    }
}
542}