1use anyhow::{Context, Result};
7use prometheus::{
8 register_counter_vec, register_gauge, register_histogram_vec,
9 register_int_counter_vec, register_int_gauge, register_int_gauge_vec, CounterVec, Gauge, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec,
10};
11use std::time::Duration;
12use tracing::{error, info};
13use tracing_subscriber::{fmt, prelude::*, EnvFilter};
14
15pub fn init_tracing() -> Result<()> {
17 let json_layer =
19 if std::env::var("SENTINEL_LOG_FORMAT").unwrap_or_else(|_| "json".to_string()) == "json" {
20 Some(
21 fmt::layer()
22 .json()
23 .with_target(true)
24 .with_thread_ids(true)
25 .with_thread_names(true)
26 .with_file(true)
27 .with_line_number(true),
28 )
29 } else {
30 None
31 };
32
33 let pretty_layer = if std::env::var("SENTINEL_LOG_FORMAT")
35 .unwrap_or_else(|_| "json".to_string())
36 == "pretty"
37 {
38 Some(
39 fmt::layer()
40 .pretty()
41 .with_target(true)
42 .with_thread_ids(true)
43 .with_thread_names(true)
44 .with_file(true)
45 .with_line_number(true),
46 )
47 } else {
48 None
49 };
50
51 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
53
54 tracing_subscriber::registry()
55 .with(env_filter)
56 .with(json_layer)
57 .with(pretty_layer)
58 .init();
59
60 info!("Tracing initialized");
61 Ok(())
62}
63
64pub struct RequestMetrics {
66 request_duration: HistogramVec,
68 request_count: IntCounterVec,
70 active_requests: IntGauge,
72 upstream_attempts: IntCounterVec,
74 upstream_failures: IntCounterVec,
76 circuit_breaker_state: IntGaugeVec,
78 agent_latency: HistogramVec,
80 agent_timeouts: IntCounterVec,
82 blocked_requests: CounterVec,
84 request_body_size: HistogramVec,
86 response_body_size: HistogramVec,
88 tls_handshake_duration: HistogramVec,
90 connection_pool_size: IntGaugeVec,
92 connection_pool_idle: IntGaugeVec,
93 connection_pool_acquired: IntCounterVec,
94 memory_usage: IntGauge,
96 cpu_usage: Gauge,
97 open_connections: IntGauge,
98}
99
100impl RequestMetrics {
101 pub fn new() -> Result<Self> {
103 let latency_buckets = vec![
105 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
106 ];
107
108 let size_buckets = vec![
110 100.0,
111 1_000.0,
112 10_000.0,
113 100_000.0,
114 1_000_000.0,
115 10_000_000.0,
116 100_000_000.0,
117 ];
118
119 let request_duration = register_histogram_vec!(
120 "sentinel_request_duration_seconds",
121 "Request duration in seconds",
122 &["route", "method"],
123 latency_buckets.clone()
124 )
125 .context("Failed to register request_duration metric")?;
126
127 let request_count = register_int_counter_vec!(
128 "sentinel_requests_total",
129 "Total number of requests",
130 &["route", "method", "status"]
131 )
132 .context("Failed to register request_count metric")?;
133
134 let active_requests = register_int_gauge!(
135 "sentinel_active_requests",
136 "Number of currently active requests"
137 )
138 .context("Failed to register active_requests metric")?;
139
140 let upstream_attempts = register_int_counter_vec!(
141 "sentinel_upstream_attempts_total",
142 "Total upstream connection attempts",
143 &["upstream", "route"]
144 )
145 .context("Failed to register upstream_attempts metric")?;
146
147 let upstream_failures = register_int_counter_vec!(
148 "sentinel_upstream_failures_total",
149 "Total upstream connection failures",
150 &["upstream", "route", "reason"]
151 )
152 .context("Failed to register upstream_failures metric")?;
153
154 let circuit_breaker_state = register_int_gauge_vec!(
155 "sentinel_circuit_breaker_state",
156 "Circuit breaker state (0=closed, 1=open)",
157 &["component", "route"]
158 )
159 .context("Failed to register circuit_breaker_state metric")?;
160
161 let agent_latency = register_histogram_vec!(
162 "sentinel_agent_latency_seconds",
163 "Agent call latency in seconds",
164 &["agent", "event"],
165 latency_buckets.clone()
166 )
167 .context("Failed to register agent_latency metric")?;
168
169 let agent_timeouts = register_int_counter_vec!(
170 "sentinel_agent_timeouts_total",
171 "Total agent call timeouts",
172 &["agent", "event"]
173 )
174 .context("Failed to register agent_timeouts metric")?;
175
176 let blocked_requests = register_counter_vec!(
177 "sentinel_blocked_requests_total",
178 "Total blocked requests by reason",
179 &["reason"]
180 )
181 .context("Failed to register blocked_requests metric")?;
182
183 let request_body_size = register_histogram_vec!(
184 "sentinel_request_body_size_bytes",
185 "Request body size in bytes",
186 &["route"],
187 size_buckets.clone()
188 )
189 .context("Failed to register request_body_size metric")?;
190
191 let response_body_size = register_histogram_vec!(
192 "sentinel_response_body_size_bytes",
193 "Response body size in bytes",
194 &["route"],
195 size_buckets.clone()
196 )
197 .context("Failed to register response_body_size metric")?;
198
199 let tls_handshake_duration = register_histogram_vec!(
200 "sentinel_tls_handshake_duration_seconds",
201 "TLS handshake duration in seconds",
202 &["version"],
203 latency_buckets
204 )
205 .context("Failed to register tls_handshake_duration metric")?;
206
207 let connection_pool_size = register_int_gauge_vec!(
208 "sentinel_connection_pool_size",
209 "Total connections in pool",
210 &["upstream"]
211 )
212 .context("Failed to register connection_pool_size metric")?;
213
214 let connection_pool_idle = register_int_gauge_vec!(
215 "sentinel_connection_pool_idle",
216 "Idle connections in pool",
217 &["upstream"]
218 )
219 .context("Failed to register connection_pool_idle metric")?;
220
221 let connection_pool_acquired = register_int_counter_vec!(
222 "sentinel_connection_pool_acquired_total",
223 "Total connections acquired from pool",
224 &["upstream"]
225 )
226 .context("Failed to register connection_pool_acquired metric")?;
227
228 let memory_usage = register_int_gauge!(
229 "sentinel_memory_usage_bytes",
230 "Current memory usage in bytes"
231 )
232 .context("Failed to register memory_usage metric")?;
233
234 let cpu_usage =
235 register_gauge!("sentinel_cpu_usage_percent", "Current CPU usage percentage")
236 .context("Failed to register cpu_usage metric")?;
237
238 let open_connections =
239 register_int_gauge!("sentinel_open_connections", "Number of open connections")
240 .context("Failed to register open_connections metric")?;
241
242 Ok(Self {
243 request_duration,
244 request_count,
245 active_requests,
246 upstream_attempts,
247 upstream_failures,
248 circuit_breaker_state,
249 agent_latency,
250 agent_timeouts,
251 blocked_requests,
252 request_body_size,
253 response_body_size,
254 tls_handshake_duration,
255 connection_pool_size,
256 connection_pool_idle,
257 connection_pool_acquired,
258 memory_usage,
259 cpu_usage,
260 open_connections,
261 })
262 }
263
264 pub fn record_request(&self, route: &str, method: &str, status: u16, duration: Duration) {
266 self.request_duration
267 .with_label_values(&[route, method])
268 .observe(duration.as_secs_f64());
269
270 self.request_count
271 .with_label_values(&[route, method, &status.to_string()])
272 .inc();
273 }
274
275 pub fn inc_active_requests(&self) {
277 self.active_requests.inc();
278 }
279
280 pub fn dec_active_requests(&self) {
282 self.active_requests.dec();
283 }
284
285 pub fn record_upstream_attempt(&self, upstream: &str, route: &str) {
287 self.upstream_attempts
288 .with_label_values(&[upstream, route])
289 .inc();
290 }
291
292 pub fn record_upstream_failure(&self, upstream: &str, route: &str, reason: &str) {
294 self.upstream_failures
295 .with_label_values(&[upstream, route, reason])
296 .inc();
297 }
298
299 pub fn set_circuit_breaker_state(&self, component: &str, route: &str, is_open: bool) {
301 let state = if is_open { 1 } else { 0 };
302 self.circuit_breaker_state
303 .with_label_values(&[component, route])
304 .set(state);
305 }
306
307 pub fn record_agent_latency(&self, agent: &str, event: &str, duration: Duration) {
309 self.agent_latency
310 .with_label_values(&[agent, event])
311 .observe(duration.as_secs_f64());
312 }
313
314 pub fn record_agent_timeout(&self, agent: &str, event: &str) {
316 self.agent_timeouts.with_label_values(&[agent, event]).inc();
317 }
318
319 pub fn record_blocked_request(&self, reason: &str) {
321 self.blocked_requests.with_label_values(&[reason]).inc();
322 }
323
324 pub fn record_request_body_size(&self, route: &str, size_bytes: usize) {
326 self.request_body_size
327 .with_label_values(&[route])
328 .observe(size_bytes as f64);
329 }
330
331 pub fn record_response_body_size(&self, route: &str, size_bytes: usize) {
333 self.response_body_size
334 .with_label_values(&[route])
335 .observe(size_bytes as f64);
336 }
337
338 pub fn record_tls_handshake(&self, version: &str, duration: Duration) {
340 self.tls_handshake_duration
341 .with_label_values(&[version])
342 .observe(duration.as_secs_f64());
343 }
344
345 pub fn update_connection_pool(&self, upstream: &str, size: i64, idle: i64) {
347 self.connection_pool_size
348 .with_label_values(&[upstream])
349 .set(size);
350 self.connection_pool_idle
351 .with_label_values(&[upstream])
352 .set(idle);
353 }
354
355 pub fn record_connection_acquired(&self, upstream: &str) {
357 self.connection_pool_acquired
358 .with_label_values(&[upstream])
359 .inc();
360 }
361
362 pub fn update_system_metrics(&self) {
364 use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
365
366 let mut system = System::new_with_specifics(
368 RefreshKind::new()
369 .with_cpu(CpuRefreshKind::everything())
370 .with_memory(MemoryRefreshKind::everything()),
371 );
372
373 self.memory_usage.set(system.total_memory() as i64);
375
376 system.refresh_cpu_usage();
378 self.cpu_usage.set(system.global_cpu_usage() as f64);
379 }
380
381 pub fn set_open_connections(&self, count: i64) {
383 self.open_connections.set(count);
384 }
385}
386
/// A single structured audit record, emitted as one JSON line by [`AuditLogEntry::write`].
///
/// Fields are serialized in declaration order. `Option` fields that are `None` serialize as
/// JSON `null`, because no `skip_serializing_if` is configured.
#[derive(Debug, serde::Serialize)]
pub struct AuditLogEntry {
    /// RFC 3339 UTC timestamp. [`AuditLogEntry::new`] sets it to the creation time.
    pub timestamp: String,
    /// Identifier supplied by the caller, presumably shared with other logs for the same request.
    pub correlation_id: String,
    /// Kind of event. [`AuditLogEntry::new`] sets it to `"request"`.
    pub event_type: String,
    /// Matched route, if the caller recorded one.
    pub route: Option<String>,
    /// Client address, if known. NOTE(review): the format (ip vs ip:port) is chosen by the caller.
    pub client_addr: Option<String>,
    /// User agent string, if the caller recorded one (presumably the `User-Agent` header).
    pub user_agent: Option<String>,
    /// Request method.
    pub method: String,
    /// Request path.
    pub path: String,
    /// Response status code, or `None` if none was recorded.
    pub status: Option<u16>,
    /// Duration in milliseconds. It stays `0` until the caller sets it.
    pub duration_ms: u64,
    /// Upstream the request was sent to, if any.
    pub upstream: Option<String>,
    /// WAF verdict, if one was recorded.
    pub waf_decision: Option<WafDecision>,
    /// Agent decisions, in the order the caller appended them.
    pub agent_decisions: Vec<AgentDecision>,
    /// Error description, if one was recorded.
    pub error: Option<String>,
    /// Free-form labels attached by the caller.
    pub tags: Vec<String>,
}
406
/// WAF verdict attached to an [`AuditLogEntry`].
#[derive(Debug, serde::Serialize)]
pub struct WafDecision {
    /// Action taken, as a free-form string (not validated here).
    pub action: String,
    /// Identifiers of the rules that matched.
    pub rule_ids: Vec<String>,
    /// Confidence score. NOTE(review): presumably within `0.0..=1.0`; not validated here.
    pub confidence: f32,
    /// Human-readable explanation of the decision.
    pub reason: String,
    /// Request data that triggered the match, if captured.
    /// NOTE(review): this is serialized verbatim into the audit log and may contain
    /// sensitive payload data. Confirm it is truncated or redacted before being set.
    pub matched_data: Option<String>,
}
416
/// One agent's decision, recorded in [`AuditLogEntry::agent_decisions`].
#[derive(Debug, serde::Serialize)]
pub struct AgentDecision {
    /// Name of the agent that produced the decision.
    pub agent_name: String,
    /// Event the agent was called for.
    pub event: String,
    /// Action returned by the agent, as a free-form string (not validated here).
    pub action: String,
    /// Agent call latency in milliseconds.
    pub latency_ms: u64,
    /// Arbitrary agent-specific data, embedded as-is in the audit JSON.
    pub metadata: serde_json::Value,
}
426
427impl AuditLogEntry {
428 pub fn new(correlation_id: String, method: String, path: String) -> Self {
430 Self {
431 timestamp: chrono::Utc::now().to_rfc3339(),
432 correlation_id,
433 event_type: "request".to_string(),
434 route: None,
435 client_addr: None,
436 user_agent: None,
437 method,
438 path,
439 status: None,
440 duration_ms: 0,
441 upstream: None,
442 waf_decision: None,
443 agent_decisions: vec![],
444 error: None,
445 tags: vec![],
446 }
447 }
448
449 pub fn write(&self) {
451 match serde_json::to_string(self) {
452 Ok(json) => println!("AUDIT: {}", json),
453 Err(e) => error!("Failed to serialize audit log: {}", e),
454 }
455 }
456}
457
/// Health of a single component, or of the system as a whole.
///
/// serde serializes it as the variant name (`"Healthy"`, `"Degraded"`, `"Unhealthy"`).
/// When [`ComponentHealthTracker::get_status`] aggregates components, `Unhealthy` takes
/// precedence over `Degraded`, which takes precedence over `Healthy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum HealthStatus {
    Healthy,
    Degraded,
    Unhealthy,
}
465
/// Latest health record for one named component, as kept by [`ComponentHealthTracker`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct ComponentHealth {
    /// Component name. The tracker uses it as the lookup key.
    pub name: String,
    /// Status from the most recent update.
    pub status: HealthStatus,
    /// UTC time of the most recent update.
    pub last_check: chrono::DateTime<chrono::Utc>,
    /// Number of consecutive updates whose status was not `Healthy`, so `Degraded` counts
    /// as a failure. A `Healthy` update resets it to `0`.
    pub consecutive_failures: u32,
    /// Error message from the most recent update. An update passing `None` clears it.
    pub error_message: Option<String>,
}
475
476pub struct ComponentHealthTracker {
481 components: parking_lot::RwLock<Vec<ComponentHealth>>,
482}
483
484impl ComponentHealthTracker {
485 pub fn new() -> Self {
487 Self {
488 components: parking_lot::RwLock::new(vec![]),
489 }
490 }
491
492 pub fn update_component(&self, name: String, status: HealthStatus, error: Option<String>) {
494 let mut components = self.components.write();
495
496 if let Some(component) = components.iter_mut().find(|c| c.name == name) {
497 component.status = status;
498 component.last_check = chrono::Utc::now();
499 component.error_message = error;
500
501 if status != HealthStatus::Healthy {
502 component.consecutive_failures += 1;
503 } else {
504 component.consecutive_failures = 0;
505 }
506 } else {
507 components.push(ComponentHealth {
508 name,
509 status,
510 last_check: chrono::Utc::now(),
511 consecutive_failures: if status != HealthStatus::Healthy {
512 1
513 } else {
514 0
515 },
516 error_message: error,
517 });
518 }
519 }
520
521 pub fn get_status(&self) -> HealthStatus {
523 let components = self.components.read();
524
525 if components.is_empty() {
526 return HealthStatus::Healthy;
527 }
528
529 let unhealthy_count = components
530 .iter()
531 .filter(|c| c.status == HealthStatus::Unhealthy)
532 .count();
533 let degraded_count = components
534 .iter()
535 .filter(|c| c.status == HealthStatus::Degraded)
536 .count();
537
538 if unhealthy_count > 0 {
539 HealthStatus::Unhealthy
540 } else if degraded_count > 0 {
541 HealthStatus::Degraded
542 } else {
543 HealthStatus::Healthy
544 }
545 }
546
547 pub fn get_report(&self) -> Vec<ComponentHealth> {
549 self.components.read().clone()
550 }
551}
552
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_creation() {
        // `RequestMetrics::new` registers into the process-wide default registry, so it must
        // be called only once per test binary.
        let metrics = RequestMetrics::new().expect("Failed to create metrics");

        metrics.record_request("test_route", "GET", 200, Duration::from_millis(100));
        assert_eq!(
            metrics
                .request_count
                .with_label_values(&["test_route", "GET", "200"])
                .get(),
            1
        );
        assert_eq!(
            metrics
                .request_duration
                .with_label_values(&["test_route", "GET"])
                .get_sample_count(),
            1
        );

        metrics.inc_active_requests();
        assert_eq!(metrics.active_requests.get(), 1);
        metrics.dec_active_requests();
        assert_eq!(metrics.active_requests.get(), 0);

        metrics.record_upstream_attempt("backend1", "test_route");
        assert_eq!(
            metrics
                .upstream_attempts
                .with_label_values(&["backend1", "test_route"])
                .get(),
            1
        );

        metrics.set_circuit_breaker_state("upstream", "test_route", true);
        assert_eq!(
            metrics
                .circuit_breaker_state
                .with_label_values(&["upstream", "test_route"])
                .get(),
            1
        );
        metrics.set_circuit_breaker_state("upstream", "test_route", false);
        assert_eq!(
            metrics
                .circuit_breaker_state
                .with_label_values(&["upstream", "test_route"])
                .get(),
            0
        );
    }

    #[test]
    fn test_audit_log() {
        let mut entry = AuditLogEntry::new(
            "test-correlation-id".to_string(),
            "GET".to_string(),
            "/api/test".to_string(),
        );

        entry.status = Some(200);
        entry.duration_ms = 150;
        entry.tags.push("test".to_string());

        let json = serde_json::to_string(&entry).expect("Failed to serialize audit log");
        let value: serde_json::Value =
            serde_json::from_str(&json).expect("Audit log is not valid JSON");

        assert_eq!(value["correlation_id"].as_str(), Some("test-correlation-id"));
        assert_eq!(value["event_type"].as_str(), Some("request"));
        assert_eq!(value["method"].as_str(), Some("GET"));
        assert_eq!(value["path"].as_str(), Some("/api/test"));
        assert_eq!(value["status"].as_u64(), Some(200));
        assert_eq!(value["duration_ms"].as_u64(), Some(150));
        assert_eq!(value["tags"][0].as_str(), Some("test"));
        assert!(value["waf_decision"].is_null());
    }

    #[test]
    fn test_health_checker() {
        let checker = ComponentHealthTracker::new();

        // No components registered yet.
        assert_eq!(checker.get_status(), HealthStatus::Healthy);

        checker.update_component("upstream1".to_string(), HealthStatus::Healthy, None);
        assert_eq!(checker.get_status(), HealthStatus::Healthy);

        checker.update_component(
            "agent1".to_string(),
            HealthStatus::Degraded,
            Some("Slow response".to_string()),
        );
        assert_eq!(checker.get_status(), HealthStatus::Degraded);

        // Two consecutive failures accumulate on the same component.
        for _ in 0..2 {
            checker.update_component(
                "upstream2".to_string(),
                HealthStatus::Unhealthy,
                Some("Connection refused".to_string()),
            );
        }
        assert_eq!(checker.get_status(), HealthStatus::Unhealthy);

        let report = checker.get_report();
        assert_eq!(report.len(), 3);
        let upstream2 = report
            .iter()
            .find(|c| c.name == "upstream2")
            .expect("upstream2 should be tracked");
        assert_eq!(upstream2.consecutive_failures, 2);
        assert_eq!(upstream2.error_message.as_deref(), Some("Connection refused"));

        // Recovery resets the failure streak and clears the error message.
        checker.update_component("upstream2".to_string(), HealthStatus::Healthy, None);
        let report = checker.get_report();
        let upstream2 = report
            .iter()
            .find(|c| c.name == "upstream2")
            .expect("upstream2 should be tracked");
        assert_eq!(upstream2.consecutive_failures, 0);
        assert!(upstream2.error_message.is_none());

        // agent1 is still degraded.
        assert_eq!(checker.get_status(), HealthStatus::Degraded);
    }
}