1use anyhow::{Context, Result};
7use prometheus::{
8 register_counter_vec, register_gauge, register_histogram_vec, register_int_counter_vec,
9 register_int_gauge, register_int_gauge_vec, CounterVec, Gauge, HistogramVec, IntCounterVec,
10 IntGauge, IntGaugeVec,
11};
12use std::time::Duration;
13use tracing::{error, info};
14use tracing_subscriber::{fmt, prelude::*, EnvFilter};
15
16pub fn init_tracing() -> Result<()> {
18 let json_layer =
20 if std::env::var("SENTINEL_LOG_FORMAT").unwrap_or_else(|_| "json".to_string()) == "json" {
21 Some(
22 fmt::layer()
23 .json()
24 .with_target(true)
25 .with_thread_ids(true)
26 .with_thread_names(true)
27 .with_file(true)
28 .with_line_number(true),
29 )
30 } else {
31 None
32 };
33
34 let pretty_layer = if std::env::var("SENTINEL_LOG_FORMAT")
36 .unwrap_or_else(|_| "json".to_string())
37 == "pretty"
38 {
39 Some(
40 fmt::layer()
41 .pretty()
42 .with_target(true)
43 .with_thread_ids(true)
44 .with_thread_names(true)
45 .with_file(true)
46 .with_line_number(true),
47 )
48 } else {
49 None
50 };
51
52 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
54
55 tracing_subscriber::registry()
56 .with(env_filter)
57 .with(json_layer)
58 .with(pretty_layer)
59 .init();
60
61 info!("Tracing initialized");
62 Ok(())
63}
64
65pub struct RequestMetrics {
67 request_duration: HistogramVec,
69 request_count: IntCounterVec,
71 active_requests: IntGauge,
73 upstream_attempts: IntCounterVec,
75 upstream_failures: IntCounterVec,
77 circuit_breaker_state: IntGaugeVec,
79 agent_latency: HistogramVec,
81 agent_timeouts: IntCounterVec,
83 blocked_requests: CounterVec,
85 request_body_size: HistogramVec,
87 response_body_size: HistogramVec,
89 tls_handshake_duration: HistogramVec,
91 connection_pool_size: IntGaugeVec,
93 connection_pool_idle: IntGaugeVec,
94 connection_pool_acquired: IntCounterVec,
95 memory_usage: IntGauge,
97 cpu_usage: Gauge,
98 open_connections: IntGauge,
99 websocket_frames_total: IntCounterVec,
101 websocket_connections_total: IntCounterVec,
102 websocket_inspection_duration: HistogramVec,
103 websocket_frame_size: HistogramVec,
104 decompression_total: IntCounterVec,
106 decompression_ratio: HistogramVec,
107}
108
109impl RequestMetrics {
110 pub fn new() -> Result<Self> {
112 let latency_buckets = vec![
114 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
115 ];
116
117 let size_buckets = vec![
119 100.0,
120 1_000.0,
121 10_000.0,
122 100_000.0,
123 1_000_000.0,
124 10_000_000.0,
125 100_000_000.0,
126 ];
127
128 let request_duration = register_histogram_vec!(
129 "sentinel_request_duration_seconds",
130 "Request duration in seconds",
131 &["route", "method"],
132 latency_buckets.clone()
133 )
134 .context("Failed to register request_duration metric")?;
135
136 let request_count = register_int_counter_vec!(
137 "sentinel_requests_total",
138 "Total number of requests",
139 &["route", "method", "status"]
140 )
141 .context("Failed to register request_count metric")?;
142
143 let active_requests = register_int_gauge!(
144 "sentinel_active_requests",
145 "Number of currently active requests"
146 )
147 .context("Failed to register active_requests metric")?;
148
149 let upstream_attempts = register_int_counter_vec!(
150 "sentinel_upstream_attempts_total",
151 "Total upstream connection attempts",
152 &["upstream", "route"]
153 )
154 .context("Failed to register upstream_attempts metric")?;
155
156 let upstream_failures = register_int_counter_vec!(
157 "sentinel_upstream_failures_total",
158 "Total upstream connection failures",
159 &["upstream", "route", "reason"]
160 )
161 .context("Failed to register upstream_failures metric")?;
162
163 let circuit_breaker_state = register_int_gauge_vec!(
164 "sentinel_circuit_breaker_state",
165 "Circuit breaker state (0=closed, 1=open)",
166 &["component", "route"]
167 )
168 .context("Failed to register circuit_breaker_state metric")?;
169
170 let agent_latency = register_histogram_vec!(
171 "sentinel_agent_latency_seconds",
172 "Agent call latency in seconds",
173 &["agent", "event"],
174 latency_buckets.clone()
175 )
176 .context("Failed to register agent_latency metric")?;
177
178 let agent_timeouts = register_int_counter_vec!(
179 "sentinel_agent_timeouts_total",
180 "Total agent call timeouts",
181 &["agent", "event"]
182 )
183 .context("Failed to register agent_timeouts metric")?;
184
185 let blocked_requests = register_counter_vec!(
186 "sentinel_blocked_requests_total",
187 "Total blocked requests by reason",
188 &["reason"]
189 )
190 .context("Failed to register blocked_requests metric")?;
191
192 let request_body_size = register_histogram_vec!(
193 "sentinel_request_body_size_bytes",
194 "Request body size in bytes",
195 &["route"],
196 size_buckets.clone()
197 )
198 .context("Failed to register request_body_size metric")?;
199
200 let response_body_size = register_histogram_vec!(
201 "sentinel_response_body_size_bytes",
202 "Response body size in bytes",
203 &["route"],
204 size_buckets.clone()
205 )
206 .context("Failed to register response_body_size metric")?;
207
208 let tls_handshake_duration = register_histogram_vec!(
209 "sentinel_tls_handshake_duration_seconds",
210 "TLS handshake duration in seconds",
211 &["version"],
212 latency_buckets
213 )
214 .context("Failed to register tls_handshake_duration metric")?;
215
216 let connection_pool_size = register_int_gauge_vec!(
217 "sentinel_connection_pool_size",
218 "Total connections in pool",
219 &["upstream"]
220 )
221 .context("Failed to register connection_pool_size metric")?;
222
223 let connection_pool_idle = register_int_gauge_vec!(
224 "sentinel_connection_pool_idle",
225 "Idle connections in pool",
226 &["upstream"]
227 )
228 .context("Failed to register connection_pool_idle metric")?;
229
230 let connection_pool_acquired = register_int_counter_vec!(
231 "sentinel_connection_pool_acquired_total",
232 "Total connections acquired from pool",
233 &["upstream"]
234 )
235 .context("Failed to register connection_pool_acquired metric")?;
236
237 let memory_usage = register_int_gauge!(
238 "sentinel_memory_usage_bytes",
239 "Current memory usage in bytes"
240 )
241 .context("Failed to register memory_usage metric")?;
242
243 let cpu_usage =
244 register_gauge!("sentinel_cpu_usage_percent", "Current CPU usage percentage")
245 .context("Failed to register cpu_usage metric")?;
246
247 let open_connections =
248 register_int_gauge!("sentinel_open_connections", "Number of open connections")
249 .context("Failed to register open_connections metric")?;
250
251 let websocket_frames_total = register_int_counter_vec!(
253 "sentinel_websocket_frames_total",
254 "Total WebSocket frames processed",
255 &["route", "direction", "opcode", "decision"]
256 )
257 .context("Failed to register websocket_frames_total metric")?;
258
259 let websocket_connections_total = register_int_counter_vec!(
260 "sentinel_websocket_connections_total",
261 "Total WebSocket connections with inspection enabled",
262 &["route"]
263 )
264 .context("Failed to register websocket_connections_total metric")?;
265
266 let frame_latency_buckets = vec![
268 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
269 ];
270
271 let websocket_inspection_duration = register_histogram_vec!(
272 "sentinel_websocket_inspection_duration_seconds",
273 "WebSocket frame inspection duration in seconds",
274 &["route"],
275 frame_latency_buckets
276 )
277 .context("Failed to register websocket_inspection_duration metric")?;
278
279 let frame_size_buckets = vec![
281 64.0, 256.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0,
282 ];
283
284 let websocket_frame_size = register_histogram_vec!(
285 "sentinel_websocket_frame_size_bytes",
286 "WebSocket frame payload size in bytes",
287 &["route", "direction", "opcode"],
288 frame_size_buckets
289 )
290 .context("Failed to register websocket_frame_size metric")?;
291
292 let decompression_total = register_int_counter_vec!(
294 "sentinel_decompression_total",
295 "Total body decompression operations",
296 &["encoding", "result"]
297 )
298 .context("Failed to register decompression_total metric")?;
299
300 let ratio_buckets = vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0];
302
303 let decompression_ratio = register_histogram_vec!(
304 "sentinel_decompression_ratio",
305 "Decompression ratio (decompressed_size / compressed_size)",
306 &["encoding"],
307 ratio_buckets
308 )
309 .context("Failed to register decompression_ratio metric")?;
310
311 Ok(Self {
312 request_duration,
313 request_count,
314 active_requests,
315 upstream_attempts,
316 upstream_failures,
317 circuit_breaker_state,
318 agent_latency,
319 agent_timeouts,
320 blocked_requests,
321 request_body_size,
322 response_body_size,
323 tls_handshake_duration,
324 connection_pool_size,
325 connection_pool_idle,
326 connection_pool_acquired,
327 memory_usage,
328 cpu_usage,
329 open_connections,
330 websocket_frames_total,
331 websocket_connections_total,
332 websocket_inspection_duration,
333 websocket_frame_size,
334 decompression_total,
335 decompression_ratio,
336 })
337 }
338
339 pub fn record_request(&self, route: &str, method: &str, status: u16, duration: Duration) {
341 self.request_duration
342 .with_label_values(&[route, method])
343 .observe(duration.as_secs_f64());
344
345 self.request_count
346 .with_label_values(&[route, method, &status.to_string()])
347 .inc();
348 }
349
350 pub fn inc_active_requests(&self) {
352 self.active_requests.inc();
353 }
354
355 pub fn dec_active_requests(&self) {
357 self.active_requests.dec();
358 }
359
360 pub fn record_upstream_attempt(&self, upstream: &str, route: &str) {
362 self.upstream_attempts
363 .with_label_values(&[upstream, route])
364 .inc();
365 }
366
367 pub fn record_upstream_failure(&self, upstream: &str, route: &str, reason: &str) {
369 self.upstream_failures
370 .with_label_values(&[upstream, route, reason])
371 .inc();
372 }
373
374 pub fn set_circuit_breaker_state(&self, component: &str, route: &str, is_open: bool) {
376 let state = if is_open { 1 } else { 0 };
377 self.circuit_breaker_state
378 .with_label_values(&[component, route])
379 .set(state);
380 }
381
382 pub fn record_agent_latency(&self, agent: &str, event: &str, duration: Duration) {
384 self.agent_latency
385 .with_label_values(&[agent, event])
386 .observe(duration.as_secs_f64());
387 }
388
389 pub fn record_agent_timeout(&self, agent: &str, event: &str) {
391 self.agent_timeouts.with_label_values(&[agent, event]).inc();
392 }
393
394 pub fn record_blocked_request(&self, reason: &str) {
396 self.blocked_requests.with_label_values(&[reason]).inc();
397 }
398
399 pub fn record_request_body_size(&self, route: &str, size_bytes: usize) {
401 self.request_body_size
402 .with_label_values(&[route])
403 .observe(size_bytes as f64);
404 }
405
406 pub fn record_response_body_size(&self, route: &str, size_bytes: usize) {
408 self.response_body_size
409 .with_label_values(&[route])
410 .observe(size_bytes as f64);
411 }
412
413 pub fn record_tls_handshake(&self, version: &str, duration: Duration) {
415 self.tls_handshake_duration
416 .with_label_values(&[version])
417 .observe(duration.as_secs_f64());
418 }
419
420 pub fn update_connection_pool(&self, upstream: &str, size: i64, idle: i64) {
422 self.connection_pool_size
423 .with_label_values(&[upstream])
424 .set(size);
425 self.connection_pool_idle
426 .with_label_values(&[upstream])
427 .set(idle);
428 }
429
430 pub fn record_connection_acquired(&self, upstream: &str) {
432 self.connection_pool_acquired
433 .with_label_values(&[upstream])
434 .inc();
435 }
436
437 pub fn update_system_metrics(&self) {
439 use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
440
441 let mut system = System::new_with_specifics(
443 RefreshKind::new()
444 .with_cpu(CpuRefreshKind::everything())
445 .with_memory(MemoryRefreshKind::everything()),
446 );
447
448 self.memory_usage.set(system.total_memory() as i64);
450
451 system.refresh_cpu_usage();
453 self.cpu_usage.set(system.global_cpu_usage() as f64);
454 }
455
456 pub fn set_open_connections(&self, count: i64) {
458 self.open_connections.set(count);
459 }
460
461 pub fn record_websocket_frame(
471 &self,
472 route: &str,
473 direction: &str,
474 opcode: &str,
475 decision: &str,
476 ) {
477 self.websocket_frames_total
478 .with_label_values(&[route, direction, opcode, decision])
479 .inc();
480 }
481
482 pub fn record_websocket_connection(&self, route: &str) {
484 self.websocket_connections_total
485 .with_label_values(&[route])
486 .inc();
487 }
488
489 pub fn record_websocket_inspection_duration(&self, route: &str, duration: Duration) {
491 self.websocket_inspection_duration
492 .with_label_values(&[route])
493 .observe(duration.as_secs_f64());
494 }
495
496 pub fn record_websocket_frame_size(
504 &self,
505 route: &str,
506 direction: &str,
507 opcode: &str,
508 size_bytes: usize,
509 ) {
510 self.websocket_frame_size
511 .with_label_values(&[route, direction, opcode])
512 .observe(size_bytes as f64);
513 }
514
515 pub fn record_decompression_success(&self, encoding: &str, ratio: f64) {
523 self.decompression_total
524 .with_label_values(&[encoding, "success"])
525 .inc();
526 self.decompression_ratio
527 .with_label_values(&[encoding])
528 .observe(ratio);
529 }
530
531 pub fn record_decompression_failure(&self, encoding: &str, reason: &str) {
537 self.decompression_total
538 .with_label_values(&[encoding, reason])
539 .inc();
540 }
541}
542
#[derive(Debug, serde::Serialize)]
/// One audit record, serialized to JSON by `AuditLogEntry::write`.
///
/// Fields are serialized in declaration order.
pub struct AuditLogEntry {
    /// Creation time as an RFC 3339 string in UTC (set by `AuditLogEntry::new`).
    pub timestamp: String,
    /// Correlation identifier supplied by the caller.
    pub correlation_id: String,
    /// Kind of event; `AuditLogEntry::new` sets it to `"request"`.
    pub event_type: String,
    /// Matched route, if known.
    pub route: Option<String>,
    /// Client address, if known (string format chosen by the caller).
    pub client_addr: Option<String>,
    /// Client `User-Agent`, if known.
    pub user_agent: Option<String>,
    /// HTTP method.
    pub method: String,
    /// Request path.
    pub path: String,
    /// Response status code, if a response was produced.
    pub status: Option<u16>,
    /// Duration in milliseconds; `AuditLogEntry::new` initializes it to 0.
    pub duration_ms: u64,
    /// Upstream that handled the request, if any.
    pub upstream: Option<String>,
    /// WAF verdict, if the WAF evaluated the request.
    pub waf_decision: Option<WafDecision>,
    /// Decisions returned by agents, in the order the caller appended them.
    pub agent_decisions: Vec<AgentDecision>,
    /// Error description, if the request failed.
    pub error: Option<String>,
    /// Free-form tags.
    pub tags: Vec<String>,
}
562
#[derive(Debug, serde::Serialize)]
/// WAF verdict attached to an [`AuditLogEntry`].
pub struct WafDecision {
    /// Action taken (free-form string; NOTE(review): allowed values are
    /// defined by the producer — confirm).
    pub action: String,
    /// Identifiers of the rules that matched.
    pub rule_ids: Vec<String>,
    /// Confidence score (NOTE(review): range not established here — confirm,
    /// e.g. 0.0..=1.0).
    pub confidence: f32,
    /// Human-readable reason for the verdict.
    pub reason: String,
    /// Request data that triggered the match, if recorded.
    pub matched_data: Option<String>,
}
572
#[derive(Debug, serde::Serialize)]
/// Decision returned by one agent call, attached to an [`AuditLogEntry`].
pub struct AgentDecision {
    /// Name of the agent that was called.
    pub agent_name: String,
    /// Event the agent was called for.
    pub event: String,
    /// Action the agent returned (free-form string).
    pub action: String,
    /// Call latency in milliseconds.
    pub latency_ms: u64,
    /// Arbitrary agent-specific JSON payload.
    pub metadata: serde_json::Value,
}
582
583impl AuditLogEntry {
584 pub fn new(correlation_id: String, method: String, path: String) -> Self {
586 Self {
587 timestamp: chrono::Utc::now().to_rfc3339(),
588 correlation_id,
589 event_type: "request".to_string(),
590 route: None,
591 client_addr: None,
592 user_agent: None,
593 method,
594 path,
595 status: None,
596 duration_ms: 0,
597 upstream: None,
598 waf_decision: None,
599 agent_decisions: vec![],
600 error: None,
601 tags: vec![],
602 }
603 }
604
605 pub fn write(&self) {
607 match serde_json::to_string(self) {
608 Ok(json) => println!("AUDIT: {}", json),
609 Err(e) => error!("Failed to serialize audit log: {}", e),
610 }
611 }
612}
613
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
/// Health of a single component, or of a `ComponentHealthTracker` as a whole.
///
/// When aggregating in `ComponentHealthTracker::get_status`, `Unhealthy`
/// outranks `Degraded`, which outranks `Healthy`.
pub enum HealthStatus {
    /// The only status that resets a component's `consecutive_failures` to 0.
    Healthy,
    /// Counts as a failure when tracking `consecutive_failures`.
    Degraded,
    /// Counts as a failure when tracking `consecutive_failures`.
    Unhealthy,
}
621
#[derive(Debug, Clone, serde::Serialize)]
/// Latest known health of one named component.
pub struct ComponentHealth {
    /// Component name; the lookup key used by `ComponentHealthTracker`.
    pub name: String,
    /// Status from the most recent update.
    pub status: HealthStatus,
    /// Time of the most recent update.
    pub last_check: chrono::DateTime<chrono::Utc>,
    /// Number of consecutive non-`Healthy` updates; reset to 0 by a `Healthy` one.
    pub consecutive_failures: u32,
    /// Error passed with the most recent update (cleared when that update had none).
    pub error_message: Option<String>,
}
631
/// Tracks the health of named components and aggregates them into one status.
///
/// All methods take `&self`; the component list is guarded by an internal
/// `parking_lot::RwLock`.
pub struct ComponentHealthTracker {
    /// One entry per component name, in first-seen order (looked up linearly).
    components: parking_lot::RwLock<Vec<ComponentHealth>>,
}
639
640impl Default for ComponentHealthTracker {
641 fn default() -> Self {
642 Self::new()
643 }
644}
645
646impl ComponentHealthTracker {
647 pub fn new() -> Self {
649 Self {
650 components: parking_lot::RwLock::new(vec![]),
651 }
652 }
653
654 pub fn update_component(&self, name: String, status: HealthStatus, error: Option<String>) {
656 let mut components = self.components.write();
657
658 if let Some(component) = components.iter_mut().find(|c| c.name == name) {
659 component.status = status;
660 component.last_check = chrono::Utc::now();
661 component.error_message = error;
662
663 if status != HealthStatus::Healthy {
664 component.consecutive_failures += 1;
665 } else {
666 component.consecutive_failures = 0;
667 }
668 } else {
669 components.push(ComponentHealth {
670 name,
671 status,
672 last_check: chrono::Utc::now(),
673 consecutive_failures: if status != HealthStatus::Healthy {
674 1
675 } else {
676 0
677 },
678 error_message: error,
679 });
680 }
681 }
682
683 pub fn get_status(&self) -> HealthStatus {
685 let components = self.components.read();
686
687 if components.is_empty() {
688 return HealthStatus::Healthy;
689 }
690
691 let unhealthy_count = components
692 .iter()
693 .filter(|c| c.status == HealthStatus::Unhealthy)
694 .count();
695 let degraded_count = components
696 .iter()
697 .filter(|c| c.status == HealthStatus::Degraded)
698 .count();
699
700 if unhealthy_count > 0 {
701 HealthStatus::Unhealthy
702 } else if degraded_count > 0 {
703 HealthStatus::Degraded
704 } else {
705 HealthStatus::Healthy
706 }
707 }
708
709 pub fn get_report(&self) -> Vec<ComponentHealth> {
711 self.components.read().clone()
712 }
713}
714
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_creation() {
        // Metrics live in the global default Prometheus registry, so this must
        // remain the only test in the process that constructs `RequestMetrics`.
        let metrics = RequestMetrics::new().expect("Failed to create metrics");

        metrics.record_request("test_route", "GET", 200, Duration::from_millis(100));
        assert_eq!(
            metrics
                .request_count
                .with_label_values(&["test_route", "GET", "200"])
                .get(),
            1
        );
        assert_eq!(
            metrics
                .request_duration
                .with_label_values(&["test_route", "GET"])
                .get_sample_count(),
            1
        );

        metrics.inc_active_requests();
        assert_eq!(metrics.active_requests.get(), 1);
        metrics.dec_active_requests();
        assert_eq!(metrics.active_requests.get(), 0);

        metrics.record_upstream_attempt("backend1", "test_route");
        assert_eq!(
            metrics
                .upstream_attempts
                .with_label_values(&["backend1", "test_route"])
                .get(),
            1
        );
    }

    #[test]
    fn test_audit_log() {
        let mut entry = AuditLogEntry::new(
            "test-correlation-id".to_string(),
            "GET".to_string(),
            "/api/test".to_string(),
        );

        entry.status = Some(200);
        entry.duration_ms = 150;
        entry.tags.push("test".to_string());

        let json = serde_json::to_string(&entry).expect("Failed to serialize audit log");
        assert!(json.contains("test-correlation-id"));
        assert!(json.contains("\"status\":200"));
        assert!(json.contains("\"duration_ms\":150"));
    }

    #[test]
    fn test_health_checker() {
        let checker = ComponentHealthTracker::new();

        // An empty tracker reports healthy.
        assert_eq!(checker.get_status(), HealthStatus::Healthy);

        checker.update_component("upstream1".to_string(), HealthStatus::Healthy, None);
        assert_eq!(checker.get_status(), HealthStatus::Healthy);

        checker.update_component(
            "agent1".to_string(),
            HealthStatus::Degraded,
            Some("Slow response".to_string()),
        );
        assert_eq!(checker.get_status(), HealthStatus::Degraded);

        checker.update_component(
            "upstream2".to_string(),
            HealthStatus::Unhealthy,
            Some("Connection refused".to_string()),
        );
        assert_eq!(checker.get_status(), HealthStatus::Unhealthy);
    }

    #[test]
    fn test_consecutive_failures_streak_and_reset() {
        let checker = ComponentHealthTracker::new();

        checker.update_component(
            "agent1".to_string(),
            HealthStatus::Degraded,
            Some("Slow".to_string()),
        );
        checker.update_component(
            "agent1".to_string(),
            HealthStatus::Unhealthy,
            Some("Down".to_string()),
        );

        let report = checker.get_report();
        assert_eq!(report.len(), 1);
        assert_eq!(report[0].status, HealthStatus::Unhealthy);
        assert_eq!(report[0].consecutive_failures, 2);
        assert_eq!(report[0].error_message.as_deref(), Some("Down"));

        // A healthy update resets the streak and clears the error.
        checker.update_component("agent1".to_string(), HealthStatus::Healthy, None);
        let report = checker.get_report();
        assert_eq!(report[0].consecutive_failures, 0);
        assert!(report[0].error_message.is_none());
        assert_eq!(checker.get_status(), HealthStatus::Healthy);
    }
}