1use anyhow::{Context, Result};
7use prometheus::{
8 register_counter_vec, register_gauge, register_histogram_vec, register_int_counter_vec,
9 register_int_gauge, register_int_gauge_vec, CounterVec, Gauge, HistogramVec, IntCounterVec,
10 IntGauge, IntGaugeVec,
11};
12use std::time::Duration;
13use tracing::{error, info};
14use tracing_subscriber::{fmt, prelude::*, EnvFilter};
15
16pub fn init_tracing() -> Result<()> {
18 let json_layer =
20 if std::env::var("SENTINEL_LOG_FORMAT").unwrap_or_else(|_| "json".to_string()) == "json" {
21 Some(
22 fmt::layer()
23 .json()
24 .with_target(true)
25 .with_thread_ids(true)
26 .with_thread_names(true)
27 .with_file(true)
28 .with_line_number(true),
29 )
30 } else {
31 None
32 };
33
34 let pretty_layer = if std::env::var("SENTINEL_LOG_FORMAT")
36 .unwrap_or_else(|_| "json".to_string())
37 == "pretty"
38 {
39 Some(
40 fmt::layer()
41 .pretty()
42 .with_target(true)
43 .with_thread_ids(true)
44 .with_thread_names(true)
45 .with_file(true)
46 .with_line_number(true),
47 )
48 } else {
49 None
50 };
51
52 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
54
55 tracing_subscriber::registry()
56 .with(env_filter)
57 .with(json_layer)
58 .with(pretty_layer)
59 .init();
60
61 info!("Tracing initialized");
62 Ok(())
63}
64
/// Handles to every Prometheus metric exported under the `sentinel_` prefix.
///
/// All fields are created and registered together by [`RequestMetrics::new`];
/// the `record_*` / `set_*` / `update_*` methods are the only way to touch them.
pub struct RequestMetrics {
    /// `sentinel_request_duration_seconds` — request duration, labelled by `route`, `method`.
    request_duration: HistogramVec,
    /// `sentinel_requests_total` — request count, labelled by `route`, `method`, `status`.
    request_count: IntCounterVec,
    /// `sentinel_active_requests` — number of currently active requests.
    active_requests: IntGauge,
    /// `sentinel_upstream_attempts_total` — upstream connection attempts, labelled by `upstream`, `route`.
    upstream_attempts: IntCounterVec,
    /// `sentinel_upstream_failures_total` — upstream connection failures, labelled by `upstream`, `route`, `reason`.
    upstream_failures: IntCounterVec,
    /// `sentinel_circuit_breaker_state` — 0 = closed, 1 = open, labelled by `component`, `route`.
    circuit_breaker_state: IntGaugeVec,
    /// `sentinel_agent_latency_seconds` — agent call latency, labelled by `agent`, `event`.
    agent_latency: HistogramVec,
    /// `sentinel_agent_timeouts_total` — agent call timeouts, labelled by `agent`, `event`.
    agent_timeouts: IntCounterVec,
    /// `sentinel_blocked_requests_total` — blocked requests, labelled by `reason`.
    blocked_requests: CounterVec,
    /// `sentinel_request_body_size_bytes` — request body size, labelled by `route`.
    request_body_size: HistogramVec,
    /// `sentinel_response_body_size_bytes` — response body size, labelled by `route`.
    response_body_size: HistogramVec,
    /// `sentinel_tls_handshake_duration_seconds` — TLS handshake duration, labelled by `version`.
    tls_handshake_duration: HistogramVec,
    /// `sentinel_connection_pool_size` — total connections in pool, labelled by `upstream`.
    connection_pool_size: IntGaugeVec,
    /// `sentinel_connection_pool_idle` — idle connections in pool, labelled by `upstream`.
    connection_pool_idle: IntGaugeVec,
    /// `sentinel_connection_pool_acquired_total` — connections acquired from pool, labelled by `upstream`.
    connection_pool_acquired: IntCounterVec,
    /// `sentinel_memory_usage_bytes` — current memory usage in bytes.
    memory_usage: IntGauge,
    /// `sentinel_cpu_usage_percent` — current CPU usage percentage.
    cpu_usage: Gauge,
    /// `sentinel_open_connections` — number of open connections.
    open_connections: IntGauge,
    /// `sentinel_websocket_frames_total` — WebSocket frames processed, labelled by
    /// `route`, `direction`, `opcode`, `decision`.
    websocket_frames_total: IntCounterVec,
    /// `sentinel_websocket_connections_total` — WebSocket connections with inspection enabled, labelled by `route`.
    websocket_connections_total: IntCounterVec,
    /// `sentinel_websocket_inspection_duration_seconds` — per-frame inspection duration, labelled by `route`.
    websocket_inspection_duration: HistogramVec,
    /// `sentinel_websocket_frame_size_bytes` — frame payload size, labelled by `route`, `direction`, `opcode`.
    websocket_frame_size: HistogramVec,
}
105
106impl RequestMetrics {
107 pub fn new() -> Result<Self> {
109 let latency_buckets = vec![
111 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
112 ];
113
114 let size_buckets = vec![
116 100.0,
117 1_000.0,
118 10_000.0,
119 100_000.0,
120 1_000_000.0,
121 10_000_000.0,
122 100_000_000.0,
123 ];
124
125 let request_duration = register_histogram_vec!(
126 "sentinel_request_duration_seconds",
127 "Request duration in seconds",
128 &["route", "method"],
129 latency_buckets.clone()
130 )
131 .context("Failed to register request_duration metric")?;
132
133 let request_count = register_int_counter_vec!(
134 "sentinel_requests_total",
135 "Total number of requests",
136 &["route", "method", "status"]
137 )
138 .context("Failed to register request_count metric")?;
139
140 let active_requests = register_int_gauge!(
141 "sentinel_active_requests",
142 "Number of currently active requests"
143 )
144 .context("Failed to register active_requests metric")?;
145
146 let upstream_attempts = register_int_counter_vec!(
147 "sentinel_upstream_attempts_total",
148 "Total upstream connection attempts",
149 &["upstream", "route"]
150 )
151 .context("Failed to register upstream_attempts metric")?;
152
153 let upstream_failures = register_int_counter_vec!(
154 "sentinel_upstream_failures_total",
155 "Total upstream connection failures",
156 &["upstream", "route", "reason"]
157 )
158 .context("Failed to register upstream_failures metric")?;
159
160 let circuit_breaker_state = register_int_gauge_vec!(
161 "sentinel_circuit_breaker_state",
162 "Circuit breaker state (0=closed, 1=open)",
163 &["component", "route"]
164 )
165 .context("Failed to register circuit_breaker_state metric")?;
166
167 let agent_latency = register_histogram_vec!(
168 "sentinel_agent_latency_seconds",
169 "Agent call latency in seconds",
170 &["agent", "event"],
171 latency_buckets.clone()
172 )
173 .context("Failed to register agent_latency metric")?;
174
175 let agent_timeouts = register_int_counter_vec!(
176 "sentinel_agent_timeouts_total",
177 "Total agent call timeouts",
178 &["agent", "event"]
179 )
180 .context("Failed to register agent_timeouts metric")?;
181
182 let blocked_requests = register_counter_vec!(
183 "sentinel_blocked_requests_total",
184 "Total blocked requests by reason",
185 &["reason"]
186 )
187 .context("Failed to register blocked_requests metric")?;
188
189 let request_body_size = register_histogram_vec!(
190 "sentinel_request_body_size_bytes",
191 "Request body size in bytes",
192 &["route"],
193 size_buckets.clone()
194 )
195 .context("Failed to register request_body_size metric")?;
196
197 let response_body_size = register_histogram_vec!(
198 "sentinel_response_body_size_bytes",
199 "Response body size in bytes",
200 &["route"],
201 size_buckets.clone()
202 )
203 .context("Failed to register response_body_size metric")?;
204
205 let tls_handshake_duration = register_histogram_vec!(
206 "sentinel_tls_handshake_duration_seconds",
207 "TLS handshake duration in seconds",
208 &["version"],
209 latency_buckets
210 )
211 .context("Failed to register tls_handshake_duration metric")?;
212
213 let connection_pool_size = register_int_gauge_vec!(
214 "sentinel_connection_pool_size",
215 "Total connections in pool",
216 &["upstream"]
217 )
218 .context("Failed to register connection_pool_size metric")?;
219
220 let connection_pool_idle = register_int_gauge_vec!(
221 "sentinel_connection_pool_idle",
222 "Idle connections in pool",
223 &["upstream"]
224 )
225 .context("Failed to register connection_pool_idle metric")?;
226
227 let connection_pool_acquired = register_int_counter_vec!(
228 "sentinel_connection_pool_acquired_total",
229 "Total connections acquired from pool",
230 &["upstream"]
231 )
232 .context("Failed to register connection_pool_acquired metric")?;
233
234 let memory_usage = register_int_gauge!(
235 "sentinel_memory_usage_bytes",
236 "Current memory usage in bytes"
237 )
238 .context("Failed to register memory_usage metric")?;
239
240 let cpu_usage =
241 register_gauge!("sentinel_cpu_usage_percent", "Current CPU usage percentage")
242 .context("Failed to register cpu_usage metric")?;
243
244 let open_connections =
245 register_int_gauge!("sentinel_open_connections", "Number of open connections")
246 .context("Failed to register open_connections metric")?;
247
248 let websocket_frames_total = register_int_counter_vec!(
250 "sentinel_websocket_frames_total",
251 "Total WebSocket frames processed",
252 &["route", "direction", "opcode", "decision"]
253 )
254 .context("Failed to register websocket_frames_total metric")?;
255
256 let websocket_connections_total = register_int_counter_vec!(
257 "sentinel_websocket_connections_total",
258 "Total WebSocket connections with inspection enabled",
259 &["route"]
260 )
261 .context("Failed to register websocket_connections_total metric")?;
262
263 let frame_latency_buckets = vec![
265 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
266 ];
267
268 let websocket_inspection_duration = register_histogram_vec!(
269 "sentinel_websocket_inspection_duration_seconds",
270 "WebSocket frame inspection duration in seconds",
271 &["route"],
272 frame_latency_buckets
273 )
274 .context("Failed to register websocket_inspection_duration metric")?;
275
276 let frame_size_buckets = vec![
278 64.0, 256.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0,
279 ];
280
281 let websocket_frame_size = register_histogram_vec!(
282 "sentinel_websocket_frame_size_bytes",
283 "WebSocket frame payload size in bytes",
284 &["route", "direction", "opcode"],
285 frame_size_buckets
286 )
287 .context("Failed to register websocket_frame_size metric")?;
288
289 Ok(Self {
290 request_duration,
291 request_count,
292 active_requests,
293 upstream_attempts,
294 upstream_failures,
295 circuit_breaker_state,
296 agent_latency,
297 agent_timeouts,
298 blocked_requests,
299 request_body_size,
300 response_body_size,
301 tls_handshake_duration,
302 connection_pool_size,
303 connection_pool_idle,
304 connection_pool_acquired,
305 memory_usage,
306 cpu_usage,
307 open_connections,
308 websocket_frames_total,
309 websocket_connections_total,
310 websocket_inspection_duration,
311 websocket_frame_size,
312 })
313 }
314
315 pub fn record_request(&self, route: &str, method: &str, status: u16, duration: Duration) {
317 self.request_duration
318 .with_label_values(&[route, method])
319 .observe(duration.as_secs_f64());
320
321 self.request_count
322 .with_label_values(&[route, method, &status.to_string()])
323 .inc();
324 }
325
326 pub fn inc_active_requests(&self) {
328 self.active_requests.inc();
329 }
330
331 pub fn dec_active_requests(&self) {
333 self.active_requests.dec();
334 }
335
336 pub fn record_upstream_attempt(&self, upstream: &str, route: &str) {
338 self.upstream_attempts
339 .with_label_values(&[upstream, route])
340 .inc();
341 }
342
343 pub fn record_upstream_failure(&self, upstream: &str, route: &str, reason: &str) {
345 self.upstream_failures
346 .with_label_values(&[upstream, route, reason])
347 .inc();
348 }
349
350 pub fn set_circuit_breaker_state(&self, component: &str, route: &str, is_open: bool) {
352 let state = if is_open { 1 } else { 0 };
353 self.circuit_breaker_state
354 .with_label_values(&[component, route])
355 .set(state);
356 }
357
358 pub fn record_agent_latency(&self, agent: &str, event: &str, duration: Duration) {
360 self.agent_latency
361 .with_label_values(&[agent, event])
362 .observe(duration.as_secs_f64());
363 }
364
365 pub fn record_agent_timeout(&self, agent: &str, event: &str) {
367 self.agent_timeouts.with_label_values(&[agent, event]).inc();
368 }
369
370 pub fn record_blocked_request(&self, reason: &str) {
372 self.blocked_requests.with_label_values(&[reason]).inc();
373 }
374
375 pub fn record_request_body_size(&self, route: &str, size_bytes: usize) {
377 self.request_body_size
378 .with_label_values(&[route])
379 .observe(size_bytes as f64);
380 }
381
382 pub fn record_response_body_size(&self, route: &str, size_bytes: usize) {
384 self.response_body_size
385 .with_label_values(&[route])
386 .observe(size_bytes as f64);
387 }
388
389 pub fn record_tls_handshake(&self, version: &str, duration: Duration) {
391 self.tls_handshake_duration
392 .with_label_values(&[version])
393 .observe(duration.as_secs_f64());
394 }
395
396 pub fn update_connection_pool(&self, upstream: &str, size: i64, idle: i64) {
398 self.connection_pool_size
399 .with_label_values(&[upstream])
400 .set(size);
401 self.connection_pool_idle
402 .with_label_values(&[upstream])
403 .set(idle);
404 }
405
406 pub fn record_connection_acquired(&self, upstream: &str) {
408 self.connection_pool_acquired
409 .with_label_values(&[upstream])
410 .inc();
411 }
412
413 pub fn update_system_metrics(&self) {
415 use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
416
417 let mut system = System::new_with_specifics(
419 RefreshKind::new()
420 .with_cpu(CpuRefreshKind::everything())
421 .with_memory(MemoryRefreshKind::everything()),
422 );
423
424 self.memory_usage.set(system.total_memory() as i64);
426
427 system.refresh_cpu_usage();
429 self.cpu_usage.set(system.global_cpu_usage() as f64);
430 }
431
432 pub fn set_open_connections(&self, count: i64) {
434 self.open_connections.set(count);
435 }
436
437 pub fn record_websocket_frame(
447 &self,
448 route: &str,
449 direction: &str,
450 opcode: &str,
451 decision: &str,
452 ) {
453 self.websocket_frames_total
454 .with_label_values(&[route, direction, opcode, decision])
455 .inc();
456 }
457
458 pub fn record_websocket_connection(&self, route: &str) {
460 self.websocket_connections_total
461 .with_label_values(&[route])
462 .inc();
463 }
464
465 pub fn record_websocket_inspection_duration(&self, route: &str, duration: Duration) {
467 self.websocket_inspection_duration
468 .with_label_values(&[route])
469 .observe(duration.as_secs_f64());
470 }
471
472 pub fn record_websocket_frame_size(
480 &self,
481 route: &str,
482 direction: &str,
483 opcode: &str,
484 size_bytes: usize,
485 ) {
486 self.websocket_frame_size
487 .with_label_values(&[route, direction, opcode])
488 .observe(size_bytes as f64);
489 }
490}
491
/// One serializable audit record describing a single request.
///
/// Built with [`AuditLogEntry::new`], filled in through the public fields, and
/// emitted with [`AuditLogEntry::write`].
#[derive(Debug, serde::Serialize)]
pub struct AuditLogEntry {
    /// Creation time; `new` stores the current UTC time formatted as RFC 3339.
    pub timestamp: String,
    /// Caller-supplied identifier for this request.
    pub correlation_id: String,
    /// Kind of event; `new` initializes this to `"request"`.
    pub event_type: String,
    /// Route associated with the request, if any.
    pub route: Option<String>,
    /// Client address, if known.
    pub client_addr: Option<String>,
    /// Client user agent, if known.
    pub user_agent: Option<String>,
    /// Request method as supplied to `new`.
    pub method: String,
    /// Request path as supplied to `new`.
    pub path: String,
    /// Response status, if one has been recorded.
    pub status: Option<u16>,
    /// Request duration; `new` starts it at 0. Presumably milliseconds, per the field name.
    pub duration_ms: u64,
    /// Upstream associated with the request, if any.
    pub upstream: Option<String>,
    /// WAF verdict for the request, if one was recorded.
    pub waf_decision: Option<WafDecision>,
    /// Agent verdicts recorded for the request; empty after `new`.
    pub agent_decisions: Vec<AgentDecision>,
    /// Error description, if one was recorded.
    pub error: Option<String>,
    /// Free-form tags; empty after `new`.
    pub tags: Vec<String>,
}
511
/// Serializable WAF verdict attached to an [`AuditLogEntry`].
#[derive(Debug, serde::Serialize)]
pub struct WafDecision {
    /// Action taken, as free-form text.
    pub action: String,
    /// Identifiers of the rules involved in the decision.
    pub rule_ids: Vec<String>,
    /// Confidence score. NOTE(review): the value range is not established in this file — confirm.
    pub confidence: f32,
    /// Explanation for the decision.
    pub reason: String,
    /// Data that matched, if recorded.
    pub matched_data: Option<String>,
}
521
/// Serializable verdict from a single agent, attached to an [`AuditLogEntry`].
#[derive(Debug, serde::Serialize)]
pub struct AgentDecision {
    /// Name of the agent that produced the decision.
    pub agent_name: String,
    /// Event the agent was consulted for.
    pub event: String,
    /// Action taken, as free-form text.
    pub action: String,
    /// Agent call latency. Presumably milliseconds, per the field name.
    pub latency_ms: u64,
    /// Arbitrary JSON metadata attached to the decision.
    pub metadata: serde_json::Value,
}
531
532impl AuditLogEntry {
533 pub fn new(correlation_id: String, method: String, path: String) -> Self {
535 Self {
536 timestamp: chrono::Utc::now().to_rfc3339(),
537 correlation_id,
538 event_type: "request".to_string(),
539 route: None,
540 client_addr: None,
541 user_agent: None,
542 method,
543 path,
544 status: None,
545 duration_ms: 0,
546 upstream: None,
547 waf_decision: None,
548 agent_decisions: vec![],
549 error: None,
550 tags: vec![],
551 }
552 }
553
554 pub fn write(&self) {
556 match serde_json::to_string(self) {
557 Ok(json) => println!("AUDIT: {}", json),
558 Err(e) => error!("Failed to serialize audit log: {}", e),
559 }
560 }
561}
562
/// Health of a single component, and of the system as a whole.
///
/// When statuses are aggregated by `ComponentHealthTracker::get_status`,
/// `Unhealthy` takes precedence over `Degraded`, which takes precedence over
/// `Healthy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum HealthStatus {
    /// No problem reported; resets a component's consecutive-failure count.
    Healthy,
    /// Counted as a failure by the tracker; yields an overall `Degraded` unless something is `Unhealthy`.
    Degraded,
    /// Counted as a failure by the tracker; makes the overall status `Unhealthy`.
    Unhealthy,
}
570
/// Most recent health record for one named component.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ComponentHealth {
    /// Component name; the tracker uses it as the lookup key.
    pub name: String,
    /// Status supplied by the latest update.
    pub status: HealthStatus,
    /// UTC time of the latest update.
    pub last_check: chrono::DateTime<chrono::Utc>,
    /// Number of updates in a row whose status was not `Healthy`; reset to 0 by a `Healthy` update.
    pub consecutive_failures: u32,
    /// Error text supplied by the latest update, if any.
    pub error_message: Option<String>,
}
580
/// Registry of per-component health records behind a read/write lock.
pub struct ComponentHealthTracker {
    /// One record per component name, guarded by a `parking_lot::RwLock`.
    components: parking_lot::RwLock<Vec<ComponentHealth>>,
}
588
589impl Default for ComponentHealthTracker {
590 fn default() -> Self {
591 Self::new()
592 }
593}
594
595impl ComponentHealthTracker {
596 pub fn new() -> Self {
598 Self {
599 components: parking_lot::RwLock::new(vec![]),
600 }
601 }
602
603 pub fn update_component(&self, name: String, status: HealthStatus, error: Option<String>) {
605 let mut components = self.components.write();
606
607 if let Some(component) = components.iter_mut().find(|c| c.name == name) {
608 component.status = status;
609 component.last_check = chrono::Utc::now();
610 component.error_message = error;
611
612 if status != HealthStatus::Healthy {
613 component.consecutive_failures += 1;
614 } else {
615 component.consecutive_failures = 0;
616 }
617 } else {
618 components.push(ComponentHealth {
619 name,
620 status,
621 last_check: chrono::Utc::now(),
622 consecutive_failures: if status != HealthStatus::Healthy {
623 1
624 } else {
625 0
626 },
627 error_message: error,
628 });
629 }
630 }
631
632 pub fn get_status(&self) -> HealthStatus {
634 let components = self.components.read();
635
636 if components.is_empty() {
637 return HealthStatus::Healthy;
638 }
639
640 let unhealthy_count = components
641 .iter()
642 .filter(|c| c.status == HealthStatus::Unhealthy)
643 .count();
644 let degraded_count = components
645 .iter()
646 .filter(|c| c.status == HealthStatus::Degraded)
647 .count();
648
649 if unhealthy_count > 0 {
650 HealthStatus::Unhealthy
651 } else if degraded_count > 0 {
652 HealthStatus::Degraded
653 } else {
654 HealthStatus::Healthy
655 }
656 }
657
658 pub fn get_report(&self) -> Vec<ComponentHealth> {
660 self.components.read().clone()
661 }
662}
663
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: the metrics can be registered and a few recorders can be
    /// called without panicking.
    #[test]
    fn test_metrics_creation() {
        let request_metrics = RequestMetrics::new().expect("Failed to create metrics");

        let elapsed = Duration::from_millis(100);
        request_metrics.record_request("test_route", "GET", 200, elapsed);

        request_metrics.inc_active_requests();
        request_metrics.dec_active_requests();

        request_metrics.record_upstream_attempt("backend1", "test_route");
    }

    /// An audit entry serializes to JSON that contains its correlation id.
    #[test]
    fn test_audit_log() {
        let correlation_id = "test-correlation-id".to_string();
        let method = "GET".to_string();
        let path = "/api/test".to_string();
        let mut audit = AuditLogEntry::new(correlation_id, method, path);

        audit.status = Some(200);
        audit.duration_ms = 150;
        audit.tags.push("test".to_string());

        let serialized = serde_json::to_string(&audit).expect("Failed to serialize audit log");
        assert!(serialized.contains("test-correlation-id"));
    }

    /// The overall status follows the worst component status seen so far.
    #[test]
    fn test_health_checker() {
        let tracker = ComponentHealthTracker::new();

        // Nothing registered yet.
        assert_eq!(tracker.get_status(), HealthStatus::Healthy);

        tracker.update_component("upstream1".to_string(), HealthStatus::Healthy, None);
        assert_eq!(tracker.get_status(), HealthStatus::Healthy);

        let slow = Some("Slow response".to_string());
        tracker.update_component("agent1".to_string(), HealthStatus::Degraded, slow);
        assert_eq!(tracker.get_status(), HealthStatus::Degraded);

        let refused = Some("Connection refused".to_string());
        tracker.update_component("upstream2".to_string(), HealthStatus::Unhealthy, refused);
        assert_eq!(tracker.get_status(), HealthStatus::Unhealthy);
    }
}