1use anyhow::{Context, Result};
7use prometheus::{
8 register_counter_vec, register_gauge, register_histogram_vec,
9 register_int_counter_vec, register_int_gauge, register_int_gauge_vec, CounterVec, Gauge, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec,
10};
11use std::time::Duration;
12use tracing::{error, info};
13use tracing_subscriber::{fmt, prelude::*, EnvFilter};
14
15pub fn init_tracing() -> Result<()> {
17 let json_layer =
19 if std::env::var("SENTINEL_LOG_FORMAT").unwrap_or_else(|_| "json".to_string()) == "json" {
20 Some(
21 fmt::layer()
22 .json()
23 .with_target(true)
24 .with_thread_ids(true)
25 .with_thread_names(true)
26 .with_file(true)
27 .with_line_number(true),
28 )
29 } else {
30 None
31 };
32
33 let pretty_layer = if std::env::var("SENTINEL_LOG_FORMAT")
35 .unwrap_or_else(|_| "json".to_string())
36 == "pretty"
37 {
38 Some(
39 fmt::layer()
40 .pretty()
41 .with_target(true)
42 .with_thread_ids(true)
43 .with_thread_names(true)
44 .with_file(true)
45 .with_line_number(true),
46 )
47 } else {
48 None
49 };
50
51 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
53
54 tracing_subscriber::registry()
55 .with(env_filter)
56 .with(json_layer)
57 .with(pretty_layer)
58 .init();
59
60 info!("Tracing initialized");
61 Ok(())
62}
63
64pub struct RequestMetrics {
66 request_duration: HistogramVec,
68 request_count: IntCounterVec,
70 active_requests: IntGauge,
72 upstream_attempts: IntCounterVec,
74 upstream_failures: IntCounterVec,
76 circuit_breaker_state: IntGaugeVec,
78 agent_latency: HistogramVec,
80 agent_timeouts: IntCounterVec,
82 blocked_requests: CounterVec,
84 request_body_size: HistogramVec,
86 response_body_size: HistogramVec,
88 tls_handshake_duration: HistogramVec,
90 connection_pool_size: IntGaugeVec,
92 connection_pool_idle: IntGaugeVec,
93 connection_pool_acquired: IntCounterVec,
94 memory_usage: IntGauge,
96 cpu_usage: Gauge,
97 open_connections: IntGauge,
98}
99
100impl RequestMetrics {
101 pub fn new() -> Result<Self> {
103 let latency_buckets = vec![
105 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
106 ];
107
108 let size_buckets = vec![
110 100.0,
111 1_000.0,
112 10_000.0,
113 100_000.0,
114 1_000_000.0,
115 10_000_000.0,
116 100_000_000.0,
117 ];
118
119 let request_duration = register_histogram_vec!(
120 "sentinel_request_duration_seconds",
121 "Request duration in seconds",
122 &["route", "method"],
123 latency_buckets.clone()
124 )
125 .context("Failed to register request_duration metric")?;
126
127 let request_count = register_int_counter_vec!(
128 "sentinel_requests_total",
129 "Total number of requests",
130 &["route", "method", "status"]
131 )
132 .context("Failed to register request_count metric")?;
133
134 let active_requests = register_int_gauge!(
135 "sentinel_active_requests",
136 "Number of currently active requests"
137 )
138 .context("Failed to register active_requests metric")?;
139
140 let upstream_attempts = register_int_counter_vec!(
141 "sentinel_upstream_attempts_total",
142 "Total upstream connection attempts",
143 &["upstream", "route"]
144 )
145 .context("Failed to register upstream_attempts metric")?;
146
147 let upstream_failures = register_int_counter_vec!(
148 "sentinel_upstream_failures_total",
149 "Total upstream connection failures",
150 &["upstream", "route", "reason"]
151 )
152 .context("Failed to register upstream_failures metric")?;
153
154 let circuit_breaker_state = register_int_gauge_vec!(
155 "sentinel_circuit_breaker_state",
156 "Circuit breaker state (0=closed, 1=open)",
157 &["component", "route"]
158 )
159 .context("Failed to register circuit_breaker_state metric")?;
160
161 let agent_latency = register_histogram_vec!(
162 "sentinel_agent_latency_seconds",
163 "Agent call latency in seconds",
164 &["agent", "event"],
165 latency_buckets.clone()
166 )
167 .context("Failed to register agent_latency metric")?;
168
169 let agent_timeouts = register_int_counter_vec!(
170 "sentinel_agent_timeouts_total",
171 "Total agent call timeouts",
172 &["agent", "event"]
173 )
174 .context("Failed to register agent_timeouts metric")?;
175
176 let blocked_requests = register_counter_vec!(
177 "sentinel_blocked_requests_total",
178 "Total blocked requests by reason",
179 &["reason"]
180 )
181 .context("Failed to register blocked_requests metric")?;
182
183 let request_body_size = register_histogram_vec!(
184 "sentinel_request_body_size_bytes",
185 "Request body size in bytes",
186 &["route"],
187 size_buckets.clone()
188 )
189 .context("Failed to register request_body_size metric")?;
190
191 let response_body_size = register_histogram_vec!(
192 "sentinel_response_body_size_bytes",
193 "Response body size in bytes",
194 &["route"],
195 size_buckets.clone()
196 )
197 .context("Failed to register response_body_size metric")?;
198
199 let tls_handshake_duration = register_histogram_vec!(
200 "sentinel_tls_handshake_duration_seconds",
201 "TLS handshake duration in seconds",
202 &["version"],
203 latency_buckets
204 )
205 .context("Failed to register tls_handshake_duration metric")?;
206
207 let connection_pool_size = register_int_gauge_vec!(
208 "sentinel_connection_pool_size",
209 "Total connections in pool",
210 &["upstream"]
211 )
212 .context("Failed to register connection_pool_size metric")?;
213
214 let connection_pool_idle = register_int_gauge_vec!(
215 "sentinel_connection_pool_idle",
216 "Idle connections in pool",
217 &["upstream"]
218 )
219 .context("Failed to register connection_pool_idle metric")?;
220
221 let connection_pool_acquired = register_int_counter_vec!(
222 "sentinel_connection_pool_acquired_total",
223 "Total connections acquired from pool",
224 &["upstream"]
225 )
226 .context("Failed to register connection_pool_acquired metric")?;
227
228 let memory_usage = register_int_gauge!(
229 "sentinel_memory_usage_bytes",
230 "Current memory usage in bytes"
231 )
232 .context("Failed to register memory_usage metric")?;
233
234 let cpu_usage =
235 register_gauge!("sentinel_cpu_usage_percent", "Current CPU usage percentage")
236 .context("Failed to register cpu_usage metric")?;
237
238 let open_connections =
239 register_int_gauge!("sentinel_open_connections", "Number of open connections")
240 .context("Failed to register open_connections metric")?;
241
242 Ok(Self {
243 request_duration,
244 request_count,
245 active_requests,
246 upstream_attempts,
247 upstream_failures,
248 circuit_breaker_state,
249 agent_latency,
250 agent_timeouts,
251 blocked_requests,
252 request_body_size,
253 response_body_size,
254 tls_handshake_duration,
255 connection_pool_size,
256 connection_pool_idle,
257 connection_pool_acquired,
258 memory_usage,
259 cpu_usage,
260 open_connections,
261 })
262 }
263
264 pub fn record_request(&self, route: &str, method: &str, status: u16, duration: Duration) {
266 self.request_duration
267 .with_label_values(&[route, method])
268 .observe(duration.as_secs_f64());
269
270 self.request_count
271 .with_label_values(&[route, method, &status.to_string()])
272 .inc();
273 }
274
275 pub fn inc_active_requests(&self) {
277 self.active_requests.inc();
278 }
279
280 pub fn dec_active_requests(&self) {
282 self.active_requests.dec();
283 }
284
285 pub fn record_upstream_attempt(&self, upstream: &str, route: &str) {
287 self.upstream_attempts
288 .with_label_values(&[upstream, route])
289 .inc();
290 }
291
292 pub fn record_upstream_failure(&self, upstream: &str, route: &str, reason: &str) {
294 self.upstream_failures
295 .with_label_values(&[upstream, route, reason])
296 .inc();
297 }
298
299 pub fn set_circuit_breaker_state(&self, component: &str, route: &str, is_open: bool) {
301 let state = if is_open { 1 } else { 0 };
302 self.circuit_breaker_state
303 .with_label_values(&[component, route])
304 .set(state);
305 }
306
307 pub fn record_agent_latency(&self, agent: &str, event: &str, duration: Duration) {
309 self.agent_latency
310 .with_label_values(&[agent, event])
311 .observe(duration.as_secs_f64());
312 }
313
314 pub fn record_agent_timeout(&self, agent: &str, event: &str) {
316 self.agent_timeouts.with_label_values(&[agent, event]).inc();
317 }
318
319 pub fn record_blocked_request(&self, reason: &str) {
321 self.blocked_requests.with_label_values(&[reason]).inc();
322 }
323
324 pub fn record_request_body_size(&self, route: &str, size_bytes: usize) {
326 self.request_body_size
327 .with_label_values(&[route])
328 .observe(size_bytes as f64);
329 }
330
331 pub fn record_response_body_size(&self, route: &str, size_bytes: usize) {
333 self.response_body_size
334 .with_label_values(&[route])
335 .observe(size_bytes as f64);
336 }
337
338 pub fn record_tls_handshake(&self, version: &str, duration: Duration) {
340 self.tls_handshake_duration
341 .with_label_values(&[version])
342 .observe(duration.as_secs_f64());
343 }
344
345 pub fn update_connection_pool(&self, upstream: &str, size: i64, idle: i64) {
347 self.connection_pool_size
348 .with_label_values(&[upstream])
349 .set(size);
350 self.connection_pool_idle
351 .with_label_values(&[upstream])
352 .set(idle);
353 }
354
355 pub fn record_connection_acquired(&self, upstream: &str) {
357 self.connection_pool_acquired
358 .with_label_values(&[upstream])
359 .inc();
360 }
361
362 pub fn update_system_metrics(&self) {
364 use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
365
366 let mut system = System::new_with_specifics(
368 RefreshKind::new()
369 .with_cpu(CpuRefreshKind::everything())
370 .with_memory(MemoryRefreshKind::everything()),
371 );
372
373 self.memory_usage.set(system.total_memory() as i64);
375
376 system.refresh_cpu_usage();
378 self.cpu_usage.set(system.global_cpu_usage() as f64);
379 }
380
381 pub fn set_open_connections(&self, count: i64) {
383 self.open_connections.set(count);
384 }
385}
386
387#[derive(Debug, serde::Serialize)]
389pub struct AuditLogEntry {
390 pub timestamp: String,
391 pub correlation_id: String,
392 pub event_type: String,
393 pub route: Option<String>,
394 pub client_addr: Option<String>,
395 pub user_agent: Option<String>,
396 pub method: String,
397 pub path: String,
398 pub status: Option<u16>,
399 pub duration_ms: u64,
400 pub upstream: Option<String>,
401 pub waf_decision: Option<WafDecision>,
402 pub agent_decisions: Vec<AgentDecision>,
403 pub error: Option<String>,
404 pub tags: Vec<String>,
405}
406
407#[derive(Debug, serde::Serialize)]
409pub struct WafDecision {
410 pub action: String,
411 pub rule_ids: Vec<String>,
412 pub confidence: f32,
413 pub reason: String,
414 pub matched_data: Option<String>,
415}
416
417#[derive(Debug, serde::Serialize)]
419pub struct AgentDecision {
420 pub agent_name: String,
421 pub event: String,
422 pub action: String,
423 pub latency_ms: u64,
424 pub metadata: serde_json::Value,
425}
426
427impl AuditLogEntry {
428 pub fn new(correlation_id: String, method: String, path: String) -> Self {
430 Self {
431 timestamp: chrono::Utc::now().to_rfc3339(),
432 correlation_id,
433 event_type: "request".to_string(),
434 route: None,
435 client_addr: None,
436 user_agent: None,
437 method,
438 path,
439 status: None,
440 duration_ms: 0,
441 upstream: None,
442 waf_decision: None,
443 agent_decisions: vec![],
444 error: None,
445 tags: vec![],
446 }
447 }
448
449 pub fn write(&self) {
451 match serde_json::to_string(self) {
452 Ok(json) => println!("AUDIT: {}", json),
453 Err(e) => error!("Failed to serialize audit log: {}", e),
454 }
455 }
456}
457
458#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
460pub enum HealthStatus {
461 Healthy,
462 Degraded,
463 Unhealthy,
464}
465
466#[derive(Debug, Clone, serde::Serialize)]
468pub struct ComponentHealth {
469 pub name: String,
470 pub status: HealthStatus,
471 pub last_check: chrono::DateTime<chrono::Utc>,
472 pub consecutive_failures: u32,
473 pub error_message: Option<String>,
474}
475
476pub struct HealthChecker {
478 components: parking_lot::RwLock<Vec<ComponentHealth>>,
479}
480
481impl HealthChecker {
482 pub fn new() -> Self {
484 Self {
485 components: parking_lot::RwLock::new(vec![]),
486 }
487 }
488
489 pub fn update_component(&self, name: String, status: HealthStatus, error: Option<String>) {
491 let mut components = self.components.write();
492
493 if let Some(component) = components.iter_mut().find(|c| c.name == name) {
494 component.status = status;
495 component.last_check = chrono::Utc::now();
496 component.error_message = error;
497
498 if status != HealthStatus::Healthy {
499 component.consecutive_failures += 1;
500 } else {
501 component.consecutive_failures = 0;
502 }
503 } else {
504 components.push(ComponentHealth {
505 name,
506 status,
507 last_check: chrono::Utc::now(),
508 consecutive_failures: if status != HealthStatus::Healthy {
509 1
510 } else {
511 0
512 },
513 error_message: error,
514 });
515 }
516 }
517
518 pub fn get_status(&self) -> HealthStatus {
520 let components = self.components.read();
521
522 if components.is_empty() {
523 return HealthStatus::Healthy;
524 }
525
526 let unhealthy_count = components
527 .iter()
528 .filter(|c| c.status == HealthStatus::Unhealthy)
529 .count();
530 let degraded_count = components
531 .iter()
532 .filter(|c| c.status == HealthStatus::Degraded)
533 .count();
534
535 if unhealthy_count > 0 {
536 HealthStatus::Unhealthy
537 } else if degraded_count > 0 {
538 HealthStatus::Degraded
539 } else {
540 HealthStatus::Healthy
541 }
542 }
543
544 pub fn get_report(&self) -> Vec<ComponentHealth> {
546 self.components.read().clone()
547 }
548}
549
#[cfg(test)]
mod tests {
    use super::*;

    /// Registers the metrics and checks that each recorder updates the
    /// metric it is responsible for.
    #[test]
    fn test_metrics_creation() {
        let metrics = RequestMetrics::new().expect("Failed to create metrics");

        metrics.record_request("test_route", "GET", 200, Duration::from_millis(100));
        assert_eq!(
            metrics
                .request_count
                .with_label_values(&["test_route", "GET", "200"])
                .get(),
            1
        );
        assert_eq!(
            metrics
                .request_duration
                .with_label_values(&["test_route", "GET"])
                .get_sample_count(),
            1
        );

        metrics.inc_active_requests();
        assert_eq!(metrics.active_requests.get(), 1);
        metrics.dec_active_requests();
        assert_eq!(metrics.active_requests.get(), 0);

        metrics.record_upstream_attempt("backend1", "test_route");
        assert_eq!(
            metrics
                .upstream_attempts
                .with_label_values(&["backend1", "test_route"])
                .get(),
            1
        );

        metrics.set_circuit_breaker_state("upstream", "test_route", true);
        assert_eq!(
            metrics
                .circuit_breaker_state
                .with_label_values(&["upstream", "test_route"])
                .get(),
            1
        );
        metrics.set_circuit_breaker_state("upstream", "test_route", false);
        assert_eq!(
            metrics
                .circuit_breaker_state
                .with_label_values(&["upstream", "test_route"])
                .get(),
            0
        );
    }

    /// Checks that an audit entry serializes the fields the caller set, plus
    /// the defaults from `AuditLogEntry::new`.
    #[test]
    fn test_audit_log() {
        let mut entry = AuditLogEntry::new(
            "test-correlation-id".to_string(),
            "GET".to_string(),
            "/api/test".to_string(),
        );

        entry.status = Some(200);
        entry.duration_ms = 150;
        entry.tags.push("test".to_string());

        let json = serde_json::to_string(&entry).expect("Failed to serialize audit log");
        let value: serde_json::Value =
            serde_json::from_str(&json).expect("audit log must be valid JSON");

        assert_eq!(value["correlation_id"], "test-correlation-id");
        assert_eq!(value["event_type"], "request");
        assert_eq!(value["method"], "GET");
        assert_eq!(value["path"], "/api/test");
        assert_eq!(value["status"], 200);
        assert_eq!(value["duration_ms"], 150);
        assert_eq!(value["tags"], serde_json::json!(["test"]));
        assert!(value["route"].is_null());
    }

    /// Checks the aggregate status and the per-component failure counters.
    #[test]
    fn test_health_checker() {
        let checker = HealthChecker::new();

        assert_eq!(checker.get_status(), HealthStatus::Healthy);

        checker.update_component("upstream1".to_string(), HealthStatus::Healthy, None);
        assert_eq!(checker.get_status(), HealthStatus::Healthy);

        checker.update_component(
            "agent1".to_string(),
            HealthStatus::Degraded,
            Some("Slow response".to_string()),
        );
        assert_eq!(checker.get_status(), HealthStatus::Degraded);

        checker.update_component(
            "upstream2".to_string(),
            HealthStatus::Unhealthy,
            Some("Connection refused".to_string()),
        );
        assert_eq!(checker.get_status(), HealthStatus::Unhealthy);

        let report = checker.get_report();
        assert_eq!(report.len(), 3);
        let agent = report
            .iter()
            .find(|c| c.name == "agent1")
            .expect("agent1 must be registered");
        assert_eq!(agent.consecutive_failures, 1);
        assert_eq!(agent.error_message.as_deref(), Some("Slow response"));

        // A second degraded report keeps counting; a healthy one resets.
        checker.update_component("agent1".to_string(), HealthStatus::Degraded, None);
        checker.update_component("agent1".to_string(), HealthStatus::Healthy, None);
        let report = checker.get_report();
        let agent = report
            .iter()
            .find(|c| c.name == "agent1")
            .expect("agent1 must be registered");
        assert_eq!(agent.consecutive_failures, 0);
        assert_eq!(agent.error_message, None);

        // upstream2 is still unhealthy, so the aggregate stays unhealthy.
        assert_eq!(checker.get_status(), HealthStatus::Unhealthy);
    }
}