1use anyhow::{Context, Result};
7use prometheus::{
8 register_counter_vec, register_gauge, register_histogram_vec, register_int_counter_vec,
9 register_int_gauge, register_int_gauge_vec, CounterVec, Gauge, HistogramVec, IntCounterVec,
10 IntGauge, IntGaugeVec,
11};
12use std::time::Duration;
13use tracing::{error, info};
14use tracing_subscriber::{fmt, prelude::*, EnvFilter};
15
16pub fn init_tracing() -> Result<()> {
18 let json_layer =
20 if std::env::var("SENTINEL_LOG_FORMAT").unwrap_or_else(|_| "json".to_string()) == "json" {
21 Some(
22 fmt::layer()
23 .json()
24 .with_target(true)
25 .with_thread_ids(true)
26 .with_thread_names(true)
27 .with_file(true)
28 .with_line_number(true),
29 )
30 } else {
31 None
32 };
33
34 let pretty_layer = if std::env::var("SENTINEL_LOG_FORMAT")
36 .unwrap_or_else(|_| "json".to_string())
37 == "pretty"
38 {
39 Some(
40 fmt::layer()
41 .pretty()
42 .with_target(true)
43 .with_thread_ids(true)
44 .with_thread_names(true)
45 .with_file(true)
46 .with_line_number(true),
47 )
48 } else {
49 None
50 };
51
52 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
54
55 tracing_subscriber::registry()
56 .with(env_filter)
57 .with(json_layer)
58 .with(pretty_layer)
59 .init();
60
61 info!("Tracing initialized");
62 Ok(())
63}
64
/// Handles to the Prometheus metrics published by the proxy.
///
/// Every field is created and registered in [`RequestMetrics::new`]; the
/// metric names quoted below are the ones used there.
pub struct RequestMetrics {
    /// `sentinel_request_duration_seconds`, labels `route`, `method`.
    request_duration: HistogramVec,
    /// `sentinel_requests_total`, labels `route`, `method`, `status`.
    request_count: IntCounterVec,
    /// `sentinel_active_requests`: number of currently active requests.
    active_requests: IntGauge,
    /// `sentinel_upstream_attempts_total`, labels `upstream`, `route`.
    upstream_attempts: IntCounterVec,
    /// `sentinel_upstream_failures_total`, labels `upstream`, `route`, `reason`.
    upstream_failures: IntCounterVec,
    /// `sentinel_circuit_breaker_state` (0 = closed, 1 = open), labels
    /// `component`, `route`.
    circuit_breaker_state: IntGaugeVec,
    /// `sentinel_agent_latency_seconds`, labels `agent`, `event`.
    agent_latency: HistogramVec,
    /// `sentinel_agent_timeouts_total`, labels `agent`, `event`.
    agent_timeouts: IntCounterVec,
    /// `sentinel_blocked_requests_total`, label `reason`.
    blocked_requests: CounterVec,
    /// `sentinel_request_body_size_bytes`, label `route`.
    request_body_size: HistogramVec,
    /// `sentinel_response_body_size_bytes`, label `route`.
    response_body_size: HistogramVec,
    /// `sentinel_tls_handshake_duration_seconds`, label `version`.
    tls_handshake_duration: HistogramVec,
    /// `sentinel_connection_pool_size`, label `upstream`.
    connection_pool_size: IntGaugeVec,
    /// `sentinel_connection_pool_idle`, label `upstream`.
    connection_pool_idle: IntGaugeVec,
    /// `sentinel_connection_pool_acquired_total`, label `upstream`.
    connection_pool_acquired: IntCounterVec,
    /// `sentinel_memory_usage_bytes`: current memory usage in bytes.
    memory_usage: IntGauge,
    /// `sentinel_cpu_usage_percent`: current CPU usage percentage.
    cpu_usage: Gauge,
    /// `sentinel_open_connections`: number of open connections.
    open_connections: IntGauge,
    /// `sentinel_websocket_frames_total`, labels `route`, `direction`,
    /// `opcode`, `decision`.
    websocket_frames_total: IntCounterVec,
    /// `sentinel_websocket_connections_total`, label `route`.
    websocket_connections_total: IntCounterVec,
    /// `sentinel_websocket_inspection_duration_seconds`, label `route`.
    websocket_inspection_duration: HistogramVec,
    /// `sentinel_websocket_frame_size_bytes`, labels `route`, `direction`,
    /// `opcode`.
    websocket_frame_size: HistogramVec,
    /// `sentinel_decompression_total`, labels `encoding`, `result`.
    decompression_total: IntCounterVec,
    /// `sentinel_decompression_ratio` (decompressed size / compressed size),
    /// label `encoding`.
    decompression_ratio: HistogramVec,
    /// `sentinel_shadow_requests_total`, labels `route`, `upstream`, `result`.
    shadow_requests_total: IntCounterVec,
    /// `sentinel_shadow_errors_total`, labels `route`, `upstream`,
    /// `error_type`.
    shadow_errors_total: IntCounterVec,
    /// `sentinel_shadow_latency_seconds`, labels `route`, `upstream`.
    shadow_latency_seconds: HistogramVec,
    /// `sentinel_pii_detected_total`, labels `route`, `category`.
    pii_detected_total: IntCounterVec,
}
114
115impl RequestMetrics {
116 pub fn new() -> Result<Self> {
118 let latency_buckets = vec![
120 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
121 ];
122
123 let size_buckets = vec![
125 100.0,
126 1_000.0,
127 10_000.0,
128 100_000.0,
129 1_000_000.0,
130 10_000_000.0,
131 100_000_000.0,
132 ];
133
134 let request_duration = register_histogram_vec!(
135 "sentinel_request_duration_seconds",
136 "Request duration in seconds",
137 &["route", "method"],
138 latency_buckets.clone()
139 )
140 .context("Failed to register request_duration metric")?;
141
142 let request_count = register_int_counter_vec!(
143 "sentinel_requests_total",
144 "Total number of requests",
145 &["route", "method", "status"]
146 )
147 .context("Failed to register request_count metric")?;
148
149 let active_requests = register_int_gauge!(
150 "sentinel_active_requests",
151 "Number of currently active requests"
152 )
153 .context("Failed to register active_requests metric")?;
154
155 let upstream_attempts = register_int_counter_vec!(
156 "sentinel_upstream_attempts_total",
157 "Total upstream connection attempts",
158 &["upstream", "route"]
159 )
160 .context("Failed to register upstream_attempts metric")?;
161
162 let upstream_failures = register_int_counter_vec!(
163 "sentinel_upstream_failures_total",
164 "Total upstream connection failures",
165 &["upstream", "route", "reason"]
166 )
167 .context("Failed to register upstream_failures metric")?;
168
169 let circuit_breaker_state = register_int_gauge_vec!(
170 "sentinel_circuit_breaker_state",
171 "Circuit breaker state (0=closed, 1=open)",
172 &["component", "route"]
173 )
174 .context("Failed to register circuit_breaker_state metric")?;
175
176 let agent_latency = register_histogram_vec!(
177 "sentinel_agent_latency_seconds",
178 "Agent call latency in seconds",
179 &["agent", "event"],
180 latency_buckets.clone()
181 )
182 .context("Failed to register agent_latency metric")?;
183
184 let agent_timeouts = register_int_counter_vec!(
185 "sentinel_agent_timeouts_total",
186 "Total agent call timeouts",
187 &["agent", "event"]
188 )
189 .context("Failed to register agent_timeouts metric")?;
190
191 let blocked_requests = register_counter_vec!(
192 "sentinel_blocked_requests_total",
193 "Total blocked requests by reason",
194 &["reason"]
195 )
196 .context("Failed to register blocked_requests metric")?;
197
198 let request_body_size = register_histogram_vec!(
199 "sentinel_request_body_size_bytes",
200 "Request body size in bytes",
201 &["route"],
202 size_buckets.clone()
203 )
204 .context("Failed to register request_body_size metric")?;
205
206 let response_body_size = register_histogram_vec!(
207 "sentinel_response_body_size_bytes",
208 "Response body size in bytes",
209 &["route"],
210 size_buckets.clone()
211 )
212 .context("Failed to register response_body_size metric")?;
213
214 let tls_handshake_duration = register_histogram_vec!(
215 "sentinel_tls_handshake_duration_seconds",
216 "TLS handshake duration in seconds",
217 &["version"],
218 latency_buckets
219 )
220 .context("Failed to register tls_handshake_duration metric")?;
221
222 let connection_pool_size = register_int_gauge_vec!(
223 "sentinel_connection_pool_size",
224 "Total connections in pool",
225 &["upstream"]
226 )
227 .context("Failed to register connection_pool_size metric")?;
228
229 let connection_pool_idle = register_int_gauge_vec!(
230 "sentinel_connection_pool_idle",
231 "Idle connections in pool",
232 &["upstream"]
233 )
234 .context("Failed to register connection_pool_idle metric")?;
235
236 let connection_pool_acquired = register_int_counter_vec!(
237 "sentinel_connection_pool_acquired_total",
238 "Total connections acquired from pool",
239 &["upstream"]
240 )
241 .context("Failed to register connection_pool_acquired metric")?;
242
243 let memory_usage = register_int_gauge!(
244 "sentinel_memory_usage_bytes",
245 "Current memory usage in bytes"
246 )
247 .context("Failed to register memory_usage metric")?;
248
249 let cpu_usage =
250 register_gauge!("sentinel_cpu_usage_percent", "Current CPU usage percentage")
251 .context("Failed to register cpu_usage metric")?;
252
253 let open_connections =
254 register_int_gauge!("sentinel_open_connections", "Number of open connections")
255 .context("Failed to register open_connections metric")?;
256
257 let websocket_frames_total = register_int_counter_vec!(
259 "sentinel_websocket_frames_total",
260 "Total WebSocket frames processed",
261 &["route", "direction", "opcode", "decision"]
262 )
263 .context("Failed to register websocket_frames_total metric")?;
264
265 let websocket_connections_total = register_int_counter_vec!(
266 "sentinel_websocket_connections_total",
267 "Total WebSocket connections with inspection enabled",
268 &["route"]
269 )
270 .context("Failed to register websocket_connections_total metric")?;
271
272 let frame_latency_buckets = vec![
274 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
275 ];
276
277 let websocket_inspection_duration = register_histogram_vec!(
278 "sentinel_websocket_inspection_duration_seconds",
279 "WebSocket frame inspection duration in seconds",
280 &["route"],
281 frame_latency_buckets
282 )
283 .context("Failed to register websocket_inspection_duration metric")?;
284
285 let frame_size_buckets = vec![
287 64.0, 256.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0,
288 ];
289
290 let websocket_frame_size = register_histogram_vec!(
291 "sentinel_websocket_frame_size_bytes",
292 "WebSocket frame payload size in bytes",
293 &["route", "direction", "opcode"],
294 frame_size_buckets
295 )
296 .context("Failed to register websocket_frame_size metric")?;
297
298 let decompression_total = register_int_counter_vec!(
300 "sentinel_decompression_total",
301 "Total body decompression operations",
302 &["encoding", "result"]
303 )
304 .context("Failed to register decompression_total metric")?;
305
306 let ratio_buckets = vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0];
308
309 let decompression_ratio = register_histogram_vec!(
310 "sentinel_decompression_ratio",
311 "Decompression ratio (decompressed_size / compressed_size)",
312 &["encoding"],
313 ratio_buckets
314 )
315 .context("Failed to register decompression_ratio metric")?;
316
317 let shadow_requests_total = register_int_counter_vec!(
319 "sentinel_shadow_requests_total",
320 "Total shadow requests sent to mirror upstream",
321 &["route", "upstream", "result"]
322 )
323 .context("Failed to register shadow_requests_total metric")?;
324
325 let shadow_errors_total = register_int_counter_vec!(
326 "sentinel_shadow_errors_total",
327 "Total shadow request errors",
328 &["route", "upstream", "error_type"]
329 )
330 .context("Failed to register shadow_errors_total metric")?;
331
332 let shadow_latency_buckets = vec![
334 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
335 ];
336
337 let shadow_latency_seconds = register_histogram_vec!(
338 "sentinel_shadow_latency_seconds",
339 "Shadow request latency in seconds",
340 &["route", "upstream"],
341 shadow_latency_buckets
342 )
343 .context("Failed to register shadow_latency_seconds metric")?;
344
345 let pii_detected_total = register_int_counter_vec!(
346 "sentinel_pii_detected_total",
347 "Total PII detections in inference responses",
348 &["route", "category"]
349 )
350 .context("Failed to register pii_detected_total metric")?;
351
352 Ok(Self {
353 request_duration,
354 request_count,
355 active_requests,
356 upstream_attempts,
357 upstream_failures,
358 circuit_breaker_state,
359 agent_latency,
360 agent_timeouts,
361 blocked_requests,
362 request_body_size,
363 response_body_size,
364 tls_handshake_duration,
365 connection_pool_size,
366 connection_pool_idle,
367 connection_pool_acquired,
368 memory_usage,
369 cpu_usage,
370 open_connections,
371 websocket_frames_total,
372 websocket_connections_total,
373 websocket_inspection_duration,
374 websocket_frame_size,
375 decompression_total,
376 decompression_ratio,
377 shadow_requests_total,
378 shadow_errors_total,
379 shadow_latency_seconds,
380 pii_detected_total,
381 })
382 }
383
384 pub fn record_request(&self, route: &str, method: &str, status: u16, duration: Duration) {
386 self.request_duration
387 .with_label_values(&[route, method])
388 .observe(duration.as_secs_f64());
389
390 self.request_count
391 .with_label_values(&[route, method, &status.to_string()])
392 .inc();
393 }
394
395 pub fn inc_active_requests(&self) {
397 self.active_requests.inc();
398 }
399
400 pub fn dec_active_requests(&self) {
402 self.active_requests.dec();
403 }
404
405 pub fn record_upstream_attempt(&self, upstream: &str, route: &str) {
407 self.upstream_attempts
408 .with_label_values(&[upstream, route])
409 .inc();
410 }
411
412 pub fn record_upstream_failure(&self, upstream: &str, route: &str, reason: &str) {
414 self.upstream_failures
415 .with_label_values(&[upstream, route, reason])
416 .inc();
417 }
418
419 pub fn set_circuit_breaker_state(&self, component: &str, route: &str, is_open: bool) {
421 let state = if is_open { 1 } else { 0 };
422 self.circuit_breaker_state
423 .with_label_values(&[component, route])
424 .set(state);
425 }
426
427 pub fn record_agent_latency(&self, agent: &str, event: &str, duration: Duration) {
429 self.agent_latency
430 .with_label_values(&[agent, event])
431 .observe(duration.as_secs_f64());
432 }
433
434 pub fn record_agent_timeout(&self, agent: &str, event: &str) {
436 self.agent_timeouts.with_label_values(&[agent, event]).inc();
437 }
438
439 pub fn record_blocked_request(&self, reason: &str) {
441 self.blocked_requests.with_label_values(&[reason]).inc();
442 }
443
444 pub fn record_pii_detected(&self, route: &str, category: &str) {
446 self.pii_detected_total
447 .with_label_values(&[route, category])
448 .inc();
449 }
450
451 pub fn record_request_body_size(&self, route: &str, size_bytes: usize) {
453 self.request_body_size
454 .with_label_values(&[route])
455 .observe(size_bytes as f64);
456 }
457
458 pub fn record_response_body_size(&self, route: &str, size_bytes: usize) {
460 self.response_body_size
461 .with_label_values(&[route])
462 .observe(size_bytes as f64);
463 }
464
465 pub fn record_tls_handshake(&self, version: &str, duration: Duration) {
467 self.tls_handshake_duration
468 .with_label_values(&[version])
469 .observe(duration.as_secs_f64());
470 }
471
472 pub fn update_connection_pool(&self, upstream: &str, size: i64, idle: i64) {
474 self.connection_pool_size
475 .with_label_values(&[upstream])
476 .set(size);
477 self.connection_pool_idle
478 .with_label_values(&[upstream])
479 .set(idle);
480 }
481
482 pub fn record_connection_acquired(&self, upstream: &str) {
484 self.connection_pool_acquired
485 .with_label_values(&[upstream])
486 .inc();
487 }
488
489 pub fn update_system_metrics(&self) {
491 use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
492
493 let mut system = System::new_with_specifics(
495 RefreshKind::new()
496 .with_cpu(CpuRefreshKind::everything())
497 .with_memory(MemoryRefreshKind::everything()),
498 );
499
500 self.memory_usage.set(system.total_memory() as i64);
502
503 system.refresh_cpu_usage();
505 self.cpu_usage.set(system.global_cpu_usage() as f64);
506 }
507
508 pub fn set_open_connections(&self, count: i64) {
510 self.open_connections.set(count);
511 }
512
513 pub fn record_websocket_frame(
523 &self,
524 route: &str,
525 direction: &str,
526 opcode: &str,
527 decision: &str,
528 ) {
529 self.websocket_frames_total
530 .with_label_values(&[route, direction, opcode, decision])
531 .inc();
532 }
533
534 pub fn record_websocket_connection(&self, route: &str) {
536 self.websocket_connections_total
537 .with_label_values(&[route])
538 .inc();
539 }
540
541 pub fn record_websocket_inspection_duration(&self, route: &str, duration: Duration) {
543 self.websocket_inspection_duration
544 .with_label_values(&[route])
545 .observe(duration.as_secs_f64());
546 }
547
548 pub fn record_websocket_frame_size(
556 &self,
557 route: &str,
558 direction: &str,
559 opcode: &str,
560 size_bytes: usize,
561 ) {
562 self.websocket_frame_size
563 .with_label_values(&[route, direction, opcode])
564 .observe(size_bytes as f64);
565 }
566
567 pub fn record_decompression_success(&self, encoding: &str, ratio: f64) {
575 self.decompression_total
576 .with_label_values(&[encoding, "success"])
577 .inc();
578 self.decompression_ratio
579 .with_label_values(&[encoding])
580 .observe(ratio);
581 }
582
583 pub fn record_decompression_failure(&self, encoding: &str, reason: &str) {
589 self.decompression_total
590 .with_label_values(&[encoding, reason])
591 .inc();
592 }
593
594 pub fn record_shadow_success(&self, route: &str, upstream: &str, duration: Duration) {
601 self.shadow_requests_total
602 .with_label_values(&[route, upstream, "success"])
603 .inc();
604 self.shadow_latency_seconds
605 .with_label_values(&[route, upstream])
606 .observe(duration.as_secs_f64());
607 }
608
609 pub fn record_shadow_error(&self, route: &str, upstream: &str, error_type: &str) {
616 self.shadow_requests_total
617 .with_label_values(&[route, upstream, "error"])
618 .inc();
619 self.shadow_errors_total
620 .with_label_values(&[route, upstream, error_type])
621 .inc();
622 }
623
624 pub fn record_shadow_timeout(&self, route: &str, upstream: &str, duration: Duration) {
631 self.shadow_requests_total
632 .with_label_values(&[route, upstream, "timeout"])
633 .inc();
634 self.shadow_errors_total
635 .with_label_values(&[route, upstream, "timeout"])
636 .inc();
637 self.shadow_latency_seconds
638 .with_label_values(&[route, upstream])
639 .observe(duration.as_secs_f64());
640 }
641}
642
/// Structured audit record for one request, serialized to JSON by
/// `AuditLogEntry::write`.
///
/// `AuditLogEntry::new` fills in the timestamp, correlation id, method and
/// path; the remaining public fields are left empty for the caller to set.
#[derive(Debug, serde::Serialize)]
pub struct AuditLogEntry {
    /// Creation time as an RFC 3339 string (taken from `chrono::Utc::now()`
    /// in `new`).
    pub timestamp: String,
    /// Correlation identifier supplied by the caller.
    pub correlation_id: String,
    /// Event kind; `new` initializes this to `"request"`.
    pub event_type: String,
    /// Route associated with the request, if known.
    pub route: Option<String>,
    /// Client address, if known. NOTE(review): exact format (IP only vs.
    /// IP:port) is not visible here — confirm with callers.
    pub client_addr: Option<String>,
    /// Client user agent, if known.
    pub user_agent: Option<String>,
    /// Request method, as supplied to `new`.
    pub method: String,
    /// Request path, as supplied to `new`.
    pub path: String,
    /// Response status code, once known.
    pub status: Option<u16>,
    /// Request duration; presumably milliseconds, per the field name.
    /// Defaults to 0.
    pub duration_ms: u64,
    /// Upstream that handled the request, if any.
    pub upstream: Option<String>,
    /// WAF verdict attached to this request, if any.
    pub waf_decision: Option<WafDecision>,
    /// Agent verdicts attached to this request; empty by default.
    pub agent_decisions: Vec<AgentDecision>,
    /// Error description, if the request failed.
    pub error: Option<String>,
    /// Free-form tags; empty by default.
    pub tags: Vec<String>,
}
662
/// WAF verdict embedded in an [`AuditLogEntry`].
///
/// NOTE(review): the set of valid `action` strings and the range of
/// `confidence` are not defined in this file — confirm with the producer.
#[derive(Debug, serde::Serialize)]
pub struct WafDecision {
    /// Action taken, as a free-form string.
    pub action: String,
    /// Identifiers of the rules involved in the decision.
    pub rule_ids: Vec<String>,
    /// Confidence score reported with the decision.
    pub confidence: f32,
    /// Explanation for the decision.
    pub reason: String,
    /// Data that matched, if recorded.
    pub matched_data: Option<String>,
}
672
/// Verdict from a single agent, embedded in an [`AuditLogEntry`].
#[derive(Debug, serde::Serialize)]
pub struct AgentDecision {
    /// Name of the agent that produced the decision.
    pub agent_name: String,
    /// Event the agent was consulted for.
    pub event: String,
    /// Action the agent decided on, as a free-form string.
    pub action: String,
    /// Agent call latency; presumably milliseconds, per the field name.
    pub latency_ms: u64,
    /// Arbitrary JSON metadata attached by the agent.
    pub metadata: serde_json::Value,
}
682
683impl AuditLogEntry {
684 pub fn new(correlation_id: String, method: String, path: String) -> Self {
686 Self {
687 timestamp: chrono::Utc::now().to_rfc3339(),
688 correlation_id,
689 event_type: "request".to_string(),
690 route: None,
691 client_addr: None,
692 user_agent: None,
693 method,
694 path,
695 status: None,
696 duration_ms: 0,
697 upstream: None,
698 waf_decision: None,
699 agent_decisions: vec![],
700 error: None,
701 tags: vec![],
702 }
703 }
704
705 pub fn write(&self) {
707 match serde_json::to_string(self) {
708 Ok(json) => println!("AUDIT: {}", json),
709 Err(e) => error!("Failed to serialize audit log: {}", e),
710 }
711 }
712}
713
/// Health level of a component, or of the system as a whole.
///
/// `ComponentHealthTracker::update_component` counts any status other than
/// `Healthy` as a failure.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum HealthStatus {
    /// No problem reported.
    Healthy,
    /// Reported as degraded; outranked by `Unhealthy` when statuses are
    /// aggregated in `get_status`.
    Degraded,
    /// Reported as unhealthy; a single unhealthy component makes the
    /// aggregate status `Unhealthy`.
    Unhealthy,
}
721
/// Latest health information recorded for one named component.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ComponentHealth {
    /// Component name; used as the lookup key by the tracker.
    pub name: String,
    /// Status from the most recent update.
    pub status: HealthStatus,
    /// UTC time of the most recent update.
    pub last_check: chrono::DateTime<chrono::Utc>,
    /// Number of consecutive updates whose status was not `Healthy`; reset
    /// to 0 by a `Healthy` update.
    pub consecutive_failures: u32,
    /// Error text supplied with the most recent update, if any.
    pub error_message: Option<String>,
}
731
/// Registry of per-component health records.
///
/// All methods take `&self`; the records live behind a
/// `parking_lot::RwLock`.
pub struct ComponentHealthTracker {
    /// One record per component name, in insertion order.
    components: parking_lot::RwLock<Vec<ComponentHealth>>,
}
739
740impl Default for ComponentHealthTracker {
741 fn default() -> Self {
742 Self::new()
743 }
744}
745
746impl ComponentHealthTracker {
747 pub fn new() -> Self {
749 Self {
750 components: parking_lot::RwLock::new(vec![]),
751 }
752 }
753
754 pub fn update_component(&self, name: String, status: HealthStatus, error: Option<String>) {
756 let mut components = self.components.write();
757
758 if let Some(component) = components.iter_mut().find(|c| c.name == name) {
759 component.status = status;
760 component.last_check = chrono::Utc::now();
761 component.error_message = error;
762
763 if status != HealthStatus::Healthy {
764 component.consecutive_failures += 1;
765 } else {
766 component.consecutive_failures = 0;
767 }
768 } else {
769 components.push(ComponentHealth {
770 name,
771 status,
772 last_check: chrono::Utc::now(),
773 consecutive_failures: if status != HealthStatus::Healthy {
774 1
775 } else {
776 0
777 },
778 error_message: error,
779 });
780 }
781 }
782
783 pub fn get_status(&self) -> HealthStatus {
785 let components = self.components.read();
786
787 if components.is_empty() {
788 return HealthStatus::Healthy;
789 }
790
791 let unhealthy_count = components
792 .iter()
793 .filter(|c| c.status == HealthStatus::Unhealthy)
794 .count();
795 let degraded_count = components
796 .iter()
797 .filter(|c| c.status == HealthStatus::Degraded)
798 .count();
799
800 if unhealthy_count > 0 {
801 HealthStatus::Unhealthy
802 } else if degraded_count > 0 {
803 HealthStatus::Degraded
804 } else {
805 HealthStatus::Healthy
806 }
807 }
808
809 pub fn get_report(&self) -> Vec<ComponentHealth> {
811 self.components.read().clone()
812 }
813}
814
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: metrics register and the basic recorders can be called.
    #[test]
    fn test_metrics_creation() {
        let request_metrics = RequestMetrics::new().expect("Failed to create metrics");

        let elapsed = Duration::from_millis(100);
        request_metrics.record_request("test_route", "GET", 200, elapsed);

        request_metrics.inc_active_requests();
        request_metrics.dec_active_requests();

        request_metrics.record_upstream_attempt("backend1", "test_route");
    }

    /// An audit entry serializes to JSON that carries its correlation id.
    #[test]
    fn test_audit_log() {
        let correlation_id = "test-correlation-id".to_string();
        let method = "GET".to_string();
        let path = "/api/test".to_string();
        let mut audit_entry = AuditLogEntry::new(correlation_id, method, path);

        audit_entry.status = Some(200);
        audit_entry.duration_ms = 150;
        audit_entry.tags.push("test".to_string());

        let serialized =
            serde_json::to_string(&audit_entry).expect("Failed to serialize audit log");
        assert!(serialized.contains("test-correlation-id"));
    }

    /// The aggregate status follows the worst component status seen so far.
    #[test]
    fn test_health_checker() {
        let tracker = ComponentHealthTracker::new();

        // An empty tracker is healthy.
        assert_eq!(tracker.get_status(), HealthStatus::Healthy);

        tracker.update_component("upstream1".to_string(), HealthStatus::Healthy, None);
        assert_eq!(tracker.get_status(), HealthStatus::Healthy);

        // One degraded component degrades the aggregate.
        let slow = Some("Slow response".to_string());
        tracker.update_component("agent1".to_string(), HealthStatus::Degraded, slow);
        assert_eq!(tracker.get_status(), HealthStatus::Degraded);

        // An unhealthy component outranks a degraded one.
        let refused = Some("Connection refused".to_string());
        tracker.update_component("upstream2".to_string(), HealthStatus::Unhealthy, refused);
        assert_eq!(tracker.get_status(), HealthStatus::Unhealthy);
    }
}