sentinel_common/
observability.rs

//! Observability module for Sentinel proxy
//!
//! Provides metrics, logging, and tracing infrastructure with a focus on
//! production reliability.

use anyhow::{Context, Result};
use prometheus::{
    register_counter_vec, register_gauge, register_histogram_vec, register_int_counter_vec,
    register_int_gauge, register_int_gauge_vec, CounterVec, Gauge, HistogramVec, IntCounterVec,
    IntGauge, IntGaugeVec,
};
use std::time::Duration;
use tracing::{error, info};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

/// Initialize the tracing/logging subsystem
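///
/// The log format is selected via `SENTINEL_LOG_FORMAT` (`json`, the default,
/// or `pretty`); the level filter comes from the standard `RUST_LOG` variable
/// and falls back to `info`.
///
/// # Example
///
/// A minimal sketch of wiring this up at process start (the crate path is an
/// assumption):
///
/// ```rust,no_run
/// // SENTINEL_LOG_FORMAT=pretty RUST_LOG=debug ./sentinel
/// sentinel_common::observability::init_tracing().expect("failed to init tracing");
/// tracing::info!("proxy starting");
/// ```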
pub fn init_tracing() -> Result<()> {
    let log_format = std::env::var("SENTINEL_LOG_FORMAT").unwrap_or_else(|_| "json".to_string());

    // Use JSON format for structured logging in production
    let json_layer = (log_format == "json").then(|| {
        fmt::layer()
            .json()
            .with_target(true)
            .with_thread_ids(true)
            .with_thread_names(true)
            .with_file(true)
            .with_line_number(true)
    });

    // Pretty format for development
    let pretty_layer = (log_format == "pretty").then(|| {
        fmt::layer()
            .pretty()
            .with_target(true)
            .with_thread_ids(true)
            .with_thread_names(true)
            .with_file(true)
            .with_line_number(true)
    });

    // Configure log level from environment (RUST_LOG), defaulting to `info`
    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));

    tracing_subscriber::registry()
        .with(env_filter)
        .with(json_layer)
        .with(pretty_layer)
        .init();

    info!("Tracing initialized");
    Ok(())
}

/// Request metrics collector
pub struct RequestMetrics {
    /// Request latency histogram by route
    request_duration: HistogramVec,
    /// Request count by route and status code
    request_count: IntCounterVec,
    /// Active requests gauge
    active_requests: IntGauge,
    /// Upstream connection attempts
    upstream_attempts: IntCounterVec,
    /// Upstream failures
    upstream_failures: IntCounterVec,
    /// Circuit breaker state (0 = closed, 1 = open)
    circuit_breaker_state: IntGaugeVec,
    /// Agent call latency
    agent_latency: HistogramVec,
    /// Agent call timeouts
    agent_timeouts: IntCounterVec,
    /// Blocked requests by reason
    blocked_requests: CounterVec,
    /// Request body size histogram
    request_body_size: HistogramVec,
    /// Response body size histogram
    response_body_size: HistogramVec,
    /// TLS handshake duration
    tls_handshake_duration: HistogramVec,
    /// Connection pool metrics
    connection_pool_size: IntGaugeVec,
    connection_pool_idle: IntGaugeVec,
    connection_pool_acquired: IntCounterVec,
    /// System metrics
    memory_usage: IntGauge,
    cpu_usage: Gauge,
    open_connections: IntGauge,
}

impl RequestMetrics {
    /// Create new metrics collector and register with Prometheus
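    ///
    /// Note that the `prometheus` `register_*!` macros register against the
    /// crate's global default registry, so constructing `RequestMetrics` more
    /// than once in the same process will fail with a duplicate-registration
    /// error.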
    pub fn new() -> Result<Self> {
        // Define buckets for latency histograms (in seconds)
        let latency_buckets = vec![
            0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
        ];

        // Define buckets for body size (in bytes)
        let size_buckets = vec![
            100.0,
            1_000.0,
            10_000.0,
            100_000.0,
            1_000_000.0,
            10_000_000.0,
            100_000_000.0,
        ];

        let request_duration = register_histogram_vec!(
            "sentinel_request_duration_seconds",
            "Request duration in seconds",
            &["route", "method"],
            latency_buckets.clone()
        )
        .context("Failed to register request_duration metric")?;

        let request_count = register_int_counter_vec!(
            "sentinel_requests_total",
            "Total number of requests",
            &["route", "method", "status"]
        )
        .context("Failed to register request_count metric")?;

        let active_requests = register_int_gauge!(
            "sentinel_active_requests",
            "Number of currently active requests"
        )
        .context("Failed to register active_requests metric")?;

        let upstream_attempts = register_int_counter_vec!(
            "sentinel_upstream_attempts_total",
            "Total upstream connection attempts",
            &["upstream", "route"]
        )
        .context("Failed to register upstream_attempts metric")?;

        let upstream_failures = register_int_counter_vec!(
            "sentinel_upstream_failures_total",
            "Total upstream connection failures",
            &["upstream", "route", "reason"]
        )
        .context("Failed to register upstream_failures metric")?;

        let circuit_breaker_state = register_int_gauge_vec!(
            "sentinel_circuit_breaker_state",
            "Circuit breaker state (0=closed, 1=open)",
            &["component", "route"]
        )
        .context("Failed to register circuit_breaker_state metric")?;

        let agent_latency = register_histogram_vec!(
            "sentinel_agent_latency_seconds",
            "Agent call latency in seconds",
            &["agent", "event"],
            latency_buckets.clone()
        )
        .context("Failed to register agent_latency metric")?;

        let agent_timeouts = register_int_counter_vec!(
            "sentinel_agent_timeouts_total",
            "Total agent call timeouts",
            &["agent", "event"]
        )
        .context("Failed to register agent_timeouts metric")?;

        let blocked_requests = register_counter_vec!(
            "sentinel_blocked_requests_total",
            "Total blocked requests by reason",
            &["reason"]
        )
        .context("Failed to register blocked_requests metric")?;

        let request_body_size = register_histogram_vec!(
            "sentinel_request_body_size_bytes",
            "Request body size in bytes",
            &["route"],
            size_buckets.clone()
        )
        .context("Failed to register request_body_size metric")?;

        let response_body_size = register_histogram_vec!(
            "sentinel_response_body_size_bytes",
            "Response body size in bytes",
            &["route"],
            size_buckets
        )
        .context("Failed to register response_body_size metric")?;

        let tls_handshake_duration = register_histogram_vec!(
            "sentinel_tls_handshake_duration_seconds",
            "TLS handshake duration in seconds",
            &["version"],
            latency_buckets
        )
        .context("Failed to register tls_handshake_duration metric")?;

        let connection_pool_size = register_int_gauge_vec!(
            "sentinel_connection_pool_size",
            "Total connections in pool",
            &["upstream"]
        )
        .context("Failed to register connection_pool_size metric")?;

        let connection_pool_idle = register_int_gauge_vec!(
            "sentinel_connection_pool_idle",
            "Idle connections in pool",
            &["upstream"]
        )
        .context("Failed to register connection_pool_idle metric")?;

        let connection_pool_acquired = register_int_counter_vec!(
            "sentinel_connection_pool_acquired_total",
            "Total connections acquired from pool",
            &["upstream"]
        )
        .context("Failed to register connection_pool_acquired metric")?;

        let memory_usage = register_int_gauge!(
            "sentinel_memory_usage_bytes",
            "Current memory usage in bytes"
        )
        .context("Failed to register memory_usage metric")?;

        let cpu_usage =
            register_gauge!("sentinel_cpu_usage_percent", "Current CPU usage percentage")
                .context("Failed to register cpu_usage metric")?;

        let open_connections =
            register_int_gauge!("sentinel_open_connections", "Number of open connections")
                .context("Failed to register open_connections metric")?;

        Ok(Self {
            request_duration,
            request_count,
            active_requests,
            upstream_attempts,
            upstream_failures,
            circuit_breaker_state,
            agent_latency,
            agent_timeouts,
            blocked_requests,
            request_body_size,
            response_body_size,
            tls_handshake_duration,
            connection_pool_size,
            connection_pool_idle,
            connection_pool_acquired,
            memory_usage,
            cpu_usage,
            open_connections,
        })
    }

    /// Record a completed request
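    ///
    /// # Example
    ///
    /// A sketch of timing a proxied request (route and method values here are
    /// illustrative):
    ///
    /// ```rust,ignore
    /// let start = std::time::Instant::now();
    /// // ... forward the request and await the response ...
    /// metrics.record_request("api", "GET", 200, start.elapsed());
    /// ```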
    pub fn record_request(&self, route: &str, method: &str, status: u16, duration: Duration) {
        self.request_duration
            .with_label_values(&[route, method])
            .observe(duration.as_secs_f64());

        self.request_count
            .with_label_values(&[route, method, &status.to_string()])
            .inc();
    }

    /// Increment active request counter
    pub fn inc_active_requests(&self) {
        self.active_requests.inc();
    }

    /// Decrement active request counter
    pub fn dec_active_requests(&self) {
        self.active_requests.dec();
    }

    /// Record an upstream attempt
    pub fn record_upstream_attempt(&self, upstream: &str, route: &str) {
        self.upstream_attempts
            .with_label_values(&[upstream, route])
            .inc();
    }

    /// Record an upstream failure
    pub fn record_upstream_failure(&self, upstream: &str, route: &str, reason: &str) {
        self.upstream_failures
            .with_label_values(&[upstream, route, reason])
            .inc();
    }

    /// Update circuit breaker state
    pub fn set_circuit_breaker_state(&self, component: &str, route: &str, is_open: bool) {
        let state = if is_open { 1 } else { 0 };
        self.circuit_breaker_state
            .with_label_values(&[component, route])
            .set(state);
    }

    /// Record agent call latency
    pub fn record_agent_latency(&self, agent: &str, event: &str, duration: Duration) {
        self.agent_latency
            .with_label_values(&[agent, event])
            .observe(duration.as_secs_f64());
    }

    /// Record agent timeout
    pub fn record_agent_timeout(&self, agent: &str, event: &str) {
        self.agent_timeouts.with_label_values(&[agent, event]).inc();
    }

    /// Record a blocked request
    pub fn record_blocked_request(&self, reason: &str) {
        self.blocked_requests.with_label_values(&[reason]).inc();
    }

    /// Record request body size
    pub fn record_request_body_size(&self, route: &str, size_bytes: usize) {
        self.request_body_size
            .with_label_values(&[route])
            .observe(size_bytes as f64);
    }

    /// Record response body size
    pub fn record_response_body_size(&self, route: &str, size_bytes: usize) {
        self.response_body_size
            .with_label_values(&[route])
            .observe(size_bytes as f64);
    }

    /// Record TLS handshake duration
    pub fn record_tls_handshake(&self, version: &str, duration: Duration) {
        self.tls_handshake_duration
            .with_label_values(&[version])
            .observe(duration.as_secs_f64());
    }

    /// Update connection pool metrics
    pub fn update_connection_pool(&self, upstream: &str, size: i64, idle: i64) {
        self.connection_pool_size
            .with_label_values(&[upstream])
            .set(size);
        self.connection_pool_idle
            .with_label_values(&[upstream])
            .set(idle);
    }

    /// Record connection acquisition from pool
    pub fn record_connection_acquired(&self, upstream: &str) {
        self.connection_pool_acquired
            .with_label_values(&[upstream])
            .inc();
    }

    /// Update system metrics
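    ///
    /// This takes a fresh snapshot on every call and blocks briefly, since
    /// `sysinfo` needs two CPU refreshes separated by a minimum interval to
    /// produce a meaningful usage figure. Call it from a background task, not
    /// from the request path. A sketch (the `tokio` runtime is an assumption):
    ///
    /// ```rust,ignore
    /// let metrics = std::sync::Arc::new(RequestMetrics::new()?);
    /// let m = metrics.clone();
    /// tokio::task::spawn_blocking(move || loop {
    ///     m.update_system_metrics();
    ///     std::thread::sleep(std::time::Duration::from_secs(10));
    /// });
    /// ```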
    pub fn update_system_metrics(&self) {
        use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};

        // Create system with specific refresh kinds
        let mut system = System::new_with_specifics(
            RefreshKind::new()
                .with_cpu(CpuRefreshKind::everything())
                .with_memory(MemoryRefreshKind::everything()),
        );

        // Report used (not total) memory to match the gauge's meaning
        self.memory_usage.set(system.used_memory() as i64);

        // CPU usage needs a second refresh after a minimum interval; the
        // first sample was taken when the `System` was constructed above
        std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL);
        system.refresh_cpu_usage();
        self.cpu_usage.set(system.global_cpu_usage() as f64);
    }

    /// Set open connections count
    pub fn set_open_connections(&self, count: i64) {
        self.open_connections.set(count);
    }
}

/// Structured log entry for audit logging
#[derive(Debug, serde::Serialize)]
pub struct AuditLogEntry {
    pub timestamp: String,
    pub correlation_id: String,
    pub event_type: String,
    pub route: Option<String>,
    pub client_addr: Option<String>,
    pub user_agent: Option<String>,
    pub method: String,
    pub path: String,
    pub status: Option<u16>,
    pub duration_ms: u64,
    pub upstream: Option<String>,
    pub waf_decision: Option<WafDecision>,
    pub agent_decisions: Vec<AgentDecision>,
    pub error: Option<String>,
    pub tags: Vec<String>,
}

/// WAF decision details for audit logging
#[derive(Debug, serde::Serialize)]
pub struct WafDecision {
    pub action: String,
    pub rule_ids: Vec<String>,
    pub confidence: f32,
    pub reason: String,
    pub matched_data: Option<String>,
}

/// Agent decision details for audit logging
#[derive(Debug, serde::Serialize)]
pub struct AgentDecision {
    pub agent_name: String,
    pub event: String,
    pub action: String,
    pub latency_ms: u64,
    pub metadata: serde_json::Value,
}

impl AuditLogEntry {
    /// Create a new audit log entry
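    ///
    /// # Example
    ///
    /// A sketch of building and emitting an entry for one request (field
    /// values are illustrative):
    ///
    /// ```rust,ignore
    /// let mut entry = AuditLogEntry::new(
    ///     correlation_id,
    ///     "GET".to_string(),
    ///     "/api/users".to_string(),
    /// );
    /// entry.status = Some(200);
    /// entry.duration_ms = start.elapsed().as_millis() as u64;
    /// entry.write(); // one "AUDIT: {...}" JSON line on stdout
    /// ```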
    pub fn new(correlation_id: String, method: String, path: String) -> Self {
        Self {
            timestamp: chrono::Utc::now().to_rfc3339(),
            correlation_id,
            event_type: "request".to_string(),
            route: None,
            client_addr: None,
            user_agent: None,
            method,
            path,
            status: None,
            duration_ms: 0,
            upstream: None,
            waf_decision: None,
            agent_decisions: vec![],
            error: None,
            tags: vec![],
        }
    }

    /// Write the audit log entry as a single JSON line to stdout, prefixed
    /// with `AUDIT: ` so downstream log processing can pick it out
    pub fn write(&self) {
        match serde_json::to_string(self) {
            Ok(json) => println!("AUDIT: {}", json),
            Err(e) => error!("Failed to serialize audit log: {}", e),
        }
    }
}

/// Health check status for components
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum HealthStatus {
    Healthy,
    Degraded,
    Unhealthy,
}

/// Component health information
#[derive(Debug, Clone, serde::Serialize)]
pub struct ComponentHealth {
    pub name: String,
    pub status: HealthStatus,
    pub last_check: chrono::DateTime<chrono::Utc>,
    pub consecutive_failures: u32,
    pub error_message: Option<String>,
}

/// Global health status aggregator
///
/// Tracks the health of all system components (upstreams, agents, etc.)
/// and provides aggregate status for health endpoints.
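///
/// # Example
///
/// A sketch of feeding the tracker from health probes and mapping the
/// aggregate status to an HTTP code (the mapping is an assumption, not an
/// existing Sentinel convention):
///
/// ```rust,ignore
/// let tracker = ComponentHealthTracker::new();
/// tracker.update_component("upstream1".to_string(), HealthStatus::Healthy, None);
///
/// let http_code = match tracker.get_status() {
///     HealthStatus::Healthy | HealthStatus::Degraded => 200,
///     HealthStatus::Unhealthy => 503,
/// };
/// ```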
pub struct ComponentHealthTracker {
    components: parking_lot::RwLock<Vec<ComponentHealth>>,
}

impl ComponentHealthTracker {
    /// Create a new health tracker with no registered components
    pub fn new() -> Self {
        Self {
            components: parking_lot::RwLock::new(vec![]),
        }
    }

    /// Update component health
    pub fn update_component(&self, name: String, status: HealthStatus, error: Option<String>) {
        let mut components = self.components.write();

        if let Some(component) = components.iter_mut().find(|c| c.name == name) {
            component.status = status;
            component.last_check = chrono::Utc::now();
            component.error_message = error;

            if status != HealthStatus::Healthy {
                component.consecutive_failures += 1;
            } else {
                component.consecutive_failures = 0;
            }
        } else {
            components.push(ComponentHealth {
                name,
                status,
                last_check: chrono::Utc::now(),
                consecutive_failures: if status != HealthStatus::Healthy { 1 } else { 0 },
                error_message: error,
            });
        }
    }

    /// Get overall health status
    ///
    /// Any unhealthy component makes the aggregate unhealthy; otherwise any
    /// degraded component makes it degraded. An empty tracker reports healthy.
    pub fn get_status(&self) -> HealthStatus {
        let components = self.components.read();

        if components.is_empty() {
            return HealthStatus::Healthy;
        }

        let unhealthy_count = components
            .iter()
            .filter(|c| c.status == HealthStatus::Unhealthy)
            .count();
        let degraded_count = components
            .iter()
            .filter(|c| c.status == HealthStatus::Degraded)
            .count();

        if unhealthy_count > 0 {
            HealthStatus::Unhealthy
        } else if degraded_count > 0 {
            HealthStatus::Degraded
        } else {
            HealthStatus::Healthy
        }
    }

    /// Get detailed health report
    pub fn get_report(&self) -> Vec<ComponentHealth> {
        self.components.read().clone()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_creation() {
        let metrics = RequestMetrics::new().expect("Failed to create metrics");

        // Record a request
        metrics.record_request("test_route", "GET", 200, Duration::from_millis(100));

        // Verify active requests tracking
        metrics.inc_active_requests();
        metrics.dec_active_requests();

        // Record upstream attempt
        metrics.record_upstream_attempt("backend1", "test_route");
    }

    #[test]
    fn test_audit_log() {
        let mut entry = AuditLogEntry::new(
            "test-correlation-id".to_string(),
            "GET".to_string(),
            "/api/test".to_string(),
        );

        entry.status = Some(200);
        entry.duration_ms = 150;
        entry.tags.push("test".to_string());

        // This would write to stdout in production; for testing, we just
        // verify it serializes correctly
        let json = serde_json::to_string(&entry).expect("Failed to serialize audit log");
        assert!(json.contains("test-correlation-id"));
    }

    #[test]
    fn test_health_checker() {
        let checker = ComponentHealthTracker::new();

        // Initially healthy
        assert_eq!(checker.get_status(), HealthStatus::Healthy);

        // Add healthy component
        checker.update_component("upstream1".to_string(), HealthStatus::Healthy, None);
        assert_eq!(checker.get_status(), HealthStatus::Healthy);

        // Add degraded component
        checker.update_component(
            "agent1".to_string(),
            HealthStatus::Degraded,
            Some("Slow response".to_string()),
        );
        assert_eq!(checker.get_status(), HealthStatus::Degraded);

        // Add unhealthy component
        checker.update_component(
            "upstream2".to_string(),
            HealthStatus::Unhealthy,
            Some("Connection refused".to_string()),
        );
        assert_eq!(checker.get_status(), HealthStatus::Unhealthy);
    }
}