eventuali_core/observability/
metrics.rs

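//! Metrics collection and Prometheus export.
//!
//! Records counters, gauges and histograms as `tracing` events, keeps in-process
//! aggregates behind [`MetricsCollector`], and can install a Prometheus recorder
//! when metrics are enabled in the [`ObservabilityConfig`].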
4
5use crate::error::{EventualiError, Result};
6use crate::observability::ObservabilityConfig;
7use metrics::Label;
8use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
9// Prometheus encoders - not currently used but available for direct export
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13use std::time::{Duration, Instant};
14use tokio::sync::RwLock;
15
16/// Labels for categorizing metrics
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct MetricLabels {
19    pub labels: HashMap<String, String>,
20}
21
22impl MetricLabels {
23    /// Create new metric labels
24    pub fn new() -> Self {
25        Self {
26            labels: HashMap::new(),
27        }
28    }
29
30    /// Add a label
31    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
32        self.labels.insert(key.into(), value.into());
33        self
34    }
35
36    /// Convert to metrics library format
37    pub fn to_metrics_labels(&self) -> Vec<Label> {
38        self.labels
39            .iter()
40            .map(|(k, v)| Label::new(k.clone(), v.clone()))
41            .collect()
42    }
43}
44
45impl Default for MetricLabels {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51/// Timer for measuring operation duration
52pub struct OperationTimer {
53    name: String,
54    labels: MetricLabels,
55    start_time: Instant,
56}
57
58impl OperationTimer {
59    pub fn new(name: String, labels: MetricLabels) -> Self {
60        Self {
61            name,
62            labels,
63            start_time: Instant::now(),
64        }
65    }
66
67    /// Stop the timer and record the duration
68    pub fn stop(self) {
69        let duration = self.start_time.elapsed();
70        tracing::debug!(
71            metric_type = "histogram",
72            metric_name = %self.name,
73            duration_seconds = duration.as_secs_f64(),
74            ?self.labels,
75            "Operation timer stopped"
76        );
77    }
78
79    /// Get elapsed time without stopping the timer
80    pub fn elapsed(&self) -> Duration {
81        self.start_time.elapsed()
82    }
83}
84
85/// Comprehensive performance metrics
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct PerformanceMetrics {
88    pub events_created_total: u64,
89    pub events_stored_total: u64,
90    pub events_loaded_total: u64,
91    pub operations_duration_seconds: HashMap<String, f64>,
92    pub error_count_total: u64,
93    pub active_connections: u64,
94    pub memory_usage_bytes: u64,
95    pub cpu_usage_percent: f64,
96    pub throughput_events_per_second: f64,
97    pub database_query_duration_seconds: f64,
98    pub cache_hit_ratio: f64,
99    pub timestamp: chrono::DateTime<chrono::Utc>,
100}
101
102impl Default for PerformanceMetrics {
103    fn default() -> Self {
104        Self {
105            events_created_total: 0,
106            events_stored_total: 0,
107            events_loaded_total: 0,
108            operations_duration_seconds: HashMap::new(),
109            error_count_total: 0,
110            active_connections: 0,
111            memory_usage_bytes: 0,
112            cpu_usage_percent: 0.0,
113            throughput_events_per_second: 0.0,
114            database_query_duration_seconds: 0.0,
115            cache_hit_ratio: 0.0,
116            timestamp: chrono::Utc::now(),
117        }
118    }
119}
120
121/// Event-specific metrics
122#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct EventMetrics {
124    pub event_type: String,
125    pub aggregate_type: String,
126    pub tenant_id: Option<String>,
127    pub operation_duration_ms: f64,
128    pub payload_size_bytes: u64,
129    pub serialization_duration_ms: f64,
130    pub storage_duration_ms: f64,
131    pub success: bool,
132    pub error_type: Option<String>,
133    pub timestamp: chrono::DateTime<chrono::Utc>,
134}
135
136/// Main metrics collector
137pub struct MetricsCollector {
138    #[allow(dead_code)]
139    prometheus_handle: Option<PrometheusHandle>,
140    config: ObservabilityConfig,
141    performance_metrics: Arc<RwLock<PerformanceMetrics>>,
142    counters: Arc<Mutex<HashMap<String, u64>>>,
143    gauges: Arc<Mutex<HashMap<String, f64>>>,
144    histograms: Arc<Mutex<HashMap<String, Vec<f64>>>>,
145}
146
147impl MetricsCollector {
148    /// Create a new metrics collector
149    pub fn new(config: &ObservabilityConfig) -> Result<Self> {
150        let prometheus_handle = if config.metrics_enabled {
151            match PrometheusBuilder::new().install() {
152                Ok(()) => {
153                    tracing::info!("Prometheus recorder installed successfully");
154                    None // For now, we don't store the handle
155                }
156                Err(e) => {
157                    tracing::warn!("Failed to install Prometheus recorder: {}", e);
158                    None
159                }
160            }
161        } else {
162            None
163        };
164
165        Ok(Self {
166            prometheus_handle,
167            config: config.clone(),
168            performance_metrics: Arc::new(RwLock::new(PerformanceMetrics::default())),
169            counters: Arc::new(Mutex::new(HashMap::new())),
170            gauges: Arc::new(Mutex::new(HashMap::new())),
171            histograms: Arc::new(Mutex::new(HashMap::new())),
172        })
173    }
174
175    /// Initialize the metrics collector
176    pub async fn initialize(&self) -> Result<()> {
177        tracing::info!(
178            metrics_enabled = self.config.metrics_enabled,
179            prometheus_endpoint = ?self.config.prometheus_endpoint,
180            "Metrics collector initialized"
181        );
182
183        Ok(())
184    }
185
186    /// Start a timer for an operation
187    pub fn start_timer(&self, operation: &str, labels: MetricLabels) -> OperationTimer {
188        let name = format!("eventuali_{operation}_duration_seconds");
189        OperationTimer::new(name, labels)
190    }
191
192    /// Record a counter metric
193    pub fn increment_counter(&self, name: &str, labels: MetricLabels) {
194        // Use tracing for now, can be extended to actual metrics later
195        tracing::debug!(
196            metric_type = "counter",
197            metric_name = name,
198            value = 1,
199            ?labels,
200            "Counter incremented"
201        );
202        
203        // Also track locally for aggregation
204        if let Ok(mut counters) = self.counters.lock() {
205            *counters.entry(name.to_string()).or_insert(0) += 1;
206        }
207    }
208
209    /// Record a gauge metric
210    pub fn record_gauge(&self, name: &str, value: f64, labels: MetricLabels) {
211        tracing::debug!(
212            metric_type = "gauge",
213            metric_name = name,
214            value = value,
215            ?labels,
216            "Gauge recorded"
217        );
218        
219        // Also track locally
220        if let Ok(mut gauges) = self.gauges.lock() {
221            gauges.insert(name.to_string(), value);
222        }
223    }
224
225    /// Record a histogram metric
226    pub fn record_metric(&self, name: &str, value: f64, labels: MetricLabels) {
227        tracing::debug!(
228            metric_type = "histogram",
229            metric_name = name,
230            value = value,
231            ?labels,
232            "Histogram recorded"
233        );
234        
235        // Also track locally
236        if let Ok(mut histograms) = self.histograms.lock() {
237            histograms.entry(name.to_string()).or_insert_with(Vec::new).push(value);
238        }
239    }
240
241    /// Record event-specific metrics
242    pub async fn record_event_metrics(&self, metrics: EventMetrics) {
243        let labels = MetricLabels::new()
244            .with_label("event_type", &metrics.event_type)
245            .with_label("aggregate_type", &metrics.aggregate_type)
246            .with_label("success", metrics.success.to_string());
247
248        // Increment event counter
249        self.increment_counter("eventuali_events_processed_total", labels.clone());
250
251        // Record durations
252        self.record_metric("eventuali_event_operation_duration_seconds", 
253                          metrics.operation_duration_ms / 1000.0, labels.clone());
254        
255        self.record_metric("eventuali_event_serialization_duration_seconds", 
256                          metrics.serialization_duration_ms / 1000.0, labels.clone());
257        
258        self.record_metric("eventuali_event_storage_duration_seconds", 
259                          metrics.storage_duration_ms / 1000.0, labels.clone());
260
261        // Record payload size
262        self.record_metric("eventuali_event_payload_size_bytes", 
263                          metrics.payload_size_bytes as f64, labels.clone());
264
265        // Record errors
266        if !metrics.success {
267            let error_labels = labels.with_label("error_type", 
268                metrics.error_type.as_deref().unwrap_or("unknown"));
269            self.increment_counter("eventuali_event_errors_total", error_labels);
270        }
271    }
272
273    /// Get current performance metrics
274    pub async fn get_performance_metrics(&self) -> PerformanceMetrics {
275        let mut metrics = self.performance_metrics.write().await;
276        
277        // Update counters from local tracking
278        if let Ok(counters) = self.counters.lock() {
279            metrics.events_created_total = *counters.get("eventuali_events_created_total").unwrap_or(&0);
280            metrics.events_stored_total = *counters.get("eventuali_events_stored_total").unwrap_or(&0);
281            metrics.events_loaded_total = *counters.get("eventuali_events_loaded_total").unwrap_or(&0);
282            metrics.error_count_total = *counters.get("eventuali_errors_total").unwrap_or(&0);
283        }
284
285        // Update gauges from local tracking
286        if let Ok(gauges) = self.gauges.lock() {
287            metrics.active_connections = *gauges.get("eventuali_active_connections").unwrap_or(&0.0) as u64;
288            metrics.memory_usage_bytes = *gauges.get("eventuali_memory_usage_bytes").unwrap_or(&0.0) as u64;
289            metrics.cpu_usage_percent = *gauges.get("eventuali_cpu_usage_percent").unwrap_or(&0.0);
290        }
291
292        // Calculate throughput (simplified)
293        let duration = chrono::Utc::now() - metrics.timestamp;
294        if duration.num_seconds() > 0 {
295            metrics.throughput_events_per_second = 
296                metrics.events_created_total as f64 / duration.num_seconds() as f64;
297        }
298
299        metrics.timestamp = chrono::Utc::now();
300        metrics.clone()
301    }
302
303    /// Shutdown the metrics collector
304    pub async fn shutdown(&self) -> Result<()> {
305        tracing::info!("Metrics collector shut down successfully");
306        Ok(())
307    }
308}
309
310impl std::fmt::Debug for MetricsCollector {
311    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
312        f.debug_struct("MetricsCollector")
313            .field("prometheus_handle", &"[PrometheusHandle]")
314            .field("config", &self.config)
315            .field("performance_metrics", &"[PerformanceMetrics]")
316            .field("counters", &"[Counters]")
317            .field("gauges", &"[Gauges]")
318            .field("histograms", &"[Histograms]")
319            .finish()
320    }
321}
322
323/// Prometheus exporter for metrics
324pub struct PrometheusExporter {
325    handle: PrometheusHandle,
326}
327
328impl PrometheusExporter {
329    pub fn new(handle: PrometheusHandle) -> Self {
330        Self { handle }
331    }
332
333    /// Get metrics in Prometheus format
334    pub fn get_metrics(&self) -> String {
335        self.handle.render()
336    }
337
338    /// Get metrics as bytes
339    pub fn get_metrics_bytes(&self) -> Vec<u8> {
340        self.get_metrics().into_bytes()
341    }
342
343    /// Export metrics to a file
344    pub async fn export_to_file(&self, path: &str) -> Result<()> {
345        let metrics = self.get_metrics();
346        tokio::fs::write(path, metrics).await
347            .map_err(|e| EventualiError::ObservabilityError(format!("Failed to export metrics: {e}")))?;
348        Ok(())
349    }
350
351    /// Serve metrics over HTTP (simplified)
352    pub fn metrics_endpoint_handler(&self) -> String {
353        self.get_metrics()
354    }
355}
356
357
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a collector with metrics disabled so no Prometheus recorder is
    /// installed during tests.
    fn disabled_collector() -> MetricsCollector {
        let config = ObservabilityConfig {
            metrics_enabled: false,
            ..ObservabilityConfig::default()
        };
        MetricsCollector::new(&config).expect("collector creation should succeed")
    }

    /// A representative `EventMetrics` value with the given outcome.
    fn sample_event_metrics(success: bool, error_type: Option<&str>) -> EventMetrics {
        EventMetrics {
            event_type: "UserCreated".to_string(),
            aggregate_type: "User".to_string(),
            tenant_id: Some("tenant123".to_string()),
            operation_duration_ms: 15.5,
            payload_size_bytes: 1024,
            serialization_duration_ms: 2.1,
            storage_duration_ms: 8.3,
            success,
            error_type: error_type.map(str::to_string),
            timestamp: chrono::Utc::now(),
        }
    }

    #[test]
    fn test_metric_labels() {
        let labels = MetricLabels::new()
            .with_label("service", "eventuali")
            .with_label("version", "0.1.0");

        assert_eq!(labels.labels.len(), 2);
        assert_eq!(labels.labels.get("service"), Some(&"eventuali".to_string()));
        assert_eq!(labels.labels.get("version"), Some(&"0.1.0".to_string()));
    }

    #[tokio::test]
    async fn test_metrics_collector_creation() {
        let collector = disabled_collector();

        // With metrics disabled nothing is installed, so no handle is kept.
        assert!(collector.prometheus_handle.is_none());
        assert!(!collector.config.metrics_enabled);
    }

    #[tokio::test]
    async fn test_performance_metrics() {
        let collector = disabled_collector();

        let metrics = collector.get_performance_metrics().await;
        assert_eq!(metrics.events_created_total, 0);
        assert_eq!(metrics.events_stored_total, 0);
    }

    #[tokio::test]
    async fn test_local_aggregates_feed_performance_metrics() {
        let collector = disabled_collector();
        collector.increment_counter("eventuali_events_created_total", MetricLabels::new());
        collector.increment_counter("eventuali_events_created_total", MetricLabels::new());
        collector.increment_counter("eventuali_events_stored_total", MetricLabels::new());
        collector.record_gauge("eventuali_active_connections", 3.0, MetricLabels::new());

        let metrics = collector.get_performance_metrics().await;
        assert_eq!(metrics.events_created_total, 2);
        assert_eq!(metrics.events_stored_total, 1);
        assert_eq!(metrics.active_connections, 3);
    }

    #[tokio::test]
    async fn test_record_event_metrics_counts_failures() {
        let collector = disabled_collector();
        collector
            .record_event_metrics(sample_event_metrics(false, Some("Timeout")))
            .await;

        {
            let counters = collector.counters.lock().unwrap();
            assert_eq!(counters.get("eventuali_events_processed_total"), Some(&1));
            assert_eq!(counters.get("eventuali_event_errors_total"), Some(&1));
        }

        let histograms = collector.histograms.lock().unwrap();
        assert_eq!(
            histograms.get("eventuali_event_payload_size_bytes"),
            Some(&vec![1024.0])
        );
    }

    #[test]
    fn test_event_metrics() {
        let metrics = sample_event_metrics(true, None);

        assert_eq!(metrics.event_type, "UserCreated");
        assert_eq!(metrics.aggregate_type, "User");
        assert!(metrics.success);
        assert_eq!(metrics.payload_size_bytes, 1024);
    }

    #[test]
    fn test_operation_timer() {
        let timer = OperationTimer::new("test_operation".to_string(), MetricLabels::new());
        std::thread::sleep(std::time::Duration::from_millis(10));

        assert!(timer.elapsed().as_millis() >= 10);
        timer.stop();
    }
}