eventuali_core/observability/
telemetry.rs

1//! OpenTelemetry integration module for distributed tracing
2//!
3//! Provides comprehensive tracing capabilities with minimal performance overhead.
4
5use crate::error::Result;
6use crate::observability::correlation::{CorrelationId, generate_correlation_id};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use uuid::Uuid;
12
13/// Configuration for observability features
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct ObservabilityConfig {
16    pub tracing_enabled: bool,
17    pub metrics_enabled: bool,
18    pub structured_logging: bool,
19    pub jaeger_endpoint: Option<String>,
20    pub prometheus_endpoint: Option<String>,
21    pub service_name: String,
22    pub service_version: String,
23    pub environment: String,
24    pub sample_rate: f64,
25    pub max_events_per_span: u32,
26    pub export_timeout_millis: u64,
27}
28
29impl Default for ObservabilityConfig {
30    fn default() -> Self {
31        Self {
32            tracing_enabled: true,
33            metrics_enabled: true,
34            structured_logging: true,
35            jaeger_endpoint: Some("http://localhost:14268/api/traces".to_string()),
36            prometheus_endpoint: Some("http://localhost:9090".to_string()),
37            service_name: "eventuali".to_string(),
38            service_version: "0.1.0".to_string(),
39            environment: "development".to_string(),
40            sample_rate: 1.0, // Sample all traces in development
41            max_events_per_span: 128,
42            export_timeout_millis: 30000,
43        }
44    }
45}
46
47/// Main telemetry provider for OpenTelemetry integration
48#[derive(Debug)]
49pub struct TelemetryProvider {
50    config: ObservabilityConfig,
51    active_traces: Arc<RwLock<HashMap<CorrelationId, TraceContext>>>,
52}
53
54impl TelemetryProvider {
55    /// Create a new telemetry provider
56    pub async fn new(config: &ObservabilityConfig) -> Result<Self> {
57        Ok(Self {
58            config: config.clone(),
59            active_traces: Arc::new(RwLock::new(HashMap::new())),
60        })
61    }
62
63    /// Initialize the telemetry provider
64    pub async fn initialize(&self) -> Result<()> {
65        tracing::info!(
66            service_name = %self.config.service_name,
67            service_version = %self.config.service_version,
68            environment = %self.config.environment,
69            "Telemetry provider initialized"
70        );
71
72        Ok(())
73    }
74
75    /// Create a new trace context
76    pub async fn create_trace(&self, operation: &str, _parent: Option<&TraceContext>) -> TraceContext {
77        let correlation_id = generate_correlation_id();
78
79        let trace_context = TraceContext::new(operation.to_string(), correlation_id.clone());
80
81        // Store the active trace
82        self.active_traces.write().await.insert(correlation_id, trace_context.clone());
83
84        trace_context
85    }
86
87    /// Get an active trace by correlation ID
88    pub async fn get_trace(&self, correlation_id: &CorrelationId) -> Option<TraceContext> {
89        self.active_traces.read().await.get(correlation_id).cloned()
90    }
91
92    /// End a trace and clean up
93    pub async fn end_trace(&self, trace: &TraceContext) {
94        self.active_traces.write().await.remove(&trace.correlation_id);
95        
96        let duration = trace.start_time.elapsed();
97        tracing::debug!(
98            operation = %trace.operation,
99            correlation_id = %trace.correlation_id,
100            duration_ms = duration.as_millis(),
101            "Trace completed"
102        );
103    }
104
105    /// Shutdown the telemetry provider
106    pub async fn shutdown(&self) -> Result<()> {
107        // Clear all active traces
108        self.active_traces.write().await.clear();
109
110        tracing::info!("Telemetry provider shut down successfully");
111        Ok(())
112    }
113}
114
115/// Represents a trace context for an operation
116#[derive(Debug, Clone)]
117pub struct TraceContext {
118    pub operation: String,
119    pub correlation_id: CorrelationId,
120    pub start_time: std::time::Instant,
121    pub attributes: HashMap<String, String>,
122    pub trace_id: Option<String>,
123    pub span_id: Option<String>,
124}
125
126impl TraceContext {
127    pub fn new(operation: String, correlation_id: CorrelationId) -> Self {
128        Self {
129            operation,
130            correlation_id,
131            start_time: std::time::Instant::now(),
132            attributes: HashMap::new(),
133            trace_id: Some(uuid::Uuid::new_v4().to_string()),
134            span_id: Some(uuid::Uuid::new_v4().to_string()),
135        }
136    }
137
138    /// Add an attribute to this trace
139    pub fn add_attribute(&mut self, key: &str, value: &str) {
140        self.attributes.insert(key.to_string(), value.to_string());
141        
142        tracing::debug!(
143            trace_id = ?self.trace_id,
144            span_id = ?self.span_id,
145            correlation_id = %self.correlation_id,
146            key = key,
147            value = value,
148            "Added trace attribute"
149        );
150    }
151
152    /// Add an event to this trace
153    pub fn add_event(&self, name: &str, attributes: HashMap<String, String>) {
154        tracing::info!(
155            trace_id = ?self.trace_id,
156            span_id = ?self.span_id,
157            correlation_id = %self.correlation_id,
158            event_name = name,
159            ?attributes,
160            "Trace event"
161        );
162    }
163
164    /// Record an error in this trace
165    pub fn record_error(&self, error: &dyn std::error::Error) {
166        tracing::error!(
167            trace_id = ?self.trace_id,
168            span_id = ?self.span_id,
169            correlation_id = %self.correlation_id,
170            error = %error,
171            "Trace error recorded"
172        );
173    }
174
175    /// Get elapsed time for this trace
176    pub fn elapsed(&self) -> std::time::Duration {
177        self.start_time.elapsed()
178    }
179}
180
181/// Service for managing distributed tracing
182#[derive(Debug)]
183pub struct TracingService {
184    provider: Arc<TelemetryProvider>,
185}
186
187impl TracingService {
188    pub fn new(provider: Arc<TelemetryProvider>) -> Self {
189        Self { provider }
190    }
191
192    /// Start a new trace for an operation
193    pub async fn start_trace(&self, operation: &str) -> TraceContext {
194        self.provider.create_trace(operation, None).await
195    }
196
197    /// Start a child trace
198    pub async fn start_child_trace(&self, operation: &str, parent: &TraceContext) -> TraceContext {
199        self.provider.create_trace(operation, Some(parent)).await
200    }
201
202    /// End a trace
203    pub async fn end_trace(&self, trace: TraceContext) {
204        self.provider.end_trace(&trace).await;
205    }
206}
207
/// Represents an event trace with metadata.
///
/// A serializable record type; nothing in this module constructs it, so the
/// field notes below are derived from names and types only.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventTrace {
    /// Identifier of the event this record belongs to.
    pub event_id: Uuid,
    /// Trace identifier, as a string.
    pub trace_id: String,
    /// Span identifier, as a string.
    pub span_id: String,
    /// Name of the traced operation.
    pub operation: String,
    /// Correlation ID associated with the trace.
    pub correlation_id: CorrelationId,
    /// UTC timestamp of the record.
    // NOTE(review): unclear whether this marks the start or the end of the operation; confirm.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Duration of the operation; presumably nanoseconds, per the `_ns` suffix.
    /// `None` presumably means the duration is not (yet) known; confirm.
    pub duration_ns: Option<u64>,
    /// Key/value attributes attached to the trace.
    pub attributes: HashMap<String, String>,
    /// Outcome of the traced operation.
    pub status: TraceStatus,
}
221
/// Outcome recorded for a trace (see the `status` field of `EventTrace`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TraceStatus {
    /// The operation succeeded.
    Ok,
    /// The operation failed; the payload is presumably an error message (TODO confirm).
    Error(String),
    /// The operation was cancelled.
    Cancelled,
}
228
229/// Builder for creating spans with fluent API
230pub struct SpanBuilder {
231    operation: String,
232    attributes: HashMap<String, String>,
233    parent: Option<TraceContext>,
234}
235
236impl SpanBuilder {
237    pub fn new(operation: &str) -> Self {
238        Self {
239            operation: operation.to_string(),
240            attributes: HashMap::new(),
241            parent: None,
242        }
243    }
244
245    pub fn with_attribute(mut self, key: &str, value: &str) -> Self {
246        self.attributes.insert(key.to_string(), value.to_string());
247        self
248    }
249
250    pub fn with_parent(mut self, parent: TraceContext) -> Self {
251        self.parent = Some(parent);
252        self
253    }
254
255    pub async fn start(self, provider: &TelemetryProvider) -> TraceContext {
256        let mut trace = provider.create_trace(&self.operation, self.parent.as_ref()).await;
257        
258        for (key, value) in self.attributes {
259            trace.add_attribute(&key, &value);
260        }
261
262        trace
263    }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Default configuration with the Jaeger endpoint cleared, as used by the
    /// tests that create traces.
    fn config_without_jaeger() -> ObservabilityConfig {
        ObservabilityConfig {
            jaeger_endpoint: None,
            ..ObservabilityConfig::default()
        }
    }

    /// A provider built from the default configuration keeps the service name.
    #[tokio::test]
    async fn test_telemetry_provider_creation() {
        let defaults = ObservabilityConfig::default();
        let provider = TelemetryProvider::new(&defaults).await.unwrap();

        assert_eq!(provider.config.service_name, "eventuali");
    }

    /// A created trace carries the requested operation name and a non-empty
    /// correlation ID.
    #[tokio::test]
    async fn test_trace_context_creation() {
        let provider = TelemetryProvider::new(&config_without_jaeger())
            .await
            .unwrap();

        let trace = provider.create_trace("test_operation", None).await;

        assert_eq!(trace.operation, "test_operation");
        assert!(!trace.correlation_id.to_string().is_empty());
    }

    /// Attributes added to a trace can be read back from its attribute map.
    #[tokio::test]
    async fn test_trace_attributes() {
        let provider = TelemetryProvider::new(&config_without_jaeger())
            .await
            .unwrap();

        let mut trace = provider.create_trace("test_operation", None).await;
        trace.add_attribute("test_key", "test_value");

        assert_eq!(trace.attributes.get("test_key"), Some(&"test_value".to_string()));
    }
}