1use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::fmt;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use uuid::Uuid;
9
10#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
12pub struct CorrelationId(String);
13
14impl CorrelationId {
15 pub fn new(id: impl Into<String>) -> Self {
17 Self(id.into())
18 }
19
20 pub fn from_uuid(uuid: Uuid) -> Self {
22 Self(uuid.to_string())
23 }
24
25 pub fn as_str(&self) -> &str {
27 &self.0
28 }
29
30 pub fn into_string(self) -> String {
32 self.0
33 }
34}
35
36impl fmt::Display for CorrelationId {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 write!(f, "{}", self.0)
39 }
40}
41
42impl From<String> for CorrelationId {
43 fn from(id: String) -> Self {
44 Self(id)
45 }
46}
47
48impl From<&str> for CorrelationId {
49 fn from(id: &str) -> Self {
50 Self(id.to_string())
51 }
52}
53
54impl From<Uuid> for CorrelationId {
55 fn from(uuid: Uuid) -> Self {
56 Self::from_uuid(uuid)
57 }
58}
59
60pub fn generate_correlation_id() -> CorrelationId {
62 CorrelationId::from_uuid(Uuid::new_v4())
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct CorrelationContext {
68 pub correlation_id: CorrelationId,
69 pub parent_id: Option<CorrelationId>,
70 pub operation: String,
71 pub service: String,
72 pub user_id: Option<String>,
73 pub session_id: Option<String>,
74 pub request_id: Option<String>,
75 pub trace_id: Option<String>,
76 pub span_id: Option<String>,
77 pub attributes: HashMap<String, String>,
78 pub created_at: chrono::DateTime<chrono::Utc>,
79}
80
81impl CorrelationContext {
82 pub fn new(operation: impl Into<String>, service: impl Into<String>) -> Self {
84 Self {
85 correlation_id: generate_correlation_id(),
86 parent_id: None,
87 operation: operation.into(),
88 service: service.into(),
89 user_id: None,
90 session_id: None,
91 request_id: None,
92 trace_id: None,
93 span_id: None,
94 attributes: HashMap::new(),
95 created_at: chrono::Utc::now(),
96 }
97 }
98
99 pub fn create_child(&self, operation: impl Into<String>) -> Self {
101 Self {
102 correlation_id: generate_correlation_id(),
103 parent_id: Some(self.correlation_id.clone()),
104 operation: operation.into(),
105 service: self.service.clone(),
106 user_id: self.user_id.clone(),
107 session_id: self.session_id.clone(),
108 request_id: self.request_id.clone(),
109 trace_id: self.trace_id.clone(),
110 span_id: None, attributes: self.attributes.clone(),
112 created_at: chrono::Utc::now(),
113 }
114 }
115
116 pub fn with_user_id(mut self, user_id: impl Into<String>) -> Self {
118 self.user_id = Some(user_id.into());
119 self
120 }
121
122 pub fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
124 self.session_id = Some(session_id.into());
125 self
126 }
127
128 pub fn with_request_id(mut self, request_id: impl Into<String>) -> Self {
130 self.request_id = Some(request_id.into());
131 self
132 }
133
134 pub fn with_trace_id(mut self, trace_id: impl Into<String>) -> Self {
136 self.trace_id = Some(trace_id.into());
137 self
138 }
139
140 pub fn with_span_id(mut self, span_id: impl Into<String>) -> Self {
142 self.span_id = Some(span_id.into());
143 self
144 }
145
146 pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
148 self.attributes.insert(key.into(), value.into());
149 self
150 }
151
152 pub fn get_attribute(&self, key: &str) -> Option<&String> {
154 self.attributes.get(key)
155 }
156}
157
158#[derive(Debug)]
160pub struct CorrelationTracker {
161 contexts: Arc<RwLock<HashMap<CorrelationId, CorrelationContext>>>,
162}
163
164impl CorrelationTracker {
165 pub fn new() -> Self {
167 Self {
168 contexts: Arc::new(RwLock::new(HashMap::new())),
169 }
170 }
171
172 pub fn register(&self, correlation_id: CorrelationId) {
174 tracing::debug!(correlation_id = %correlation_id, "Registered correlation ID");
176 }
177
178 pub async fn store_context(&self, context: CorrelationContext) {
180 let correlation_id = context.correlation_id.clone();
181 self.contexts.write().await.insert(correlation_id.clone(), context);
182
183 tracing::debug!(
184 correlation_id = %correlation_id,
185 "Stored correlation context"
186 );
187 }
188
189 pub async fn get_context(&self, correlation_id: &CorrelationId) -> Option<CorrelationContext> {
191 self.contexts.read().await.get(correlation_id).cloned()
192 }
193
194 pub async fn remove_context(&self, correlation_id: &CorrelationId) -> Option<CorrelationContext> {
196 let context = self.contexts.write().await.remove(correlation_id);
197
198 if context.is_some() {
199 tracing::debug!(
200 correlation_id = %correlation_id,
201 "Removed correlation context"
202 );
203 }
204
205 context
206 }
207
208 pub async fn get_active_contexts(&self) -> Vec<CorrelationContext> {
210 self.contexts.read().await.values().cloned().collect()
211 }
212
213 pub async fn cleanup_old_contexts(&self, max_age: chrono::Duration) {
215 let cutoff = chrono::Utc::now() - max_age;
216 let mut contexts = self.contexts.write().await;
217
218 let initial_count = contexts.len();
219 contexts.retain(|_, context| context.created_at > cutoff);
220 let final_count = contexts.len();
221
222 if initial_count > final_count {
223 tracing::info!(
224 removed = initial_count - final_count,
225 remaining = final_count,
226 "Cleaned up old correlation contexts"
227 );
228 }
229 }
230}
231
232impl Default for CorrelationTracker {
233 fn default() -> Self {
234 Self::new()
235 }
236}
237
238#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct RequestContext {
241 pub correlation_context: CorrelationContext,
242 pub request_path: Option<String>,
243 pub http_method: Option<String>,
244 pub user_agent: Option<String>,
245 pub client_ip: Option<String>,
246 pub request_headers: HashMap<String, String>,
247 pub started_at: chrono::DateTime<chrono::Utc>,
248}
249
250impl RequestContext {
251 pub fn new(operation: impl Into<String>, service: impl Into<String>) -> Self {
253 Self {
254 correlation_context: CorrelationContext::new(operation, service),
255 request_path: None,
256 http_method: None,
257 user_agent: None,
258 client_ip: None,
259 request_headers: HashMap::new(),
260 started_at: chrono::Utc::now(),
261 }
262 }
263
264 pub fn correlation_id(&self) -> &CorrelationId {
266 &self.correlation_context.correlation_id
267 }
268
269 pub fn with_request_metadata(
271 mut self,
272 path: Option<String>,
273 method: Option<String>,
274 user_agent: Option<String>,
275 client_ip: Option<String>,
276 ) -> Self {
277 self.request_path = path;
278 self.http_method = method;
279 self.user_agent = user_agent;
280 self.client_ip = client_ip;
281 self
282 }
283
284 pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
286 self.request_headers.insert(name.into(), value.into());
287 self
288 }
289
290 pub fn duration(&self) -> chrono::Duration {
292 chrono::Utc::now() - self.started_at
293 }
294}
295
296#[derive(Debug, Clone, Serialize, Deserialize)]
298pub struct TraceCorrelation {
299 pub correlation_id: CorrelationId,
300 pub trace_id: String,
301 pub span_id: String,
302 pub parent_span_id: Option<String>,
303 pub service_name: String,
304 pub operation_name: String,
305 pub baggage: HashMap<String, String>,
306}
307
308impl TraceCorrelation {
309 pub fn new(
311 trace_id: impl Into<String>,
312 span_id: impl Into<String>,
313 service_name: impl Into<String>,
314 operation_name: impl Into<String>,
315 ) -> Self {
316 Self {
317 correlation_id: generate_correlation_id(),
318 trace_id: trace_id.into(),
319 span_id: span_id.into(),
320 parent_span_id: None,
321 service_name: service_name.into(),
322 operation_name: operation_name.into(),
323 baggage: HashMap::new(),
324 }
325 }
326
327 pub fn create_child(
329 &self,
330 span_id: impl Into<String>,
331 operation_name: impl Into<String>,
332 ) -> Self {
333 Self {
334 correlation_id: generate_correlation_id(),
335 trace_id: self.trace_id.clone(),
336 span_id: span_id.into(),
337 parent_span_id: Some(self.span_id.clone()),
338 service_name: self.service_name.clone(),
339 operation_name: operation_name.into(),
340 baggage: self.baggage.clone(),
341 }
342 }
343
344 pub fn with_baggage(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
346 self.baggage.insert(key.into(), value.into());
347 self
348 }
349
350 pub fn get_baggage(&self, key: &str) -> Option<&String> {
352 self.baggage.get(key)
353 }
354}
355
356#[cfg(test)]
357mod tests {
358 use super::*;
359
360 #[test]
361 fn test_correlation_id_generation() {
362 let id1 = generate_correlation_id();
363 let id2 = generate_correlation_id();
364
365 assert_ne!(id1, id2);
366 assert!(!id1.as_str().is_empty());
367 assert!(!id2.as_str().is_empty());
368 }
369
370 #[test]
371 fn test_correlation_context_creation() {
372 let context = CorrelationContext::new("test_operation", "test_service");
373
374 assert_eq!(context.operation, "test_operation");
375 assert_eq!(context.service, "test_service");
376 assert!(context.parent_id.is_none());
377 assert!(!context.correlation_id.as_str().is_empty());
378 }
379
380 #[test]
381 fn test_child_context_creation() {
382 let parent = CorrelationContext::new("parent_op", "test_service");
383 let child = parent.create_child("child_op");
384
385 assert_eq!(child.operation, "child_op");
386 assert_eq!(child.service, "test_service");
387 assert_eq!(child.parent_id, Some(parent.correlation_id));
388 assert_ne!(child.correlation_id, parent.correlation_id);
389 }
390
391 #[tokio::test]
392 async fn test_correlation_tracker() {
393 let tracker = CorrelationTracker::new();
394 let context = CorrelationContext::new("test_op", "test_service");
395 let correlation_id = context.correlation_id.clone();
396
397 tracker.store_context(context.clone()).await;
398
399 let retrieved = tracker.get_context(&correlation_id).await;
400 assert!(retrieved.is_some());
401 assert_eq!(retrieved.unwrap().operation, "test_op");
402
403 let removed = tracker.remove_context(&correlation_id).await;
404 assert!(removed.is_some());
405
406 let not_found = tracker.get_context(&correlation_id).await;
407 assert!(not_found.is_none());
408 }
409
410 #[test]
411 fn test_request_context() {
412 let request_ctx = RequestContext::new("http_request", "web_service")
413 .with_request_metadata(
414 Some("/api/events".to_string()),
415 Some("POST".to_string()),
416 Some("test-agent/1.0".to_string()),
417 Some("192.168.1.1".to_string()),
418 )
419 .with_header("Authorization", "Bearer token123");
420
421 assert_eq!(request_ctx.request_path, Some("/api/events".to_string()));
422 assert_eq!(request_ctx.http_method, Some("POST".to_string()));
423 assert_eq!(request_ctx.request_headers.get("Authorization"), Some(&"Bearer token123".to_string()));
424 }
425
426 #[test]
427 fn test_trace_correlation() {
428 let trace = TraceCorrelation::new("trace123", "span456", "eventuali", "create_event")
429 .with_baggage("user_id", "user789");
430
431 assert_eq!(trace.trace_id, "trace123");
432 assert_eq!(trace.span_id, "span456");
433 assert_eq!(trace.service_name, "eventuali");
434 assert_eq!(trace.operation_name, "create_event");
435 assert_eq!(trace.get_baggage("user_id"), Some(&"user789".to_string()));
436
437 let child = trace.create_child("child_span", "save_event");
438 assert_eq!(child.trace_id, trace.trace_id);
439 assert_eq!(child.parent_span_id, Some(trace.span_id));
440 assert_eq!(child.get_baggage("user_id"), Some(&"user789".to_string()));
441 }
442}