1use anyhow::{Context, Result};
7use prometheus::{
8 register_counter_vec, register_gauge, register_histogram_vec, register_int_counter_vec,
9 register_int_gauge, register_int_gauge_vec, CounterVec, Gauge, HistogramVec, IntCounterVec,
10 IntGauge, IntGaugeVec,
11};
12use std::time::Duration;
13use tracing::{error, info};
14use tracing_subscriber::{fmt, prelude::*, EnvFilter};
15
16pub fn init_tracing() -> Result<()> {
18 let json_layer =
20 if std::env::var("SENTINEL_LOG_FORMAT").unwrap_or_else(|_| "json".to_string()) == "json" {
21 Some(
22 fmt::layer()
23 .json()
24 .with_target(true)
25 .with_thread_ids(true)
26 .with_thread_names(true)
27 .with_file(true)
28 .with_line_number(true),
29 )
30 } else {
31 None
32 };
33
34 let pretty_layer = if std::env::var("SENTINEL_LOG_FORMAT")
36 .unwrap_or_else(|_| "json".to_string())
37 == "pretty"
38 {
39 Some(
40 fmt::layer()
41 .pretty()
42 .with_target(true)
43 .with_thread_ids(true)
44 .with_thread_names(true)
45 .with_file(true)
46 .with_line_number(true),
47 )
48 } else {
49 None
50 };
51
52 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
54
55 tracing_subscriber::registry()
56 .with(env_filter)
57 .with(json_layer)
58 .with(pretty_layer)
59 .init();
60
61 info!("Tracing initialized");
62 Ok(())
63}
64
65pub struct RequestMetrics {
67 request_duration: HistogramVec,
69 request_count: IntCounterVec,
71 active_requests: IntGauge,
73 upstream_attempts: IntCounterVec,
75 upstream_failures: IntCounterVec,
77 circuit_breaker_state: IntGaugeVec,
79 agent_latency: HistogramVec,
81 agent_timeouts: IntCounterVec,
83 blocked_requests: CounterVec,
85 request_body_size: HistogramVec,
87 response_body_size: HistogramVec,
89 tls_handshake_duration: HistogramVec,
91 connection_pool_size: IntGaugeVec,
93 connection_pool_idle: IntGaugeVec,
94 connection_pool_acquired: IntCounterVec,
95 memory_usage: IntGauge,
97 cpu_usage: Gauge,
98 open_connections: IntGauge,
99 websocket_frames_total: IntCounterVec,
101 websocket_connections_total: IntCounterVec,
102 websocket_inspection_duration: HistogramVec,
103 websocket_frame_size: HistogramVec,
104 decompression_total: IntCounterVec,
106 decompression_ratio: HistogramVec,
107 shadow_requests_total: IntCounterVec,
109 shadow_errors_total: IntCounterVec,
110 shadow_latency_seconds: HistogramVec,
111 pii_detected_total: IntCounterVec,
113}
114
/// Returns the decimal text of an HTTP status code as a `'static` label.
///
/// Common codes come straight from string literals. Any other code is
/// formatted once, leaked to obtain a `'static` lifetime, and memoised. Each
/// distinct code therefore costs at most one small allocation for the life of
/// the process, bounded by the 65 536 possible `u16` values. Previously every
/// call with an uncommon code leaked a fresh allocation, an unbounded leak on
/// the request path.
fn status_str(status: u16) -> &'static str {
    use std::collections::BTreeMap;
    use std::sync::{PoisonError, RwLock};

    // Labels for codes outside the literal table, leaked once each.
    static UNCOMMON: RwLock<BTreeMap<u16, &'static str>> = RwLock::new(BTreeMap::new());

    match status {
        200 => "200",
        201 => "201",
        204 => "204",
        301 => "301",
        302 => "302",
        304 => "304",
        307 => "307",
        308 => "308",
        400 => "400",
        401 => "401",
        403 => "403",
        404 => "404",
        405 => "405",
        408 => "408",
        409 => "409",
        413 => "413",
        429 => "429",
        500 => "500",
        502 => "502",
        503 => "503",
        504 => "504",
        _ => {
            // Fast path: the code has been seen before. The read guard is
            // dropped at the end of this statement.
            let cached = UNCOMMON
                .read()
                .unwrap_or_else(PoisonError::into_inner)
                .get(&status)
                .copied();
            if let Some(label) = cached {
                return label;
            }

            // Slow path: `entry` re-checks under the write lock, so a racing
            // thread that inserted first wins and nothing is leaked twice.
            let mut table = UNCOMMON.write().unwrap_or_else(PoisonError::into_inner);
            let label: &'static str = *table
                .entry(status)
                .or_insert_with(|| Box::leak(status.to_string().into_boxed_str()));
            label
        }
    }
}
146
147impl RequestMetrics {
148 pub fn new() -> Result<Self> {
150 let latency_buckets = vec![
152 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
153 ];
154
155 let size_buckets = vec![
157 100.0,
158 1_000.0,
159 10_000.0,
160 100_000.0,
161 1_000_000.0,
162 10_000_000.0,
163 100_000_000.0,
164 ];
165
166 let request_duration = register_histogram_vec!(
167 "sentinel_request_duration_seconds",
168 "Request duration in seconds",
169 &["route", "method"],
170 latency_buckets.clone()
171 )
172 .context("Failed to register request_duration metric")?;
173
174 let request_count = register_int_counter_vec!(
175 "sentinel_requests_total",
176 "Total number of requests",
177 &["route", "method", "status"]
178 )
179 .context("Failed to register request_count metric")?;
180
181 let active_requests = register_int_gauge!(
182 "sentinel_active_requests",
183 "Number of currently active requests"
184 )
185 .context("Failed to register active_requests metric")?;
186
187 let upstream_attempts = register_int_counter_vec!(
188 "sentinel_upstream_attempts_total",
189 "Total upstream connection attempts",
190 &["upstream", "route"]
191 )
192 .context("Failed to register upstream_attempts metric")?;
193
194 let upstream_failures = register_int_counter_vec!(
195 "sentinel_upstream_failures_total",
196 "Total upstream connection failures",
197 &["upstream", "route", "reason"]
198 )
199 .context("Failed to register upstream_failures metric")?;
200
201 let circuit_breaker_state = register_int_gauge_vec!(
202 "sentinel_circuit_breaker_state",
203 "Circuit breaker state (0=closed, 1=open)",
204 &["component", "route"]
205 )
206 .context("Failed to register circuit_breaker_state metric")?;
207
208 let agent_latency = register_histogram_vec!(
209 "sentinel_agent_latency_seconds",
210 "Agent call latency in seconds",
211 &["agent", "event"],
212 latency_buckets.clone()
213 )
214 .context("Failed to register agent_latency metric")?;
215
216 let agent_timeouts = register_int_counter_vec!(
217 "sentinel_agent_timeouts_total",
218 "Total agent call timeouts",
219 &["agent", "event"]
220 )
221 .context("Failed to register agent_timeouts metric")?;
222
223 let blocked_requests = register_counter_vec!(
224 "sentinel_blocked_requests_total",
225 "Total blocked requests by reason",
226 &["reason"]
227 )
228 .context("Failed to register blocked_requests metric")?;
229
230 let request_body_size = register_histogram_vec!(
231 "sentinel_request_body_size_bytes",
232 "Request body size in bytes",
233 &["route"],
234 size_buckets.clone()
235 )
236 .context("Failed to register request_body_size metric")?;
237
238 let response_body_size = register_histogram_vec!(
239 "sentinel_response_body_size_bytes",
240 "Response body size in bytes",
241 &["route"],
242 size_buckets.clone()
243 )
244 .context("Failed to register response_body_size metric")?;
245
246 let tls_handshake_duration = register_histogram_vec!(
247 "sentinel_tls_handshake_duration_seconds",
248 "TLS handshake duration in seconds",
249 &["version"],
250 latency_buckets
251 )
252 .context("Failed to register tls_handshake_duration metric")?;
253
254 let connection_pool_size = register_int_gauge_vec!(
255 "sentinel_connection_pool_size",
256 "Total connections in pool",
257 &["upstream"]
258 )
259 .context("Failed to register connection_pool_size metric")?;
260
261 let connection_pool_idle = register_int_gauge_vec!(
262 "sentinel_connection_pool_idle",
263 "Idle connections in pool",
264 &["upstream"]
265 )
266 .context("Failed to register connection_pool_idle metric")?;
267
268 let connection_pool_acquired = register_int_counter_vec!(
269 "sentinel_connection_pool_acquired_total",
270 "Total connections acquired from pool",
271 &["upstream"]
272 )
273 .context("Failed to register connection_pool_acquired metric")?;
274
275 let memory_usage = register_int_gauge!(
276 "sentinel_memory_usage_bytes",
277 "Current memory usage in bytes"
278 )
279 .context("Failed to register memory_usage metric")?;
280
281 let cpu_usage =
282 register_gauge!("sentinel_cpu_usage_percent", "Current CPU usage percentage")
283 .context("Failed to register cpu_usage metric")?;
284
285 let open_connections =
286 register_int_gauge!("sentinel_open_connections", "Number of open connections")
287 .context("Failed to register open_connections metric")?;
288
289 let websocket_frames_total = register_int_counter_vec!(
291 "sentinel_websocket_frames_total",
292 "Total WebSocket frames processed",
293 &["route", "direction", "opcode", "decision"]
294 )
295 .context("Failed to register websocket_frames_total metric")?;
296
297 let websocket_connections_total = register_int_counter_vec!(
298 "sentinel_websocket_connections_total",
299 "Total WebSocket connections with inspection enabled",
300 &["route"]
301 )
302 .context("Failed to register websocket_connections_total metric")?;
303
304 let frame_latency_buckets = vec![
306 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
307 ];
308
309 let websocket_inspection_duration = register_histogram_vec!(
310 "sentinel_websocket_inspection_duration_seconds",
311 "WebSocket frame inspection duration in seconds",
312 &["route"],
313 frame_latency_buckets
314 )
315 .context("Failed to register websocket_inspection_duration metric")?;
316
317 let frame_size_buckets = vec![
319 64.0, 256.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0,
320 ];
321
322 let websocket_frame_size = register_histogram_vec!(
323 "sentinel_websocket_frame_size_bytes",
324 "WebSocket frame payload size in bytes",
325 &["route", "direction", "opcode"],
326 frame_size_buckets
327 )
328 .context("Failed to register websocket_frame_size metric")?;
329
330 let decompression_total = register_int_counter_vec!(
332 "sentinel_decompression_total",
333 "Total body decompression operations",
334 &["encoding", "result"]
335 )
336 .context("Failed to register decompression_total metric")?;
337
338 let ratio_buckets = vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0];
340
341 let decompression_ratio = register_histogram_vec!(
342 "sentinel_decompression_ratio",
343 "Decompression ratio (decompressed_size / compressed_size)",
344 &["encoding"],
345 ratio_buckets
346 )
347 .context("Failed to register decompression_ratio metric")?;
348
349 let shadow_requests_total = register_int_counter_vec!(
351 "sentinel_shadow_requests_total",
352 "Total shadow requests sent to mirror upstream",
353 &["route", "upstream", "result"]
354 )
355 .context("Failed to register shadow_requests_total metric")?;
356
357 let shadow_errors_total = register_int_counter_vec!(
358 "sentinel_shadow_errors_total",
359 "Total shadow request errors",
360 &["route", "upstream", "error_type"]
361 )
362 .context("Failed to register shadow_errors_total metric")?;
363
364 let shadow_latency_buckets = vec![
366 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
367 ];
368
369 let shadow_latency_seconds = register_histogram_vec!(
370 "sentinel_shadow_latency_seconds",
371 "Shadow request latency in seconds",
372 &["route", "upstream"],
373 shadow_latency_buckets
374 )
375 .context("Failed to register shadow_latency_seconds metric")?;
376
377 let pii_detected_total = register_int_counter_vec!(
378 "sentinel_pii_detected_total",
379 "Total PII detections in inference responses",
380 &["route", "category"]
381 )
382 .context("Failed to register pii_detected_total metric")?;
383
384 Ok(Self {
385 request_duration,
386 request_count,
387 active_requests,
388 upstream_attempts,
389 upstream_failures,
390 circuit_breaker_state,
391 agent_latency,
392 agent_timeouts,
393 blocked_requests,
394 request_body_size,
395 response_body_size,
396 tls_handshake_duration,
397 connection_pool_size,
398 connection_pool_idle,
399 connection_pool_acquired,
400 memory_usage,
401 cpu_usage,
402 open_connections,
403 websocket_frames_total,
404 websocket_connections_total,
405 websocket_inspection_duration,
406 websocket_frame_size,
407 decompression_total,
408 decompression_ratio,
409 shadow_requests_total,
410 shadow_errors_total,
411 shadow_latency_seconds,
412 pii_detected_total,
413 })
414 }
415
416 pub fn record_request(&self, route: &str, method: &str, status: u16, duration: Duration) {
418 self.request_duration
419 .with_label_values(&[route, method])
420 .observe(duration.as_secs_f64());
421
422 self.request_count
423 .with_label_values(&[route, method, status_str(status)])
424 .inc();
425 }
426
427 pub fn inc_active_requests(&self) {
429 self.active_requests.inc();
430 }
431
432 pub fn dec_active_requests(&self) {
434 self.active_requests.dec();
435 }
436
437 pub fn record_upstream_attempt(&self, upstream: &str, route: &str) {
439 self.upstream_attempts
440 .with_label_values(&[upstream, route])
441 .inc();
442 }
443
444 pub fn record_upstream_failure(&self, upstream: &str, route: &str, reason: &str) {
446 self.upstream_failures
447 .with_label_values(&[upstream, route, reason])
448 .inc();
449 }
450
451 pub fn set_circuit_breaker_state(&self, component: &str, route: &str, is_open: bool) {
453 let state = if is_open { 1 } else { 0 };
454 self.circuit_breaker_state
455 .with_label_values(&[component, route])
456 .set(state);
457 }
458
459 pub fn record_agent_latency(&self, agent: &str, event: &str, duration: Duration) {
461 self.agent_latency
462 .with_label_values(&[agent, event])
463 .observe(duration.as_secs_f64());
464 }
465
466 pub fn record_agent_timeout(&self, agent: &str, event: &str) {
468 self.agent_timeouts.with_label_values(&[agent, event]).inc();
469 }
470
471 pub fn record_blocked_request(&self, reason: &str) {
473 self.blocked_requests.with_label_values(&[reason]).inc();
474 }
475
476 pub fn record_pii_detected(&self, route: &str, category: &str) {
478 self.pii_detected_total
479 .with_label_values(&[route, category])
480 .inc();
481 }
482
483 pub fn record_request_body_size(&self, route: &str, size_bytes: usize) {
485 self.request_body_size
486 .with_label_values(&[route])
487 .observe(size_bytes as f64);
488 }
489
490 pub fn record_response_body_size(&self, route: &str, size_bytes: usize) {
492 self.response_body_size
493 .with_label_values(&[route])
494 .observe(size_bytes as f64);
495 }
496
497 pub fn record_tls_handshake(&self, version: &str, duration: Duration) {
499 self.tls_handshake_duration
500 .with_label_values(&[version])
501 .observe(duration.as_secs_f64());
502 }
503
504 pub fn update_connection_pool(&self, upstream: &str, size: i64, idle: i64) {
506 self.connection_pool_size
507 .with_label_values(&[upstream])
508 .set(size);
509 self.connection_pool_idle
510 .with_label_values(&[upstream])
511 .set(idle);
512 }
513
514 pub fn record_connection_acquired(&self, upstream: &str) {
516 self.connection_pool_acquired
517 .with_label_values(&[upstream])
518 .inc();
519 }
520
521 pub fn update_system_metrics(&self) {
523 use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
524
525 let mut system = System::new_with_specifics(
527 RefreshKind::nothing()
528 .with_cpu(CpuRefreshKind::everything())
529 .with_memory(MemoryRefreshKind::everything()),
530 );
531
532 self.memory_usage.set(system.total_memory() as i64);
534
535 system.refresh_cpu_usage();
537 self.cpu_usage.set(system.global_cpu_usage() as f64);
538 }
539
540 pub fn set_open_connections(&self, count: i64) {
542 self.open_connections.set(count);
543 }
544
545 pub fn record_websocket_frame(
555 &self,
556 route: &str,
557 direction: &str,
558 opcode: &str,
559 decision: &str,
560 ) {
561 self.websocket_frames_total
562 .with_label_values(&[route, direction, opcode, decision])
563 .inc();
564 }
565
566 pub fn record_websocket_connection(&self, route: &str) {
568 self.websocket_connections_total
569 .with_label_values(&[route])
570 .inc();
571 }
572
573 pub fn record_websocket_inspection_duration(&self, route: &str, duration: Duration) {
575 self.websocket_inspection_duration
576 .with_label_values(&[route])
577 .observe(duration.as_secs_f64());
578 }
579
580 pub fn record_websocket_frame_size(
588 &self,
589 route: &str,
590 direction: &str,
591 opcode: &str,
592 size_bytes: usize,
593 ) {
594 self.websocket_frame_size
595 .with_label_values(&[route, direction, opcode])
596 .observe(size_bytes as f64);
597 }
598
599 pub fn record_decompression_success(&self, encoding: &str, ratio: f64) {
607 self.decompression_total
608 .with_label_values(&[encoding, "success"])
609 .inc();
610 self.decompression_ratio
611 .with_label_values(&[encoding])
612 .observe(ratio);
613 }
614
615 pub fn record_decompression_failure(&self, encoding: &str, reason: &str) {
621 self.decompression_total
622 .with_label_values(&[encoding, reason])
623 .inc();
624 }
625
626 pub fn record_shadow_success(&self, route: &str, upstream: &str, duration: Duration) {
633 self.shadow_requests_total
634 .with_label_values(&[route, upstream, "success"])
635 .inc();
636 self.shadow_latency_seconds
637 .with_label_values(&[route, upstream])
638 .observe(duration.as_secs_f64());
639 }
640
641 pub fn record_shadow_error(&self, route: &str, upstream: &str, error_type: &str) {
648 self.shadow_requests_total
649 .with_label_values(&[route, upstream, "error"])
650 .inc();
651 self.shadow_errors_total
652 .with_label_values(&[route, upstream, error_type])
653 .inc();
654 }
655
656 pub fn record_shadow_timeout(&self, route: &str, upstream: &str, duration: Duration) {
663 self.shadow_requests_total
664 .with_label_values(&[route, upstream, "timeout"])
665 .inc();
666 self.shadow_errors_total
667 .with_label_values(&[route, upstream, "timeout"])
668 .inc();
669 self.shadow_latency_seconds
670 .with_label_values(&[route, upstream])
671 .observe(duration.as_secs_f64());
672 }
673}
674
675#[derive(Debug, serde::Serialize)]
677pub struct AuditLogEntry {
678 pub timestamp: String,
679 pub correlation_id: String,
680 pub event_type: String,
681 pub route: Option<String>,
682 pub client_addr: Option<String>,
683 pub user_agent: Option<String>,
684 pub method: String,
685 pub path: String,
686 pub status: Option<u16>,
687 pub duration_ms: u64,
688 pub upstream: Option<String>,
689 pub waf_decision: Option<WafDecision>,
690 pub agent_decisions: Vec<AgentDecision>,
691 pub error: Option<String>,
692 pub tags: Vec<String>,
693}
694
695#[derive(Debug, serde::Serialize)]
697pub struct WafDecision {
698 pub action: String,
699 pub rule_ids: Vec<String>,
700 pub confidence: f32,
701 pub reason: String,
702 pub matched_data: Option<String>,
703}
704
705#[derive(Debug, serde::Serialize)]
707pub struct AgentDecision {
708 pub agent_name: String,
709 pub event: String,
710 pub action: String,
711 pub latency_ms: u64,
712 pub metadata: serde_json::Value,
713}
714
715impl AuditLogEntry {
716 pub fn new(correlation_id: String, method: String, path: String) -> Self {
718 Self {
719 timestamp: chrono::Utc::now().to_rfc3339(),
720 correlation_id,
721 event_type: "request".to_string(),
722 route: None,
723 client_addr: None,
724 user_agent: None,
725 method,
726 path,
727 status: None,
728 duration_ms: 0,
729 upstream: None,
730 waf_decision: None,
731 agent_decisions: vec![],
732 error: None,
733 tags: vec![],
734 }
735 }
736
737 pub fn write(&self) {
739 match serde_json::to_string(self) {
740 Ok(json) => println!("AUDIT: {}", json),
741 Err(e) => error!("Failed to serialize audit log: {}", e),
742 }
743 }
744}
745
746#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
748pub enum HealthStatus {
749 Healthy,
750 Degraded,
751 Unhealthy,
752}
753
754#[derive(Debug, Clone, serde::Serialize)]
756pub struct ComponentHealth {
757 pub name: String,
758 pub status: HealthStatus,
759 pub last_check: chrono::DateTime<chrono::Utc>,
760 pub consecutive_failures: u32,
761 pub error_message: Option<String>,
762}
763
764pub struct ComponentHealthTracker {
769 components: parking_lot::RwLock<Vec<ComponentHealth>>,
770}
771
772impl Default for ComponentHealthTracker {
773 fn default() -> Self {
774 Self::new()
775 }
776}
777
778impl ComponentHealthTracker {
779 pub fn new() -> Self {
781 Self {
782 components: parking_lot::RwLock::new(vec![]),
783 }
784 }
785
786 pub fn update_component(&self, name: String, status: HealthStatus, error: Option<String>) {
788 let mut components = self.components.write();
789
790 if let Some(component) = components.iter_mut().find(|c| c.name == name) {
791 component.status = status;
792 component.last_check = chrono::Utc::now();
793 component.error_message = error;
794
795 if status != HealthStatus::Healthy {
796 component.consecutive_failures += 1;
797 } else {
798 component.consecutive_failures = 0;
799 }
800 } else {
801 components.push(ComponentHealth {
802 name,
803 status,
804 last_check: chrono::Utc::now(),
805 consecutive_failures: if status != HealthStatus::Healthy {
806 1
807 } else {
808 0
809 },
810 error_message: error,
811 });
812 }
813 }
814
815 pub fn get_status(&self) -> HealthStatus {
817 let components = self.components.read();
818
819 if components.is_empty() {
820 return HealthStatus::Healthy;
821 }
822
823 let unhealthy_count = components
824 .iter()
825 .filter(|c| c.status == HealthStatus::Unhealthy)
826 .count();
827 let degraded_count = components
828 .iter()
829 .filter(|c| c.status == HealthStatus::Degraded)
830 .count();
831
832 if unhealthy_count > 0 {
833 HealthStatus::Unhealthy
834 } else if degraded_count > 0 {
835 HealthStatus::Degraded
836 } else {
837 HealthStatus::Healthy
838 }
839 }
840
841 pub fn get_report(&self) -> Vec<ComponentHealth> {
843 self.components.read().clone()
844 }
845}
846
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_creation() {
        // `RequestMetrics::new` registers into prometheus's process-wide
        // default registry, so it is constructed only once in this test.
        let metrics = RequestMetrics::new().expect("Failed to create metrics");

        metrics.record_request("test_route", "GET", 200, Duration::from_millis(100));
        assert_eq!(
            metrics
                .request_count
                .with_label_values(&["test_route", "GET", "200"])
                .get(),
            1
        );
        assert_eq!(
            metrics
                .request_duration
                .with_label_values(&["test_route", "GET"])
                .get_sample_count(),
            1
        );

        metrics.inc_active_requests();
        assert_eq!(metrics.active_requests.get(), 1);
        metrics.dec_active_requests();
        assert_eq!(metrics.active_requests.get(), 0);

        metrics.record_upstream_attempt("backend1", "test_route");
        assert_eq!(
            metrics
                .upstream_attempts
                .with_label_values(&["backend1", "test_route"])
                .get(),
            1
        );
    }

    #[test]
    fn test_audit_log() {
        let mut entry = AuditLogEntry::new(
            "test-correlation-id".to_string(),
            "GET".to_string(),
            "/api/test".to_string(),
        );

        entry.status = Some(200);
        entry.duration_ms = 150;
        entry.tags.push("test".to_string());

        let json = serde_json::to_string(&entry).expect("Failed to serialize audit log");
        assert!(json.contains("test-correlation-id"));

        let value: serde_json::Value =
            serde_json::from_str(&json).expect("Audit log should be valid JSON");
        assert_eq!(value["event_type"], "request");
        assert_eq!(value["method"], "GET");
        assert_eq!(value["path"], "/api/test");
        assert_eq!(value["status"], 200);
        assert_eq!(value["duration_ms"], 150);
        assert_eq!(value["tags"][0], "test");
        assert!(value["route"].is_null());
        assert_eq!(value["agent_decisions"], serde_json::json!([]));
    }

    #[test]
    fn test_health_checker() {
        let checker = ComponentHealthTracker::new();

        assert_eq!(checker.get_status(), HealthStatus::Healthy);

        checker.update_component("upstream1".to_string(), HealthStatus::Healthy, None);
        assert_eq!(checker.get_status(), HealthStatus::Healthy);

        checker.update_component(
            "agent1".to_string(),
            HealthStatus::Degraded,
            Some("Slow response".to_string()),
        );
        assert_eq!(checker.get_status(), HealthStatus::Degraded);

        checker.update_component(
            "upstream2".to_string(),
            HealthStatus::Unhealthy,
            Some("Connection refused".to_string()),
        );
        assert_eq!(checker.get_status(), HealthStatus::Unhealthy);

        // Repeated failures accumulate; a healthy update resets the count and
        // clears the error message.
        checker.update_component(
            "agent1".to_string(),
            HealthStatus::Degraded,
            Some("Still slow".to_string()),
        );
        checker.update_component("upstream2".to_string(), HealthStatus::Healthy, None);
        assert_eq!(checker.get_status(), HealthStatus::Degraded);

        let report = checker.get_report();
        assert_eq!(report.len(), 3);

        let agent = report
            .iter()
            .find(|c| c.name == "agent1")
            .expect("agent1 should be tracked");
        assert_eq!(agent.consecutive_failures, 2);
        assert_eq!(agent.error_message.as_deref(), Some("Still slow"));

        let upstream2 = report
            .iter()
            .find(|c| c.name == "upstream2")
            .expect("upstream2 should be tracked");
        assert_eq!(upstream2.status, HealthStatus::Healthy);
        assert_eq!(upstream2.consecutive_failures, 0);
        assert_eq!(upstream2.error_message, None);
    }
}