sentinel_common/
observability.rs

1//! Observability module for Sentinel proxy
2//!
3//! Provides metrics, logging, and tracing infrastructure with a focus on
4//! production reliability and sleepable operations.
5
6use anyhow::{Context, Result};
7use prometheus::{
8    register_counter_vec, register_gauge, register_histogram_vec, register_int_counter_vec,
9    register_int_gauge, register_int_gauge_vec, CounterVec, Gauge, HistogramVec, IntCounterVec,
10    IntGauge, IntGaugeVec,
11};
12use std::time::Duration;
13use tracing::{error, info};
14use tracing_subscriber::{fmt, prelude::*, EnvFilter};
15
16/// Initialize the tracing/logging subsystem
17pub fn init_tracing() -> Result<()> {
18    // Use JSON format for structured logging in production
19    let json_layer =
20        if std::env::var("SENTINEL_LOG_FORMAT").unwrap_or_else(|_| "json".to_string()) == "json" {
21            Some(
22                fmt::layer()
23                    .json()
24                    .with_target(true)
25                    .with_thread_ids(true)
26                    .with_thread_names(true)
27                    .with_file(true)
28                    .with_line_number(true),
29            )
30        } else {
31            None
32        };
33
34    // Pretty format for development
35    let pretty_layer = if std::env::var("SENTINEL_LOG_FORMAT")
36        .unwrap_or_else(|_| "json".to_string())
37        == "pretty"
38    {
39        Some(
40            fmt::layer()
41                .pretty()
42                .with_target(true)
43                .with_thread_ids(true)
44                .with_thread_names(true)
45                .with_file(true)
46                .with_line_number(true),
47        )
48    } else {
49        None
50    };
51
52    // Configure log level from environment
53    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
54
55    tracing_subscriber::registry()
56        .with(env_filter)
57        .with(json_layer)
58        .with(pretty_layer)
59        .init();
60
61    info!("Tracing initialized");
62    Ok(())
63}
64
65/// Request metrics collector
66pub struct RequestMetrics {
67    /// Request latency histogram by route
68    request_duration: HistogramVec,
69    /// Request count by route and status code
70    request_count: IntCounterVec,
71    /// Active requests gauge
72    active_requests: IntGauge,
73    /// Upstream connection attempts
74    upstream_attempts: IntCounterVec,
75    /// Upstream failures
76    upstream_failures: IntCounterVec,
77    /// Circuit breaker state (0 = closed, 1 = open)
78    circuit_breaker_state: IntGaugeVec,
79    /// Agent call latency
80    agent_latency: HistogramVec,
81    /// Agent call timeouts
82    agent_timeouts: IntCounterVec,
83    /// Blocked requests by reason
84    blocked_requests: CounterVec,
85    /// Request body size histogram
86    request_body_size: HistogramVec,
87    /// Response body size histogram
88    response_body_size: HistogramVec,
89    /// TLS handshake duration
90    tls_handshake_duration: HistogramVec,
91    /// Connection pool metrics
92    connection_pool_size: IntGaugeVec,
93    connection_pool_idle: IntGaugeVec,
94    connection_pool_acquired: IntCounterVec,
95    /// System metrics
96    memory_usage: IntGauge,
97    cpu_usage: Gauge,
98    open_connections: IntGauge,
99    /// WebSocket metrics
100    websocket_frames_total: IntCounterVec,
101    websocket_connections_total: IntCounterVec,
102    websocket_inspection_duration: HistogramVec,
103    websocket_frame_size: HistogramVec,
104}
105
106impl RequestMetrics {
107    /// Create new metrics collector and register with Prometheus
108    pub fn new() -> Result<Self> {
109        // Define buckets for latency histograms (in seconds)
110        let latency_buckets = vec![
111            0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
112        ];
113
114        // Define buckets for body size (in bytes)
115        let size_buckets = vec![
116            100.0,
117            1_000.0,
118            10_000.0,
119            100_000.0,
120            1_000_000.0,
121            10_000_000.0,
122            100_000_000.0,
123        ];
124
125        let request_duration = register_histogram_vec!(
126            "sentinel_request_duration_seconds",
127            "Request duration in seconds",
128            &["route", "method"],
129            latency_buckets.clone()
130        )
131        .context("Failed to register request_duration metric")?;
132
133        let request_count = register_int_counter_vec!(
134            "sentinel_requests_total",
135            "Total number of requests",
136            &["route", "method", "status"]
137        )
138        .context("Failed to register request_count metric")?;
139
140        let active_requests = register_int_gauge!(
141            "sentinel_active_requests",
142            "Number of currently active requests"
143        )
144        .context("Failed to register active_requests metric")?;
145
146        let upstream_attempts = register_int_counter_vec!(
147            "sentinel_upstream_attempts_total",
148            "Total upstream connection attempts",
149            &["upstream", "route"]
150        )
151        .context("Failed to register upstream_attempts metric")?;
152
153        let upstream_failures = register_int_counter_vec!(
154            "sentinel_upstream_failures_total",
155            "Total upstream connection failures",
156            &["upstream", "route", "reason"]
157        )
158        .context("Failed to register upstream_failures metric")?;
159
160        let circuit_breaker_state = register_int_gauge_vec!(
161            "sentinel_circuit_breaker_state",
162            "Circuit breaker state (0=closed, 1=open)",
163            &["component", "route"]
164        )
165        .context("Failed to register circuit_breaker_state metric")?;
166
167        let agent_latency = register_histogram_vec!(
168            "sentinel_agent_latency_seconds",
169            "Agent call latency in seconds",
170            &["agent", "event"],
171            latency_buckets.clone()
172        )
173        .context("Failed to register agent_latency metric")?;
174
175        let agent_timeouts = register_int_counter_vec!(
176            "sentinel_agent_timeouts_total",
177            "Total agent call timeouts",
178            &["agent", "event"]
179        )
180        .context("Failed to register agent_timeouts metric")?;
181
182        let blocked_requests = register_counter_vec!(
183            "sentinel_blocked_requests_total",
184            "Total blocked requests by reason",
185            &["reason"]
186        )
187        .context("Failed to register blocked_requests metric")?;
188
189        let request_body_size = register_histogram_vec!(
190            "sentinel_request_body_size_bytes",
191            "Request body size in bytes",
192            &["route"],
193            size_buckets.clone()
194        )
195        .context("Failed to register request_body_size metric")?;
196
197        let response_body_size = register_histogram_vec!(
198            "sentinel_response_body_size_bytes",
199            "Response body size in bytes",
200            &["route"],
201            size_buckets.clone()
202        )
203        .context("Failed to register response_body_size metric")?;
204
205        let tls_handshake_duration = register_histogram_vec!(
206            "sentinel_tls_handshake_duration_seconds",
207            "TLS handshake duration in seconds",
208            &["version"],
209            latency_buckets
210        )
211        .context("Failed to register tls_handshake_duration metric")?;
212
213        let connection_pool_size = register_int_gauge_vec!(
214            "sentinel_connection_pool_size",
215            "Total connections in pool",
216            &["upstream"]
217        )
218        .context("Failed to register connection_pool_size metric")?;
219
220        let connection_pool_idle = register_int_gauge_vec!(
221            "sentinel_connection_pool_idle",
222            "Idle connections in pool",
223            &["upstream"]
224        )
225        .context("Failed to register connection_pool_idle metric")?;
226
227        let connection_pool_acquired = register_int_counter_vec!(
228            "sentinel_connection_pool_acquired_total",
229            "Total connections acquired from pool",
230            &["upstream"]
231        )
232        .context("Failed to register connection_pool_acquired metric")?;
233
234        let memory_usage = register_int_gauge!(
235            "sentinel_memory_usage_bytes",
236            "Current memory usage in bytes"
237        )
238        .context("Failed to register memory_usage metric")?;
239
240        let cpu_usage =
241            register_gauge!("sentinel_cpu_usage_percent", "Current CPU usage percentage")
242                .context("Failed to register cpu_usage metric")?;
243
244        let open_connections =
245            register_int_gauge!("sentinel_open_connections", "Number of open connections")
246                .context("Failed to register open_connections metric")?;
247
248        // WebSocket metrics
249        let websocket_frames_total = register_int_counter_vec!(
250            "sentinel_websocket_frames_total",
251            "Total WebSocket frames processed",
252            &["route", "direction", "opcode", "decision"]
253        )
254        .context("Failed to register websocket_frames_total metric")?;
255
256        let websocket_connections_total = register_int_counter_vec!(
257            "sentinel_websocket_connections_total",
258            "Total WebSocket connections with inspection enabled",
259            &["route"]
260        )
261        .context("Failed to register websocket_connections_total metric")?;
262
263        // Use smaller latency buckets for frame inspection (typically fast)
264        let frame_latency_buckets = vec![
265            0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
266        ];
267
268        let websocket_inspection_duration = register_histogram_vec!(
269            "sentinel_websocket_inspection_duration_seconds",
270            "WebSocket frame inspection duration in seconds",
271            &["route"],
272            frame_latency_buckets
273        )
274        .context("Failed to register websocket_inspection_duration metric")?;
275
276        // Frame size buckets (bytes)
277        let frame_size_buckets = vec![
278            64.0, 256.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0,
279        ];
280
281        let websocket_frame_size = register_histogram_vec!(
282            "sentinel_websocket_frame_size_bytes",
283            "WebSocket frame payload size in bytes",
284            &["route", "direction", "opcode"],
285            frame_size_buckets
286        )
287        .context("Failed to register websocket_frame_size metric")?;
288
289        Ok(Self {
290            request_duration,
291            request_count,
292            active_requests,
293            upstream_attempts,
294            upstream_failures,
295            circuit_breaker_state,
296            agent_latency,
297            agent_timeouts,
298            blocked_requests,
299            request_body_size,
300            response_body_size,
301            tls_handshake_duration,
302            connection_pool_size,
303            connection_pool_idle,
304            connection_pool_acquired,
305            memory_usage,
306            cpu_usage,
307            open_connections,
308            websocket_frames_total,
309            websocket_connections_total,
310            websocket_inspection_duration,
311            websocket_frame_size,
312        })
313    }
314
315    /// Record a completed request
316    pub fn record_request(&self, route: &str, method: &str, status: u16, duration: Duration) {
317        self.request_duration
318            .with_label_values(&[route, method])
319            .observe(duration.as_secs_f64());
320
321        self.request_count
322            .with_label_values(&[route, method, &status.to_string()])
323            .inc();
324    }
325
326    /// Increment active request counter
327    pub fn inc_active_requests(&self) {
328        self.active_requests.inc();
329    }
330
331    /// Decrement active request counter
332    pub fn dec_active_requests(&self) {
333        self.active_requests.dec();
334    }
335
336    /// Record an upstream attempt
337    pub fn record_upstream_attempt(&self, upstream: &str, route: &str) {
338        self.upstream_attempts
339            .with_label_values(&[upstream, route])
340            .inc();
341    }
342
343    /// Record an upstream failure
344    pub fn record_upstream_failure(&self, upstream: &str, route: &str, reason: &str) {
345        self.upstream_failures
346            .with_label_values(&[upstream, route, reason])
347            .inc();
348    }
349
350    /// Update circuit breaker state
351    pub fn set_circuit_breaker_state(&self, component: &str, route: &str, is_open: bool) {
352        let state = if is_open { 1 } else { 0 };
353        self.circuit_breaker_state
354            .with_label_values(&[component, route])
355            .set(state);
356    }
357
358    /// Record agent call latency
359    pub fn record_agent_latency(&self, agent: &str, event: &str, duration: Duration) {
360        self.agent_latency
361            .with_label_values(&[agent, event])
362            .observe(duration.as_secs_f64());
363    }
364
365    /// Record agent timeout
366    pub fn record_agent_timeout(&self, agent: &str, event: &str) {
367        self.agent_timeouts.with_label_values(&[agent, event]).inc();
368    }
369
370    /// Record a blocked request
371    pub fn record_blocked_request(&self, reason: &str) {
372        self.blocked_requests.with_label_values(&[reason]).inc();
373    }
374
375    /// Record request body size
376    pub fn record_request_body_size(&self, route: &str, size_bytes: usize) {
377        self.request_body_size
378            .with_label_values(&[route])
379            .observe(size_bytes as f64);
380    }
381
382    /// Record response body size
383    pub fn record_response_body_size(&self, route: &str, size_bytes: usize) {
384        self.response_body_size
385            .with_label_values(&[route])
386            .observe(size_bytes as f64);
387    }
388
389    /// Record TLS handshake duration
390    pub fn record_tls_handshake(&self, version: &str, duration: Duration) {
391        self.tls_handshake_duration
392            .with_label_values(&[version])
393            .observe(duration.as_secs_f64());
394    }
395
396    /// Update connection pool metrics
397    pub fn update_connection_pool(&self, upstream: &str, size: i64, idle: i64) {
398        self.connection_pool_size
399            .with_label_values(&[upstream])
400            .set(size);
401        self.connection_pool_idle
402            .with_label_values(&[upstream])
403            .set(idle);
404    }
405
406    /// Record connection acquisition from pool
407    pub fn record_connection_acquired(&self, upstream: &str) {
408        self.connection_pool_acquired
409            .with_label_values(&[upstream])
410            .inc();
411    }
412
413    /// Update system metrics
414    pub fn update_system_metrics(&self) {
415        use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
416
417        // Create system with specific refresh kinds
418        let mut system = System::new_with_specifics(
419            RefreshKind::new()
420                .with_cpu(CpuRefreshKind::everything())
421                .with_memory(MemoryRefreshKind::everything()),
422        );
423
424        // Get memory usage
425        self.memory_usage.set(system.total_memory() as i64);
426
427        // Get CPU usage
428        system.refresh_cpu_usage();
429        self.cpu_usage.set(system.global_cpu_usage() as f64);
430    }
431
432    /// Set open connections count
433    pub fn set_open_connections(&self, count: i64) {
434        self.open_connections.set(count);
435    }
436
437    // === WebSocket Metrics ===
438
439    /// Record a WebSocket frame being processed
440    ///
441    /// # Arguments
442    /// * `route` - The route ID
443    /// * `direction` - Frame direction: "c2s" (client to server) or "s2c" (server to client)
444    /// * `opcode` - Frame opcode: "text", "binary", "ping", "pong", "close", "continuation"
445    /// * `decision` - Inspection decision: "allow", "drop", or "close"
446    pub fn record_websocket_frame(
447        &self,
448        route: &str,
449        direction: &str,
450        opcode: &str,
451        decision: &str,
452    ) {
453        self.websocket_frames_total
454            .with_label_values(&[route, direction, opcode, decision])
455            .inc();
456    }
457
458    /// Record a WebSocket connection with inspection enabled
459    pub fn record_websocket_connection(&self, route: &str) {
460        self.websocket_connections_total
461            .with_label_values(&[route])
462            .inc();
463    }
464
465    /// Record WebSocket frame inspection duration
466    pub fn record_websocket_inspection_duration(&self, route: &str, duration: Duration) {
467        self.websocket_inspection_duration
468            .with_label_values(&[route])
469            .observe(duration.as_secs_f64());
470    }
471
472    /// Record WebSocket frame size
473    ///
474    /// # Arguments
475    /// * `route` - The route ID
476    /// * `direction` - Frame direction: "c2s" or "s2c"
477    /// * `opcode` - Frame opcode
478    /// * `size_bytes` - Frame payload size in bytes
479    pub fn record_websocket_frame_size(
480        &self,
481        route: &str,
482        direction: &str,
483        opcode: &str,
484        size_bytes: usize,
485    ) {
486        self.websocket_frame_size
487            .with_label_values(&[route, direction, opcode])
488            .observe(size_bytes as f64);
489    }
490}
491
492/// Structured log entry for audit logging
493#[derive(Debug, serde::Serialize)]
494pub struct AuditLogEntry {
495    pub timestamp: String,
496    pub correlation_id: String,
497    pub event_type: String,
498    pub route: Option<String>,
499    pub client_addr: Option<String>,
500    pub user_agent: Option<String>,
501    pub method: String,
502    pub path: String,
503    pub status: Option<u16>,
504    pub duration_ms: u64,
505    pub upstream: Option<String>,
506    pub waf_decision: Option<WafDecision>,
507    pub agent_decisions: Vec<AgentDecision>,
508    pub error: Option<String>,
509    pub tags: Vec<String>,
510}
511
512/// WAF decision details for audit logging
513#[derive(Debug, serde::Serialize)]
514pub struct WafDecision {
515    pub action: String,
516    pub rule_ids: Vec<String>,
517    pub confidence: f32,
518    pub reason: String,
519    pub matched_data: Option<String>,
520}
521
522/// Agent decision details for audit logging
523#[derive(Debug, serde::Serialize)]
524pub struct AgentDecision {
525    pub agent_name: String,
526    pub event: String,
527    pub action: String,
528    pub latency_ms: u64,
529    pub metadata: serde_json::Value,
530}
531
532impl AuditLogEntry {
533    /// Create a new audit log entry
534    pub fn new(correlation_id: String, method: String, path: String) -> Self {
535        Self {
536            timestamp: chrono::Utc::now().to_rfc3339(),
537            correlation_id,
538            event_type: "request".to_string(),
539            route: None,
540            client_addr: None,
541            user_agent: None,
542            method,
543            path,
544            status: None,
545            duration_ms: 0,
546            upstream: None,
547            waf_decision: None,
548            agent_decisions: vec![],
549            error: None,
550            tags: vec![],
551        }
552    }
553
554    /// Write the audit log entry
555    pub fn write(&self) {
556        match serde_json::to_string(self) {
557            Ok(json) => println!("AUDIT: {}", json),
558            Err(e) => error!("Failed to serialize audit log: {}", e),
559        }
560    }
561}
562
563/// Health check status for components
564#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
565pub enum HealthStatus {
566    Healthy,
567    Degraded,
568    Unhealthy,
569}
570
571/// Component health information
572#[derive(Debug, Clone, serde::Serialize)]
573pub struct ComponentHealth {
574    pub name: String,
575    pub status: HealthStatus,
576    pub last_check: chrono::DateTime<chrono::Utc>,
577    pub consecutive_failures: u32,
578    pub error_message: Option<String>,
579}
580
581/// Global health status aggregator
582///
583/// Tracks the health of all system components (upstreams, agents, etc.)
584/// and provides aggregate status for health endpoints.
585pub struct ComponentHealthTracker {
586    components: parking_lot::RwLock<Vec<ComponentHealth>>,
587}
588
589impl Default for ComponentHealthTracker {
590    fn default() -> Self {
591        Self::new()
592    }
593}
594
595impl ComponentHealthTracker {
596    /// Create new health checker
597    pub fn new() -> Self {
598        Self {
599            components: parking_lot::RwLock::new(vec![]),
600        }
601    }
602
603    /// Update component health
604    pub fn update_component(&self, name: String, status: HealthStatus, error: Option<String>) {
605        let mut components = self.components.write();
606
607        if let Some(component) = components.iter_mut().find(|c| c.name == name) {
608            component.status = status;
609            component.last_check = chrono::Utc::now();
610            component.error_message = error;
611
612            if status != HealthStatus::Healthy {
613                component.consecutive_failures += 1;
614            } else {
615                component.consecutive_failures = 0;
616            }
617        } else {
618            components.push(ComponentHealth {
619                name,
620                status,
621                last_check: chrono::Utc::now(),
622                consecutive_failures: if status != HealthStatus::Healthy {
623                    1
624                } else {
625                    0
626                },
627                error_message: error,
628            });
629        }
630    }
631
632    /// Get overall health status
633    pub fn get_status(&self) -> HealthStatus {
634        let components = self.components.read();
635
636        if components.is_empty() {
637            return HealthStatus::Healthy;
638        }
639
640        let unhealthy_count = components
641            .iter()
642            .filter(|c| c.status == HealthStatus::Unhealthy)
643            .count();
644        let degraded_count = components
645            .iter()
646            .filter(|c| c.status == HealthStatus::Degraded)
647            .count();
648
649        if unhealthy_count > 0 {
650            HealthStatus::Unhealthy
651        } else if degraded_count > 0 {
652            HealthStatus::Degraded
653        } else {
654            HealthStatus::Healthy
655        }
656    }
657
658    /// Get detailed health report
659    pub fn get_report(&self) -> Vec<ComponentHealth> {
660        self.components.read().clone()
661    }
662}
663
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: the collector can be created and a few recorders called.
    #[test]
    fn test_metrics_creation() {
        let collector = RequestMetrics::new().expect("Failed to create metrics");

        // One completed request.
        let elapsed = Duration::from_millis(100);
        collector.record_request("test_route", "GET", 200, elapsed);

        // Active request gauge goes up and back down.
        collector.inc_active_requests();
        collector.dec_active_requests();

        // One upstream attempt.
        collector.record_upstream_attempt("backend1", "test_route");
    }

    /// An audit entry serializes to JSON that carries its correlation id.
    #[test]
    fn test_audit_log() {
        let correlation_id = "test-correlation-id".to_string();
        let method = "GET".to_string();
        let path = "/api/test".to_string();
        let mut entry = AuditLogEntry::new(correlation_id, method, path);

        entry.status = Some(200);
        entry.duration_ms = 150;
        entry.tags.push("test".to_string());

        // Serialize directly rather than calling `write`, so the test does
        // not print to stdout.
        let serialized = serde_json::to_string(&entry).expect("Failed to serialize audit log");
        assert!(serialized.contains("test-correlation-id"));
    }

    /// The aggregate status follows the worst component status.
    #[test]
    fn test_health_checker() {
        let tracker = ComponentHealthTracker::new();

        // No components yet: healthy.
        assert_eq!(tracker.get_status(), HealthStatus::Healthy);

        // A healthy component keeps the aggregate healthy.
        tracker.update_component("upstream1".to_string(), HealthStatus::Healthy, None);
        assert_eq!(tracker.get_status(), HealthStatus::Healthy);

        // A degraded component degrades the aggregate.
        let slow = Some("Slow response".to_string());
        tracker.update_component("agent1".to_string(), HealthStatus::Degraded, slow);
        assert_eq!(tracker.get_status(), HealthStatus::Degraded);

        // An unhealthy component makes the aggregate unhealthy.
        let refused = Some("Connection refused".to_string());
        tracker.update_component("upstream2".to_string(), HealthStatus::Unhealthy, refused);
        assert_eq!(tracker.get_status(), HealthStatus::Unhealthy);
    }
}