eventuali_core/observability/
mod.rs

//! Observability module for comprehensive monitoring, tracing, and metrics collection.
//!
//! This module provides:
//! - OpenTelemetry integration with distributed tracing
//! - Prometheus metrics export
//! - Structured logging with correlation IDs
//! - Performance monitoring with minimal overhead (<2%)
//! - Health monitoring and health-check endpoints
//! - Performance profiling with flame graphs and regression detection
10
11pub mod telemetry;
12pub mod metrics;
13pub mod logging;
14pub mod correlation;
15pub mod health;
16pub mod profiling;
17
18pub use telemetry::{
19    ObservabilityConfig, TelemetryProvider, TracingService, 
20    EventTrace, TraceContext, SpanBuilder
21};
22pub use metrics::{
23    MetricsCollector, PrometheusExporter, EventMetrics, 
24    PerformanceMetrics, OperationTimer, MetricLabels
25};
26pub use logging::{
27    StructuredLogger, LogLevel, LogContext, CorrelationLogger,
28    ObservabilityLogger, LogEntry, LogAggregator
29};
30pub use correlation::{
31    CorrelationId, CorrelationContext, CorrelationTracker,
32    RequestContext, TraceCorrelation, generate_correlation_id
33};
34pub use health::{
35    HealthStatus, HealthCheckResult, SystemMetrics, SystemHealthThresholds,
36    HealthReport, ServiceInfo, HealthConfig, HealthChecker, 
37    DatabaseHealthChecker, EventStoreHealthChecker, StreamingHealthChecker,
38    SecurityHealthChecker, TenancyHealthChecker, HealthMonitorService
39};
40pub use profiling::{
41    PerformanceProfiler, PerformanceProfilerBuilder, ProfilingConfig,
42    ProfileType, ProfileEntry, MemoryInfo, IoInfo, CallGraphNode,
43    RegressionDetection, PerformanceSnapshot, RegressionSeverity,
44    FlameGraph, FlameGraphNode, BottleneckAnalysis, Bottleneck,
45    BottleneckType, OptimizationSuggestion
46};
47
48use crate::error::Result;
49use std::sync::Arc;
50// use tokio::sync::RwLock; // Available for future async state management
51
52/// Main observability service that coordinates all monitoring aspects
53#[derive(Debug, Clone)]
54pub struct ObservabilityService {
55    telemetry: Arc<TelemetryProvider>,
56    metrics: Arc<MetricsCollector>,
57    logger: Arc<StructuredLogger>,
58    correlation: Arc<CorrelationTracker>,
59    profiler: Arc<PerformanceProfiler>,
60    #[allow(dead_code)] // Configuration stored but not accessed after initialization
61    config: ObservabilityConfig,
62}
63
64impl ObservabilityService {
65    /// Create a new observability service with the given configuration
66    pub async fn new(config: ObservabilityConfig) -> Result<Self> {
67        let telemetry = Arc::new(TelemetryProvider::new(&config).await?);
68        let metrics = Arc::new(MetricsCollector::new(&config)?);
69        let logger = Arc::new(StructuredLogger::new(&config)?);
70        let correlation = Arc::new(CorrelationTracker::new());
71        
72        // Create profiler with default configuration
73        let profiling_config = ProfilingConfig::default();
74        let profiler = Arc::new(PerformanceProfiler::new(profiling_config));
75
76        Ok(Self {
77            telemetry,
78            metrics,
79            logger,
80            correlation,
81            profiler,
82            config,
83        })
84    }
85
86    /// Initialize observability for the entire system
87    pub async fn initialize(&self) -> Result<()> {
88        self.telemetry.initialize().await?;
89        self.metrics.initialize().await?;
90        self.logger.initialize().await?;
91        
92        tracing::info!("Observability service initialized successfully");
93        Ok(())
94    }
95
96    /// Create a new trace context for an operation
97    pub fn create_trace_context(&self, operation: &str) -> TraceContext {
98        let correlation_id = generate_correlation_id();
99        self.correlation.register(correlation_id.clone());
100        
101        TraceContext::new(operation.to_string(), correlation_id)
102    }
103
104    /// Start timing an operation
105    pub fn start_timer(&self, operation: &str, labels: MetricLabels) -> OperationTimer {
106        self.metrics.start_timer(operation, labels)
107    }
108
109    /// Log an event with full observability context
110    pub fn log_event(&self, level: LogLevel, message: &str, context: &TraceContext) {
111        self.logger.log_with_context(level, message, context);
112    }
113
114    /// Record metrics for an operation
115    pub fn record_metric(&self, name: &str, value: f64, labels: MetricLabels) {
116        self.metrics.record_metric(name, value, labels);
117    }
118
119    /// Get current performance metrics
120    pub async fn get_performance_metrics(&self) -> PerformanceMetrics {
121        self.metrics.get_performance_metrics().await
122    }
123
124    /// Start a profiling session
125    pub async fn start_profiling(
126        &self,
127        profile_type: ProfileType,
128        metadata: std::collections::HashMap<String, String>,
129    ) -> Result<String> {
130        let correlation_id = generate_correlation_id();
131        self.profiler.start_profiling(profile_type, Some(correlation_id), metadata).await
132    }
133
134    /// End a profiling session
135    pub async fn end_profiling(&self, session_id: &str) -> Result<ProfileEntry> {
136        self.profiler.end_profiling(session_id).await
137    }
138
139    /// Generate a flame graph
140    pub async fn generate_flame_graph(
141        &self,
142        profile_type: ProfileType,
143        time_range: Option<(std::time::SystemTime, std::time::SystemTime)>,
144    ) -> Result<FlameGraph> {
145        self.profiler.generate_flame_graph(profile_type, time_range).await
146    }
147
148    /// Detect performance regressions
149    pub async fn detect_regressions(&self, operation: &str) -> Result<Option<RegressionDetection>> {
150        self.profiler.detect_regressions(operation).await
151    }
152
153    /// Identify bottlenecks
154    pub async fn identify_bottlenecks(&self, profile_type: ProfileType) -> Result<BottleneckAnalysis> {
155        self.profiler.identify_bottlenecks(profile_type).await
156    }
157
158    /// Set baseline metrics for regression detection
159    pub async fn set_baseline(&self, operation: &str) -> Result<()> {
160        self.profiler.set_baseline(operation).await
161    }
162
163    /// Get profiling statistics
164    pub async fn get_profiling_statistics(&self) -> Result<std::collections::HashMap<String, serde_json::Value>> {
165        self.profiler.get_statistics().await
166    }
167
168    /// Shutdown the observability service
169    pub async fn shutdown(&self) -> Result<()> {
170        self.telemetry.shutdown().await?;
171        self.metrics.shutdown().await?;
172        self.logger.shutdown().await?;
173        
174        tracing::info!("Observability service shut down successfully");
175        Ok(())
176    }
177}
178
179/// Builder for creating observability service instances
180pub struct ObservabilityServiceBuilder {
181    config: ObservabilityConfig,
182    profiling_config: Option<ProfilingConfig>,
183}
184
185impl ObservabilityServiceBuilder {
186    pub fn new() -> Self {
187        Self {
188            config: ObservabilityConfig::default(),
189            profiling_config: None,
190        }
191    }
192
193    pub fn with_config(mut self, config: ObservabilityConfig) -> Self {
194        self.config = config;
195        self
196    }
197
198    pub fn with_tracing_enabled(mut self, enabled: bool) -> Self {
199        self.config.tracing_enabled = enabled;
200        self
201    }
202
203    pub fn with_metrics_enabled(mut self, enabled: bool) -> Self {
204        self.config.metrics_enabled = enabled;
205        self
206    }
207
208    pub fn with_structured_logging(mut self, enabled: bool) -> Self {
209        self.config.structured_logging = enabled;
210        self
211    }
212
213    pub fn with_profiling_config(mut self, config: ProfilingConfig) -> Self {
214        self.profiling_config = Some(config);
215        self
216    }
217
218    pub async fn build(self) -> Result<ObservabilityService> {
219        let mut service = ObservabilityService::new(self.config).await?;
220        
221        // Replace profiler if custom config was provided
222        if let Some(profiling_config) = self.profiling_config {
223            service.profiler = Arc::new(PerformanceProfiler::new(profiling_config));
224        }
225        
226        Ok(service)
227    }
228}
229
230impl Default for ObservabilityServiceBuilder {
231    fn default() -> Self {
232        Self::new()
233    }
234}