eventuali_core/observability/
correlation.rs

1//! Correlation ID management for distributed tracing and request tracking
2
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::fmt;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use uuid::Uuid;
9
10/// A unique identifier for correlating operations across distributed systems
11#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
12pub struct CorrelationId(String);
13
14impl CorrelationId {
15    /// Create a new correlation ID from a string
16    pub fn new(id: impl Into<String>) -> Self {
17        Self(id.into())
18    }
19
20    /// Create a correlation ID from a UUID
21    pub fn from_uuid(uuid: Uuid) -> Self {
22        Self(uuid.to_string())
23    }
24
25    /// Get the correlation ID as a string
26    pub fn as_str(&self) -> &str {
27        &self.0
28    }
29
30    /// Convert to string
31    pub fn into_string(self) -> String {
32        self.0
33    }
34}
35
36impl fmt::Display for CorrelationId {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        write!(f, "{}", self.0)
39    }
40}
41
42impl From<String> for CorrelationId {
43    fn from(id: String) -> Self {
44        Self(id)
45    }
46}
47
48impl From<&str> for CorrelationId {
49    fn from(id: &str) -> Self {
50        Self(id.to_string())
51    }
52}
53
54impl From<Uuid> for CorrelationId {
55    fn from(uuid: Uuid) -> Self {
56        Self::from_uuid(uuid)
57    }
58}
59
60/// Generate a new correlation ID
61pub fn generate_correlation_id() -> CorrelationId {
62    CorrelationId::from_uuid(Uuid::new_v4())
63}
64
65/// Context for tracking correlated operations
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct CorrelationContext {
68    pub correlation_id: CorrelationId,
69    pub parent_id: Option<CorrelationId>,
70    pub operation: String,
71    pub service: String,
72    pub user_id: Option<String>,
73    pub session_id: Option<String>,
74    pub request_id: Option<String>,
75    pub trace_id: Option<String>,
76    pub span_id: Option<String>,
77    pub attributes: HashMap<String, String>,
78    pub created_at: chrono::DateTime<chrono::Utc>,
79}
80
81impl CorrelationContext {
82    /// Create a new correlation context
83    pub fn new(operation: impl Into<String>, service: impl Into<String>) -> Self {
84        Self {
85            correlation_id: generate_correlation_id(),
86            parent_id: None,
87            operation: operation.into(),
88            service: service.into(),
89            user_id: None,
90            session_id: None,
91            request_id: None,
92            trace_id: None,
93            span_id: None,
94            attributes: HashMap::new(),
95            created_at: chrono::Utc::now(),
96        }
97    }
98
99    /// Create a child context with a new correlation ID but linked parent
100    pub fn create_child(&self, operation: impl Into<String>) -> Self {
101        Self {
102            correlation_id: generate_correlation_id(),
103            parent_id: Some(self.correlation_id.clone()),
104            operation: operation.into(),
105            service: self.service.clone(),
106            user_id: self.user_id.clone(),
107            session_id: self.session_id.clone(),
108            request_id: self.request_id.clone(),
109            trace_id: self.trace_id.clone(),
110            span_id: None, // Child gets new span ID
111            attributes: self.attributes.clone(),
112            created_at: chrono::Utc::now(),
113        }
114    }
115
116    /// Set user ID for this context
117    pub fn with_user_id(mut self, user_id: impl Into<String>) -> Self {
118        self.user_id = Some(user_id.into());
119        self
120    }
121
122    /// Set session ID for this context
123    pub fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
124        self.session_id = Some(session_id.into());
125        self
126    }
127
128    /// Set request ID for this context
129    pub fn with_request_id(mut self, request_id: impl Into<String>) -> Self {
130        self.request_id = Some(request_id.into());
131        self
132    }
133
134    /// Set trace ID for this context
135    pub fn with_trace_id(mut self, trace_id: impl Into<String>) -> Self {
136        self.trace_id = Some(trace_id.into());
137        self
138    }
139
140    /// Set span ID for this context
141    pub fn with_span_id(mut self, span_id: impl Into<String>) -> Self {
142        self.span_id = Some(span_id.into());
143        self
144    }
145
146    /// Add an attribute to this context
147    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
148        self.attributes.insert(key.into(), value.into());
149        self
150    }
151
152    /// Get an attribute from this context
153    pub fn get_attribute(&self, key: &str) -> Option<&String> {
154        self.attributes.get(key)
155    }
156}
157
158/// Tracker for managing correlation contexts across operations
159#[derive(Debug)]
160pub struct CorrelationTracker {
161    contexts: Arc<RwLock<HashMap<CorrelationId, CorrelationContext>>>,
162}
163
164impl CorrelationTracker {
165    /// Create a new correlation tracker
166    pub fn new() -> Self {
167        Self {
168            contexts: Arc::new(RwLock::new(HashMap::new())),
169        }
170    }
171
172    /// Register a new correlation ID
173    pub fn register(&self, correlation_id: CorrelationId) {
174        // This is a simple registration - we could extend with more metadata
175        tracing::debug!(correlation_id = %correlation_id, "Registered correlation ID");
176    }
177
178    /// Store a correlation context
179    pub async fn store_context(&self, context: CorrelationContext) {
180        let correlation_id = context.correlation_id.clone();
181        self.contexts.write().await.insert(correlation_id.clone(), context);
182        
183        tracing::debug!(
184            correlation_id = %correlation_id,
185            "Stored correlation context"
186        );
187    }
188
189    /// Retrieve a correlation context
190    pub async fn get_context(&self, correlation_id: &CorrelationId) -> Option<CorrelationContext> {
191        self.contexts.read().await.get(correlation_id).cloned()
192    }
193
194    /// Remove a correlation context
195    pub async fn remove_context(&self, correlation_id: &CorrelationId) -> Option<CorrelationContext> {
196        let context = self.contexts.write().await.remove(correlation_id);
197        
198        if context.is_some() {
199            tracing::debug!(
200                correlation_id = %correlation_id,
201                "Removed correlation context"
202            );
203        }
204        
205        context
206    }
207
208    /// Get all active correlation contexts
209    pub async fn get_active_contexts(&self) -> Vec<CorrelationContext> {
210        self.contexts.read().await.values().cloned().collect()
211    }
212
213    /// Clean up old contexts (should be called periodically)
214    pub async fn cleanup_old_contexts(&self, max_age: chrono::Duration) {
215        let cutoff = chrono::Utc::now() - max_age;
216        let mut contexts = self.contexts.write().await;
217        
218        let initial_count = contexts.len();
219        contexts.retain(|_, context| context.created_at > cutoff);
220        let final_count = contexts.len();
221        
222        if initial_count > final_count {
223            tracing::info!(
224                removed = initial_count - final_count,
225                remaining = final_count,
226                "Cleaned up old correlation contexts"
227            );
228        }
229    }
230}
231
232impl Default for CorrelationTracker {
233    fn default() -> Self {
234        Self::new()
235    }
236}
237
238/// Request context that includes correlation information
239#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct RequestContext {
241    pub correlation_context: CorrelationContext,
242    pub request_path: Option<String>,
243    pub http_method: Option<String>,
244    pub user_agent: Option<String>,
245    pub client_ip: Option<String>,
246    pub request_headers: HashMap<String, String>,
247    pub started_at: chrono::DateTime<chrono::Utc>,
248}
249
250impl RequestContext {
251    /// Create a new request context
252    pub fn new(operation: impl Into<String>, service: impl Into<String>) -> Self {
253        Self {
254            correlation_context: CorrelationContext::new(operation, service),
255            request_path: None,
256            http_method: None,
257            user_agent: None,
258            client_ip: None,
259            request_headers: HashMap::new(),
260            started_at: chrono::Utc::now(),
261        }
262    }
263
264    /// Get the correlation ID for this request
265    pub fn correlation_id(&self) -> &CorrelationId {
266        &self.correlation_context.correlation_id
267    }
268
269    /// Add request metadata
270    pub fn with_request_metadata(
271        mut self,
272        path: Option<String>,
273        method: Option<String>,
274        user_agent: Option<String>,
275        client_ip: Option<String>,
276    ) -> Self {
277        self.request_path = path;
278        self.http_method = method;
279        self.user_agent = user_agent;
280        self.client_ip = client_ip;
281        self
282    }
283
284    /// Add a request header
285    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
286        self.request_headers.insert(name.into(), value.into());
287        self
288    }
289
290    /// Get request duration so far
291    pub fn duration(&self) -> chrono::Duration {
292        chrono::Utc::now() - self.started_at
293    }
294}
295
296/// Correlation for distributed tracing
297#[derive(Debug, Clone, Serialize, Deserialize)]
298pub struct TraceCorrelation {
299    pub correlation_id: CorrelationId,
300    pub trace_id: String,
301    pub span_id: String,
302    pub parent_span_id: Option<String>,
303    pub service_name: String,
304    pub operation_name: String,
305    pub baggage: HashMap<String, String>,
306}
307
308impl TraceCorrelation {
309    /// Create a new trace correlation
310    pub fn new(
311        trace_id: impl Into<String>,
312        span_id: impl Into<String>,
313        service_name: impl Into<String>,
314        operation_name: impl Into<String>,
315    ) -> Self {
316        Self {
317            correlation_id: generate_correlation_id(),
318            trace_id: trace_id.into(),
319            span_id: span_id.into(),
320            parent_span_id: None,
321            service_name: service_name.into(),
322            operation_name: operation_name.into(),
323            baggage: HashMap::new(),
324        }
325    }
326
327    /// Create a child trace correlation
328    pub fn create_child(
329        &self,
330        span_id: impl Into<String>,
331        operation_name: impl Into<String>,
332    ) -> Self {
333        Self {
334            correlation_id: generate_correlation_id(),
335            trace_id: self.trace_id.clone(),
336            span_id: span_id.into(),
337            parent_span_id: Some(self.span_id.clone()),
338            service_name: self.service_name.clone(),
339            operation_name: operation_name.into(),
340            baggage: self.baggage.clone(),
341        }
342    }
343
344    /// Add baggage to this correlation
345    pub fn with_baggage(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
346        self.baggage.insert(key.into(), value.into());
347        self
348    }
349
350    /// Get baggage value
351    pub fn get_baggage(&self, key: &str) -> Option<&String> {
352        self.baggage.get(key)
353    }
354}
355
#[cfg(test)]
mod tests {
    use super::*;

    /// Two generated IDs differ from each other and neither is empty.
    #[test]
    fn test_correlation_id_generation() {
        let first = generate_correlation_id();
        let second = generate_correlation_id();

        assert_ne!(first, second);
        assert!(!first.as_str().is_empty());
        assert!(!second.as_str().is_empty());
    }

    /// A root context records its operation/service and has no parent.
    #[test]
    fn test_correlation_context_creation() {
        let context = CorrelationContext::new("test_operation", "test_service");

        assert_eq!(context.operation, "test_operation");
        assert_eq!(context.service, "test_service");
        assert!(context.parent_id.is_none());
        assert!(!context.correlation_id.as_str().is_empty());
    }

    /// A child inherits the service, links to its parent and gets a new ID.
    #[test]
    fn test_child_context_creation() {
        let parent = CorrelationContext::new("parent_op", "test_service");
        let child = parent.create_child("child_op");

        assert_eq!(child.operation, "child_op");
        assert_eq!(child.service, "test_service");
        // Compare through references: writing `Some(parent.correlation_id)`
        // would move the ID out of `parent`, and the assertion below, which
        // reads it again, would then fail to compile.
        assert_eq!(child.parent_id.as_ref(), Some(&parent.correlation_id));
        assert_ne!(child.correlation_id, parent.correlation_id);
    }

    /// Store, fetch and remove a context through the tracker.
    #[tokio::test]
    async fn test_correlation_tracker() {
        let tracker = CorrelationTracker::new();
        let context = CorrelationContext::new("test_op", "test_service");
        let correlation_id = context.correlation_id.clone();

        tracker.store_context(context.clone()).await;

        let retrieved = tracker.get_context(&correlation_id).await;
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().operation, "test_op");

        let removed = tracker.remove_context(&correlation_id).await;
        assert!(removed.is_some());

        let not_found = tracker.get_context(&correlation_id).await;
        assert!(not_found.is_none());
    }

    /// Request metadata and headers set through the builder are readable.
    #[test]
    fn test_request_context() {
        let request_ctx = RequestContext::new("http_request", "web_service")
            .with_request_metadata(
                Some("/api/events".to_string()),
                Some("POST".to_string()),
                Some("test-agent/1.0".to_string()),
                Some("192.168.1.1".to_string()),
            )
            .with_header("Authorization", "Bearer token123");

        assert_eq!(request_ctx.request_path.as_deref(), Some("/api/events"));
        assert_eq!(request_ctx.http_method.as_deref(), Some("POST"));
        assert_eq!(
            request_ctx
                .request_headers
                .get("Authorization")
                .map(String::as_str),
            Some("Bearer token123")
        );
    }

    /// A child trace shares the trace ID and baggage and links to its parent span.
    #[test]
    fn test_trace_correlation() {
        let trace = TraceCorrelation::new("trace123", "span456", "eventuali", "create_event")
            .with_baggage("user_id", "user789");

        assert_eq!(trace.trace_id, "trace123");
        assert_eq!(trace.span_id, "span456");
        assert_eq!(trace.service_name, "eventuali");
        assert_eq!(trace.operation_name, "create_event");
        assert_eq!(
            trace.get_baggage("user_id").map(String::as_str),
            Some("user789")
        );

        let child = trace.create_child("child_span", "save_event");
        assert_eq!(child.trace_id, trace.trace_id);
        assert_eq!(child.parent_span_id.as_deref(), Some(trace.span_id.as_str()));
        assert_eq!(
            child.get_baggage("user_id").map(String::as_str),
            Some("user789")
        );
    }
}