eventuali_core/observability/
metrics.rs1use crate::error::{EventualiError, Result};
6use crate::observability::ObservabilityConfig;
7use metrics::Label;
8use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
9use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13use std::time::{Duration, Instant};
14use tokio::sync::RwLock;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct MetricLabels {
19 pub labels: HashMap<String, String>,
20}
21
22impl MetricLabels {
23 pub fn new() -> Self {
25 Self {
26 labels: HashMap::new(),
27 }
28 }
29
30 pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
32 self.labels.insert(key.into(), value.into());
33 self
34 }
35
36 pub fn to_metrics_labels(&self) -> Vec<Label> {
38 self.labels
39 .iter()
40 .map(|(k, v)| Label::new(k.clone(), v.clone()))
41 .collect()
42 }
43}
44
45impl Default for MetricLabels {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51pub struct OperationTimer {
53 name: String,
54 labels: MetricLabels,
55 start_time: Instant,
56}
57
58impl OperationTimer {
59 pub fn new(name: String, labels: MetricLabels) -> Self {
60 Self {
61 name,
62 labels,
63 start_time: Instant::now(),
64 }
65 }
66
67 pub fn stop(self) {
69 let duration = self.start_time.elapsed();
70 tracing::debug!(
71 metric_type = "histogram",
72 metric_name = %self.name,
73 duration_seconds = duration.as_secs_f64(),
74 ?self.labels,
75 "Operation timer stopped"
76 );
77 }
78
79 pub fn elapsed(&self) -> Duration {
81 self.start_time.elapsed()
82 }
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct PerformanceMetrics {
88 pub events_created_total: u64,
89 pub events_stored_total: u64,
90 pub events_loaded_total: u64,
91 pub operations_duration_seconds: HashMap<String, f64>,
92 pub error_count_total: u64,
93 pub active_connections: u64,
94 pub memory_usage_bytes: u64,
95 pub cpu_usage_percent: f64,
96 pub throughput_events_per_second: f64,
97 pub database_query_duration_seconds: f64,
98 pub cache_hit_ratio: f64,
99 pub timestamp: chrono::DateTime<chrono::Utc>,
100}
101
102impl Default for PerformanceMetrics {
103 fn default() -> Self {
104 Self {
105 events_created_total: 0,
106 events_stored_total: 0,
107 events_loaded_total: 0,
108 operations_duration_seconds: HashMap::new(),
109 error_count_total: 0,
110 active_connections: 0,
111 memory_usage_bytes: 0,
112 cpu_usage_percent: 0.0,
113 throughput_events_per_second: 0.0,
114 database_query_duration_seconds: 0.0,
115 cache_hit_ratio: 0.0,
116 timestamp: chrono::Utc::now(),
117 }
118 }
119}
120
/// Measurements describing a single event operation, consumed by
/// `MetricsCollector::record_event_metrics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventMetrics {
    /// Event type name; recorded as the `event_type` label.
    pub event_type: String,
    /// Aggregate type name; recorded as the `aggregate_type` label.
    pub aggregate_type: String,
    /// Optional tenant identifier.
    /// NOTE(review): `record_event_metrics` does not currently attach this as a label.
    pub tenant_id: Option<String>,
    /// Operation duration in milliseconds (converted to seconds when recorded).
    pub operation_duration_ms: f64,
    /// Payload size in bytes.
    pub payload_size_bytes: u64,
    /// Serialization duration in milliseconds (converted to seconds when recorded).
    pub serialization_duration_ms: f64,
    /// Storage duration in milliseconds (converted to seconds when recorded).
    pub storage_duration_ms: f64,
    /// Whether the operation succeeded; recorded as the `success` label.
    pub success: bool,
    /// Error classification, used as the `error_type` label when `success` is
    /// false (`"unknown"` when `None`).
    pub error_type: Option<String>,
    /// Caller-supplied timestamp (not read by `record_event_metrics`).
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
135
/// In-memory collector for counters, gauges and histogram samples.
///
/// When `metrics_enabled` is set, construction also attempts to install the
/// global Prometheus recorder.
pub struct MetricsCollector {
    // `MetricsCollector::new` stores `None` here in every branch and nothing
    // reads the field yet, hence the dead-code allowance.
    #[allow(dead_code)]
    prometheus_handle: Option<PrometheusHandle>,
    /// Copy of the configuration taken at construction.
    config: ObservabilityConfig,
    /// Cached snapshot refreshed by `get_performance_metrics`.
    performance_metrics: Arc<RwLock<PerformanceMetrics>>,
    /// Counter values keyed by metric name (labels are not part of the key).
    counters: Arc<Mutex<HashMap<String, u64>>>,
    /// Latest gauge value keyed by metric name (labels are not part of the key).
    gauges: Arc<Mutex<HashMap<String, f64>>>,
    /// Raw histogram samples keyed by metric name (labels are not part of the key).
    histograms: Arc<Mutex<HashMap<String, Vec<f64>>>>,
}
146
147impl MetricsCollector {
148 pub fn new(config: &ObservabilityConfig) -> Result<Self> {
150 let prometheus_handle = if config.metrics_enabled {
151 match PrometheusBuilder::new().install() {
152 Ok(()) => {
153 tracing::info!("Prometheus recorder installed successfully");
154 None }
156 Err(e) => {
157 tracing::warn!("Failed to install Prometheus recorder: {}", e);
158 None
159 }
160 }
161 } else {
162 None
163 };
164
165 Ok(Self {
166 prometheus_handle,
167 config: config.clone(),
168 performance_metrics: Arc::new(RwLock::new(PerformanceMetrics::default())),
169 counters: Arc::new(Mutex::new(HashMap::new())),
170 gauges: Arc::new(Mutex::new(HashMap::new())),
171 histograms: Arc::new(Mutex::new(HashMap::new())),
172 })
173 }
174
175 pub async fn initialize(&self) -> Result<()> {
177 tracing::info!(
178 metrics_enabled = self.config.metrics_enabled,
179 prometheus_endpoint = ?self.config.prometheus_endpoint,
180 "Metrics collector initialized"
181 );
182
183 Ok(())
184 }
185
186 pub fn start_timer(&self, operation: &str, labels: MetricLabels) -> OperationTimer {
188 let name = format!("eventuali_{operation}_duration_seconds");
189 OperationTimer::new(name, labels)
190 }
191
192 pub fn increment_counter(&self, name: &str, labels: MetricLabels) {
194 tracing::debug!(
196 metric_type = "counter",
197 metric_name = name,
198 value = 1,
199 ?labels,
200 "Counter incremented"
201 );
202
203 if let Ok(mut counters) = self.counters.lock() {
205 *counters.entry(name.to_string()).or_insert(0) += 1;
206 }
207 }
208
209 pub fn record_gauge(&self, name: &str, value: f64, labels: MetricLabels) {
211 tracing::debug!(
212 metric_type = "gauge",
213 metric_name = name,
214 value = value,
215 ?labels,
216 "Gauge recorded"
217 );
218
219 if let Ok(mut gauges) = self.gauges.lock() {
221 gauges.insert(name.to_string(), value);
222 }
223 }
224
225 pub fn record_metric(&self, name: &str, value: f64, labels: MetricLabels) {
227 tracing::debug!(
228 metric_type = "histogram",
229 metric_name = name,
230 value = value,
231 ?labels,
232 "Histogram recorded"
233 );
234
235 if let Ok(mut histograms) = self.histograms.lock() {
237 histograms.entry(name.to_string()).or_insert_with(Vec::new).push(value);
238 }
239 }
240
241 pub async fn record_event_metrics(&self, metrics: EventMetrics) {
243 let labels = MetricLabels::new()
244 .with_label("event_type", &metrics.event_type)
245 .with_label("aggregate_type", &metrics.aggregate_type)
246 .with_label("success", metrics.success.to_string());
247
248 self.increment_counter("eventuali_events_processed_total", labels.clone());
250
251 self.record_metric("eventuali_event_operation_duration_seconds",
253 metrics.operation_duration_ms / 1000.0, labels.clone());
254
255 self.record_metric("eventuali_event_serialization_duration_seconds",
256 metrics.serialization_duration_ms / 1000.0, labels.clone());
257
258 self.record_metric("eventuali_event_storage_duration_seconds",
259 metrics.storage_duration_ms / 1000.0, labels.clone());
260
261 self.record_metric("eventuali_event_payload_size_bytes",
263 metrics.payload_size_bytes as f64, labels.clone());
264
265 if !metrics.success {
267 let error_labels = labels.with_label("error_type",
268 metrics.error_type.as_deref().unwrap_or("unknown"));
269 self.increment_counter("eventuali_event_errors_total", error_labels);
270 }
271 }
272
273 pub async fn get_performance_metrics(&self) -> PerformanceMetrics {
275 let mut metrics = self.performance_metrics.write().await;
276
277 if let Ok(counters) = self.counters.lock() {
279 metrics.events_created_total = *counters.get("eventuali_events_created_total").unwrap_or(&0);
280 metrics.events_stored_total = *counters.get("eventuali_events_stored_total").unwrap_or(&0);
281 metrics.events_loaded_total = *counters.get("eventuali_events_loaded_total").unwrap_or(&0);
282 metrics.error_count_total = *counters.get("eventuali_errors_total").unwrap_or(&0);
283 }
284
285 if let Ok(gauges) = self.gauges.lock() {
287 metrics.active_connections = *gauges.get("eventuali_active_connections").unwrap_or(&0.0) as u64;
288 metrics.memory_usage_bytes = *gauges.get("eventuali_memory_usage_bytes").unwrap_or(&0.0) as u64;
289 metrics.cpu_usage_percent = *gauges.get("eventuali_cpu_usage_percent").unwrap_or(&0.0);
290 }
291
292 let duration = chrono::Utc::now() - metrics.timestamp;
294 if duration.num_seconds() > 0 {
295 metrics.throughput_events_per_second =
296 metrics.events_created_total as f64 / duration.num_seconds() as f64;
297 }
298
299 metrics.timestamp = chrono::Utc::now();
300 metrics.clone()
301 }
302
303 pub async fn shutdown(&self) -> Result<()> {
305 tracing::info!("Metrics collector shut down successfully");
306 Ok(())
307 }
308}
309
310impl std::fmt::Debug for MetricsCollector {
311 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
312 f.debug_struct("MetricsCollector")
313 .field("prometheus_handle", &"[PrometheusHandle]")
314 .field("config", &self.config)
315 .field("performance_metrics", &"[PerformanceMetrics]")
316 .field("counters", &"[Counters]")
317 .field("gauges", &"[Gauges]")
318 .field("histograms", &"[Histograms]")
319 .finish()
320 }
321}
322
323pub struct PrometheusExporter {
325 handle: PrometheusHandle,
326}
327
328impl PrometheusExporter {
329 pub fn new(handle: PrometheusHandle) -> Self {
330 Self { handle }
331 }
332
333 pub fn get_metrics(&self) -> String {
335 self.handle.render()
336 }
337
338 pub fn get_metrics_bytes(&self) -> Vec<u8> {
340 self.get_metrics().into_bytes()
341 }
342
343 pub async fn export_to_file(&self, path: &str) -> Result<()> {
345 let metrics = self.get_metrics();
346 tokio::fs::write(path, metrics).await
347 .map_err(|e| EventualiError::ObservabilityError(format!("Failed to export metrics: {e}")))?;
348 Ok(())
349 }
350
351 pub fn metrics_endpoint_handler(&self) -> String {
353 self.get_metrics()
354 }
355}
356
357
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metric_labels() {
        let labels = MetricLabels::new()
            .with_label("service", "eventuali")
            .with_label("version", "0.1.0");

        assert_eq!(labels.labels.len(), 2);
        assert_eq!(labels.labels.get("service"), Some(&"eventuali".to_string()));
        assert_eq!(labels.labels.get("version"), Some(&"0.1.0".to_string()));
    }

    #[tokio::test]
    async fn test_metrics_collector_creation() {
        let config = ObservabilityConfig {
            metrics_enabled: false,
            ..ObservabilityConfig::default()
        };
        let collector = MetricsCollector::new(&config).unwrap();

        assert!(!config.metrics_enabled);
        // With metrics disabled no recorder is installed, so no handle is kept.
        assert!(collector.prometheus_handle.is_none());
    }

    #[tokio::test]
    async fn test_performance_metrics() {
        let config = ObservabilityConfig {
            metrics_enabled: false,
            ..ObservabilityConfig::default()
        };
        let collector = MetricsCollector::new(&config).unwrap();

        let metrics = collector.get_performance_metrics().await;
        assert_eq!(metrics.events_created_total, 0);
        assert_eq!(metrics.events_stored_total, 0);
    }

    #[tokio::test]
    async fn test_counters_and_gauges_feed_snapshot() {
        let config = ObservabilityConfig {
            metrics_enabled: false,
            ..ObservabilityConfig::default()
        };
        let collector = MetricsCollector::new(&config).unwrap();

        collector.increment_counter("eventuali_events_created_total", MetricLabels::new());
        collector.increment_counter("eventuali_events_created_total", MetricLabels::new());
        collector.record_gauge("eventuali_active_connections", 3.0, MetricLabels::new());

        let metrics = collector.get_performance_metrics().await;
        assert_eq!(metrics.events_created_total, 2);
        assert_eq!(metrics.active_connections, 3);
    }

    #[test]
    fn test_event_metrics() {
        let metrics = EventMetrics {
            event_type: "UserCreated".to_string(),
            aggregate_type: "User".to_string(),
            tenant_id: Some("tenant123".to_string()),
            operation_duration_ms: 15.5,
            payload_size_bytes: 1024,
            serialization_duration_ms: 2.1,
            storage_duration_ms: 8.3,
            success: true,
            error_type: None,
            timestamp: chrono::Utc::now(),
        };

        assert_eq!(metrics.event_type, "UserCreated");
        assert_eq!(metrics.aggregate_type, "User");
        assert!(metrics.success);
        assert_eq!(metrics.payload_size_bytes, 1024);
    }

    #[test]
    fn test_operation_timer() {
        let labels = MetricLabels::new();
        let timer = OperationTimer::new("test_operation".to_string(), labels);
        std::thread::sleep(std::time::Duration::from_millis(10));
        let elapsed = timer.elapsed();

        assert!(elapsed.as_millis() >= 10);
    }
}