sentinel_common/
observability.rs

1//! Observability module for Sentinel proxy
2//!
3//! Provides metrics, logging, and tracing infrastructure with a focus on
4//! production reliability and sleepable operations.
5
6use anyhow::{Context, Result};
7use prometheus::{
8    register_counter_vec, register_gauge, register_histogram_vec, register_int_counter_vec,
9    register_int_gauge, register_int_gauge_vec, CounterVec, Gauge, HistogramVec, IntCounterVec,
10    IntGauge, IntGaugeVec,
11};
12use std::time::Duration;
13use tracing::{error, info};
14use tracing_subscriber::{fmt, prelude::*, EnvFilter};
15
16/// Initialize the tracing/logging subsystem
17pub fn init_tracing() -> Result<()> {
18    // Use JSON format for structured logging in production
19    let json_layer =
20        if std::env::var("SENTINEL_LOG_FORMAT").unwrap_or_else(|_| "json".to_string()) == "json" {
21            Some(
22                fmt::layer()
23                    .json()
24                    .with_target(true)
25                    .with_thread_ids(true)
26                    .with_thread_names(true)
27                    .with_file(true)
28                    .with_line_number(true),
29            )
30        } else {
31            None
32        };
33
34    // Pretty format for development
35    let pretty_layer = if std::env::var("SENTINEL_LOG_FORMAT")
36        .unwrap_or_else(|_| "json".to_string())
37        == "pretty"
38    {
39        Some(
40            fmt::layer()
41                .pretty()
42                .with_target(true)
43                .with_thread_ids(true)
44                .with_thread_names(true)
45                .with_file(true)
46                .with_line_number(true),
47        )
48    } else {
49        None
50    };
51
52    // Configure log level from environment
53    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
54
55    tracing_subscriber::registry()
56        .with(env_filter)
57        .with(json_layer)
58        .with(pretty_layer)
59        .init();
60
61    info!("Tracing initialized");
62    Ok(())
63}
64
65/// Request metrics collector
66pub struct RequestMetrics {
67    /// Request latency histogram by route
68    request_duration: HistogramVec,
69    /// Request count by route and status code
70    request_count: IntCounterVec,
71    /// Active requests gauge
72    active_requests: IntGauge,
73    /// Upstream connection attempts
74    upstream_attempts: IntCounterVec,
75    /// Upstream failures
76    upstream_failures: IntCounterVec,
77    /// Circuit breaker state (0 = closed, 1 = open)
78    circuit_breaker_state: IntGaugeVec,
79    /// Agent call latency
80    agent_latency: HistogramVec,
81    /// Agent call timeouts
82    agent_timeouts: IntCounterVec,
83    /// Blocked requests by reason
84    blocked_requests: CounterVec,
85    /// Request body size histogram
86    request_body_size: HistogramVec,
87    /// Response body size histogram
88    response_body_size: HistogramVec,
89    /// TLS handshake duration
90    tls_handshake_duration: HistogramVec,
91    /// Connection pool metrics
92    connection_pool_size: IntGaugeVec,
93    connection_pool_idle: IntGaugeVec,
94    connection_pool_acquired: IntCounterVec,
95    /// System metrics
96    memory_usage: IntGauge,
97    cpu_usage: Gauge,
98    open_connections: IntGauge,
99    /// WebSocket metrics
100    websocket_frames_total: IntCounterVec,
101    websocket_connections_total: IntCounterVec,
102    websocket_inspection_duration: HistogramVec,
103    websocket_frame_size: HistogramVec,
104    /// Body decompression metrics
105    decompression_total: IntCounterVec,
106    decompression_ratio: HistogramVec,
107}
108
109impl RequestMetrics {
110    /// Create new metrics collector and register with Prometheus
111    pub fn new() -> Result<Self> {
112        // Define buckets for latency histograms (in seconds)
113        let latency_buckets = vec![
114            0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
115        ];
116
117        // Define buckets for body size (in bytes)
118        let size_buckets = vec![
119            100.0,
120            1_000.0,
121            10_000.0,
122            100_000.0,
123            1_000_000.0,
124            10_000_000.0,
125            100_000_000.0,
126        ];
127
128        let request_duration = register_histogram_vec!(
129            "sentinel_request_duration_seconds",
130            "Request duration in seconds",
131            &["route", "method"],
132            latency_buckets.clone()
133        )
134        .context("Failed to register request_duration metric")?;
135
136        let request_count = register_int_counter_vec!(
137            "sentinel_requests_total",
138            "Total number of requests",
139            &["route", "method", "status"]
140        )
141        .context("Failed to register request_count metric")?;
142
143        let active_requests = register_int_gauge!(
144            "sentinel_active_requests",
145            "Number of currently active requests"
146        )
147        .context("Failed to register active_requests metric")?;
148
149        let upstream_attempts = register_int_counter_vec!(
150            "sentinel_upstream_attempts_total",
151            "Total upstream connection attempts",
152            &["upstream", "route"]
153        )
154        .context("Failed to register upstream_attempts metric")?;
155
156        let upstream_failures = register_int_counter_vec!(
157            "sentinel_upstream_failures_total",
158            "Total upstream connection failures",
159            &["upstream", "route", "reason"]
160        )
161        .context("Failed to register upstream_failures metric")?;
162
163        let circuit_breaker_state = register_int_gauge_vec!(
164            "sentinel_circuit_breaker_state",
165            "Circuit breaker state (0=closed, 1=open)",
166            &["component", "route"]
167        )
168        .context("Failed to register circuit_breaker_state metric")?;
169
170        let agent_latency = register_histogram_vec!(
171            "sentinel_agent_latency_seconds",
172            "Agent call latency in seconds",
173            &["agent", "event"],
174            latency_buckets.clone()
175        )
176        .context("Failed to register agent_latency metric")?;
177
178        let agent_timeouts = register_int_counter_vec!(
179            "sentinel_agent_timeouts_total",
180            "Total agent call timeouts",
181            &["agent", "event"]
182        )
183        .context("Failed to register agent_timeouts metric")?;
184
185        let blocked_requests = register_counter_vec!(
186            "sentinel_blocked_requests_total",
187            "Total blocked requests by reason",
188            &["reason"]
189        )
190        .context("Failed to register blocked_requests metric")?;
191
192        let request_body_size = register_histogram_vec!(
193            "sentinel_request_body_size_bytes",
194            "Request body size in bytes",
195            &["route"],
196            size_buckets.clone()
197        )
198        .context("Failed to register request_body_size metric")?;
199
200        let response_body_size = register_histogram_vec!(
201            "sentinel_response_body_size_bytes",
202            "Response body size in bytes",
203            &["route"],
204            size_buckets.clone()
205        )
206        .context("Failed to register response_body_size metric")?;
207
208        let tls_handshake_duration = register_histogram_vec!(
209            "sentinel_tls_handshake_duration_seconds",
210            "TLS handshake duration in seconds",
211            &["version"],
212            latency_buckets
213        )
214        .context("Failed to register tls_handshake_duration metric")?;
215
216        let connection_pool_size = register_int_gauge_vec!(
217            "sentinel_connection_pool_size",
218            "Total connections in pool",
219            &["upstream"]
220        )
221        .context("Failed to register connection_pool_size metric")?;
222
223        let connection_pool_idle = register_int_gauge_vec!(
224            "sentinel_connection_pool_idle",
225            "Idle connections in pool",
226            &["upstream"]
227        )
228        .context("Failed to register connection_pool_idle metric")?;
229
230        let connection_pool_acquired = register_int_counter_vec!(
231            "sentinel_connection_pool_acquired_total",
232            "Total connections acquired from pool",
233            &["upstream"]
234        )
235        .context("Failed to register connection_pool_acquired metric")?;
236
237        let memory_usage = register_int_gauge!(
238            "sentinel_memory_usage_bytes",
239            "Current memory usage in bytes"
240        )
241        .context("Failed to register memory_usage metric")?;
242
243        let cpu_usage =
244            register_gauge!("sentinel_cpu_usage_percent", "Current CPU usage percentage")
245                .context("Failed to register cpu_usage metric")?;
246
247        let open_connections =
248            register_int_gauge!("sentinel_open_connections", "Number of open connections")
249                .context("Failed to register open_connections metric")?;
250
251        // WebSocket metrics
252        let websocket_frames_total = register_int_counter_vec!(
253            "sentinel_websocket_frames_total",
254            "Total WebSocket frames processed",
255            &["route", "direction", "opcode", "decision"]
256        )
257        .context("Failed to register websocket_frames_total metric")?;
258
259        let websocket_connections_total = register_int_counter_vec!(
260            "sentinel_websocket_connections_total",
261            "Total WebSocket connections with inspection enabled",
262            &["route"]
263        )
264        .context("Failed to register websocket_connections_total metric")?;
265
266        // Use smaller latency buckets for frame inspection (typically fast)
267        let frame_latency_buckets = vec![
268            0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
269        ];
270
271        let websocket_inspection_duration = register_histogram_vec!(
272            "sentinel_websocket_inspection_duration_seconds",
273            "WebSocket frame inspection duration in seconds",
274            &["route"],
275            frame_latency_buckets
276        )
277        .context("Failed to register websocket_inspection_duration metric")?;
278
279        // Frame size buckets (bytes)
280        let frame_size_buckets = vec![
281            64.0, 256.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0,
282        ];
283
284        let websocket_frame_size = register_histogram_vec!(
285            "sentinel_websocket_frame_size_bytes",
286            "WebSocket frame payload size in bytes",
287            &["route", "direction", "opcode"],
288            frame_size_buckets
289        )
290        .context("Failed to register websocket_frame_size metric")?;
291
292        // Body decompression metrics
293        let decompression_total = register_int_counter_vec!(
294            "sentinel_decompression_total",
295            "Total body decompression operations",
296            &["encoding", "result"]
297        )
298        .context("Failed to register decompression_total metric")?;
299
300        // Decompression ratio buckets (compressed:decompressed)
301        let ratio_buckets = vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0];
302
303        let decompression_ratio = register_histogram_vec!(
304            "sentinel_decompression_ratio",
305            "Decompression ratio (decompressed_size / compressed_size)",
306            &["encoding"],
307            ratio_buckets
308        )
309        .context("Failed to register decompression_ratio metric")?;
310
311        Ok(Self {
312            request_duration,
313            request_count,
314            active_requests,
315            upstream_attempts,
316            upstream_failures,
317            circuit_breaker_state,
318            agent_latency,
319            agent_timeouts,
320            blocked_requests,
321            request_body_size,
322            response_body_size,
323            tls_handshake_duration,
324            connection_pool_size,
325            connection_pool_idle,
326            connection_pool_acquired,
327            memory_usage,
328            cpu_usage,
329            open_connections,
330            websocket_frames_total,
331            websocket_connections_total,
332            websocket_inspection_duration,
333            websocket_frame_size,
334            decompression_total,
335            decompression_ratio,
336        })
337    }
338
339    /// Record a completed request
340    pub fn record_request(&self, route: &str, method: &str, status: u16, duration: Duration) {
341        self.request_duration
342            .with_label_values(&[route, method])
343            .observe(duration.as_secs_f64());
344
345        self.request_count
346            .with_label_values(&[route, method, &status.to_string()])
347            .inc();
348    }
349
350    /// Increment active request counter
351    pub fn inc_active_requests(&self) {
352        self.active_requests.inc();
353    }
354
355    /// Decrement active request counter
356    pub fn dec_active_requests(&self) {
357        self.active_requests.dec();
358    }
359
360    /// Record an upstream attempt
361    pub fn record_upstream_attempt(&self, upstream: &str, route: &str) {
362        self.upstream_attempts
363            .with_label_values(&[upstream, route])
364            .inc();
365    }
366
367    /// Record an upstream failure
368    pub fn record_upstream_failure(&self, upstream: &str, route: &str, reason: &str) {
369        self.upstream_failures
370            .with_label_values(&[upstream, route, reason])
371            .inc();
372    }
373
374    /// Update circuit breaker state
375    pub fn set_circuit_breaker_state(&self, component: &str, route: &str, is_open: bool) {
376        let state = if is_open { 1 } else { 0 };
377        self.circuit_breaker_state
378            .with_label_values(&[component, route])
379            .set(state);
380    }
381
382    /// Record agent call latency
383    pub fn record_agent_latency(&self, agent: &str, event: &str, duration: Duration) {
384        self.agent_latency
385            .with_label_values(&[agent, event])
386            .observe(duration.as_secs_f64());
387    }
388
389    /// Record agent timeout
390    pub fn record_agent_timeout(&self, agent: &str, event: &str) {
391        self.agent_timeouts.with_label_values(&[agent, event]).inc();
392    }
393
394    /// Record a blocked request
395    pub fn record_blocked_request(&self, reason: &str) {
396        self.blocked_requests.with_label_values(&[reason]).inc();
397    }
398
399    /// Record request body size
400    pub fn record_request_body_size(&self, route: &str, size_bytes: usize) {
401        self.request_body_size
402            .with_label_values(&[route])
403            .observe(size_bytes as f64);
404    }
405
406    /// Record response body size
407    pub fn record_response_body_size(&self, route: &str, size_bytes: usize) {
408        self.response_body_size
409            .with_label_values(&[route])
410            .observe(size_bytes as f64);
411    }
412
413    /// Record TLS handshake duration
414    pub fn record_tls_handshake(&self, version: &str, duration: Duration) {
415        self.tls_handshake_duration
416            .with_label_values(&[version])
417            .observe(duration.as_secs_f64());
418    }
419
420    /// Update connection pool metrics
421    pub fn update_connection_pool(&self, upstream: &str, size: i64, idle: i64) {
422        self.connection_pool_size
423            .with_label_values(&[upstream])
424            .set(size);
425        self.connection_pool_idle
426            .with_label_values(&[upstream])
427            .set(idle);
428    }
429
430    /// Record connection acquisition from pool
431    pub fn record_connection_acquired(&self, upstream: &str) {
432        self.connection_pool_acquired
433            .with_label_values(&[upstream])
434            .inc();
435    }
436
437    /// Update system metrics
438    pub fn update_system_metrics(&self) {
439        use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
440
441        // Create system with specific refresh kinds
442        let mut system = System::new_with_specifics(
443            RefreshKind::new()
444                .with_cpu(CpuRefreshKind::everything())
445                .with_memory(MemoryRefreshKind::everything()),
446        );
447
448        // Get memory usage
449        self.memory_usage.set(system.total_memory() as i64);
450
451        // Get CPU usage
452        system.refresh_cpu_usage();
453        self.cpu_usage.set(system.global_cpu_usage() as f64);
454    }
455
456    /// Set open connections count
457    pub fn set_open_connections(&self, count: i64) {
458        self.open_connections.set(count);
459    }
460
461    // === WebSocket Metrics ===
462
463    /// Record a WebSocket frame being processed
464    ///
465    /// # Arguments
466    /// * `route` - The route ID
467    /// * `direction` - Frame direction: "c2s" (client to server) or "s2c" (server to client)
468    /// * `opcode` - Frame opcode: "text", "binary", "ping", "pong", "close", "continuation"
469    /// * `decision` - Inspection decision: "allow", "drop", or "close"
470    pub fn record_websocket_frame(
471        &self,
472        route: &str,
473        direction: &str,
474        opcode: &str,
475        decision: &str,
476    ) {
477        self.websocket_frames_total
478            .with_label_values(&[route, direction, opcode, decision])
479            .inc();
480    }
481
482    /// Record a WebSocket connection with inspection enabled
483    pub fn record_websocket_connection(&self, route: &str) {
484        self.websocket_connections_total
485            .with_label_values(&[route])
486            .inc();
487    }
488
489    /// Record WebSocket frame inspection duration
490    pub fn record_websocket_inspection_duration(&self, route: &str, duration: Duration) {
491        self.websocket_inspection_duration
492            .with_label_values(&[route])
493            .observe(duration.as_secs_f64());
494    }
495
496    /// Record WebSocket frame size
497    ///
498    /// # Arguments
499    /// * `route` - The route ID
500    /// * `direction` - Frame direction: "c2s" or "s2c"
501    /// * `opcode` - Frame opcode
502    /// * `size_bytes` - Frame payload size in bytes
503    pub fn record_websocket_frame_size(
504        &self,
505        route: &str,
506        direction: &str,
507        opcode: &str,
508        size_bytes: usize,
509    ) {
510        self.websocket_frame_size
511            .with_label_values(&[route, direction, opcode])
512            .observe(size_bytes as f64);
513    }
514
515    // === Decompression Metrics ===
516
517    /// Record a successful body decompression
518    ///
519    /// # Arguments
520    /// * `encoding` - Content-Encoding (gzip, deflate, br)
521    /// * `ratio` - Decompression ratio (decompressed_size / compressed_size)
522    pub fn record_decompression_success(&self, encoding: &str, ratio: f64) {
523        self.decompression_total
524            .with_label_values(&[encoding, "success"])
525            .inc();
526        self.decompression_ratio
527            .with_label_values(&[encoding])
528            .observe(ratio);
529    }
530
531    /// Record a failed body decompression
532    ///
533    /// # Arguments
534    /// * `encoding` - Content-Encoding (gzip, deflate, br)
535    /// * `reason` - Failure reason (ratio_exceeded, size_exceeded, invalid_data, unsupported)
536    pub fn record_decompression_failure(&self, encoding: &str, reason: &str) {
537        self.decompression_total
538            .with_label_values(&[encoding, reason])
539            .inc();
540    }
541}
542
543/// Structured log entry for audit logging
544#[derive(Debug, serde::Serialize)]
545pub struct AuditLogEntry {
546    pub timestamp: String,
547    pub correlation_id: String,
548    pub event_type: String,
549    pub route: Option<String>,
550    pub client_addr: Option<String>,
551    pub user_agent: Option<String>,
552    pub method: String,
553    pub path: String,
554    pub status: Option<u16>,
555    pub duration_ms: u64,
556    pub upstream: Option<String>,
557    pub waf_decision: Option<WafDecision>,
558    pub agent_decisions: Vec<AgentDecision>,
559    pub error: Option<String>,
560    pub tags: Vec<String>,
561}
562
563/// WAF decision details for audit logging
564#[derive(Debug, serde::Serialize)]
565pub struct WafDecision {
566    pub action: String,
567    pub rule_ids: Vec<String>,
568    pub confidence: f32,
569    pub reason: String,
570    pub matched_data: Option<String>,
571}
572
573/// Agent decision details for audit logging
574#[derive(Debug, serde::Serialize)]
575pub struct AgentDecision {
576    pub agent_name: String,
577    pub event: String,
578    pub action: String,
579    pub latency_ms: u64,
580    pub metadata: serde_json::Value,
581}
582
583impl AuditLogEntry {
584    /// Create a new audit log entry
585    pub fn new(correlation_id: String, method: String, path: String) -> Self {
586        Self {
587            timestamp: chrono::Utc::now().to_rfc3339(),
588            correlation_id,
589            event_type: "request".to_string(),
590            route: None,
591            client_addr: None,
592            user_agent: None,
593            method,
594            path,
595            status: None,
596            duration_ms: 0,
597            upstream: None,
598            waf_decision: None,
599            agent_decisions: vec![],
600            error: None,
601            tags: vec![],
602        }
603    }
604
605    /// Write the audit log entry
606    pub fn write(&self) {
607        match serde_json::to_string(self) {
608            Ok(json) => println!("AUDIT: {}", json),
609            Err(e) => error!("Failed to serialize audit log: {}", e),
610        }
611    }
612}
613
614/// Health check status for components
615#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
616pub enum HealthStatus {
617    Healthy,
618    Degraded,
619    Unhealthy,
620}
621
622/// Component health information
623#[derive(Debug, Clone, serde::Serialize)]
624pub struct ComponentHealth {
625    pub name: String,
626    pub status: HealthStatus,
627    pub last_check: chrono::DateTime<chrono::Utc>,
628    pub consecutive_failures: u32,
629    pub error_message: Option<String>,
630}
631
632/// Global health status aggregator
633///
634/// Tracks the health of all system components (upstreams, agents, etc.)
635/// and provides aggregate status for health endpoints.
636pub struct ComponentHealthTracker {
637    components: parking_lot::RwLock<Vec<ComponentHealth>>,
638}
639
640impl Default for ComponentHealthTracker {
641    fn default() -> Self {
642        Self::new()
643    }
644}
645
646impl ComponentHealthTracker {
647    /// Create new health checker
648    pub fn new() -> Self {
649        Self {
650            components: parking_lot::RwLock::new(vec![]),
651        }
652    }
653
654    /// Update component health
655    pub fn update_component(&self, name: String, status: HealthStatus, error: Option<String>) {
656        let mut components = self.components.write();
657
658        if let Some(component) = components.iter_mut().find(|c| c.name == name) {
659            component.status = status;
660            component.last_check = chrono::Utc::now();
661            component.error_message = error;
662
663            if status != HealthStatus::Healthy {
664                component.consecutive_failures += 1;
665            } else {
666                component.consecutive_failures = 0;
667            }
668        } else {
669            components.push(ComponentHealth {
670                name,
671                status,
672                last_check: chrono::Utc::now(),
673                consecutive_failures: if status != HealthStatus::Healthy {
674                    1
675                } else {
676                    0
677                },
678                error_message: error,
679            });
680        }
681    }
682
683    /// Get overall health status
684    pub fn get_status(&self) -> HealthStatus {
685        let components = self.components.read();
686
687        if components.is_empty() {
688            return HealthStatus::Healthy;
689        }
690
691        let unhealthy_count = components
692            .iter()
693            .filter(|c| c.status == HealthStatus::Unhealthy)
694            .count();
695        let degraded_count = components
696            .iter()
697            .filter(|c| c.status == HealthStatus::Degraded)
698            .count();
699
700        if unhealthy_count > 0 {
701            HealthStatus::Unhealthy
702        } else if degraded_count > 0 {
703            HealthStatus::Degraded
704        } else {
705            HealthStatus::Healthy
706        }
707    }
708
709    /// Get detailed health report
710    pub fn get_report(&self) -> Vec<ComponentHealth> {
711        self.components.read().clone()
712    }
713}
714
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: the collector can be built and a few recorders called.
    #[test]
    fn test_metrics_creation() {
        let collector = RequestMetrics::new().expect("Failed to create metrics");

        // One completed request.
        collector.record_request("test_route", "GET", 200, Duration::from_millis(100));

        // Active request gauge up, then back down.
        collector.inc_active_requests();
        collector.dec_active_requests();

        // One upstream attempt.
        collector.record_upstream_attempt("backend1", "test_route");
    }

    /// An audit entry serializes to JSON that carries its correlation id.
    #[test]
    fn test_audit_log() {
        let correlation_id = "test-correlation-id".to_string();
        let mut entry =
            AuditLogEntry::new(correlation_id, "GET".to_string(), "/api/test".to_string());

        entry.status = Some(200);
        entry.duration_ms = 150;
        entry.tags.push("test".to_string());

        // Serialize directly rather than calling `write`, which prints to stdout.
        let serialized = serde_json::to_string(&entry).expect("Failed to serialize audit log");
        assert!(serialized.contains("test-correlation-id"));
    }

    /// The aggregate status follows the worst component status seen so far.
    #[test]
    fn test_health_checker() {
        let tracker = ComponentHealthTracker::new();

        // Nothing tracked yet: healthy.
        assert_eq!(tracker.get_status(), HealthStatus::Healthy);

        // A healthy component keeps the aggregate healthy.
        tracker.update_component("upstream1".to_string(), HealthStatus::Healthy, None);
        assert_eq!(tracker.get_status(), HealthStatus::Healthy);

        // A degraded component degrades the aggregate.
        let slow = Some("Slow response".to_string());
        tracker.update_component("agent1".to_string(), HealthStatus::Degraded, slow);
        assert_eq!(tracker.get_status(), HealthStatus::Degraded);

        // An unhealthy component outranks the degraded one.
        let refused = Some("Connection refused".to_string());
        tracker.update_component("upstream2".to_string(), HealthStatus::Unhealthy, refused);
        assert_eq!(tracker.get_status(), HealthStatus::Unhealthy);
    }
}