eventuali_core/observability/
telemetry.rs1use crate::error::Result;
6use crate::observability::correlation::{CorrelationId, generate_correlation_id};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use uuid::Uuid;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct ObservabilityConfig {
16 pub tracing_enabled: bool,
17 pub metrics_enabled: bool,
18 pub structured_logging: bool,
19 pub jaeger_endpoint: Option<String>,
20 pub prometheus_endpoint: Option<String>,
21 pub service_name: String,
22 pub service_version: String,
23 pub environment: String,
24 pub sample_rate: f64,
25 pub max_events_per_span: u32,
26 pub export_timeout_millis: u64,
27}
28
29impl Default for ObservabilityConfig {
30 fn default() -> Self {
31 Self {
32 tracing_enabled: true,
33 metrics_enabled: true,
34 structured_logging: true,
35 jaeger_endpoint: Some("http://localhost:14268/api/traces".to_string()),
36 prometheus_endpoint: Some("http://localhost:9090".to_string()),
37 service_name: "eventuali".to_string(),
38 service_version: "0.1.0".to_string(),
39 environment: "development".to_string(),
40 sample_rate: 1.0, max_events_per_span: 128,
42 export_timeout_millis: 30000,
43 }
44 }
45}
46
47#[derive(Debug)]
49pub struct TelemetryProvider {
50 config: ObservabilityConfig,
51 active_traces: Arc<RwLock<HashMap<CorrelationId, TraceContext>>>,
52}
53
54impl TelemetryProvider {
55 pub async fn new(config: &ObservabilityConfig) -> Result<Self> {
57 Ok(Self {
58 config: config.clone(),
59 active_traces: Arc::new(RwLock::new(HashMap::new())),
60 })
61 }
62
63 pub async fn initialize(&self) -> Result<()> {
65 tracing::info!(
66 service_name = %self.config.service_name,
67 service_version = %self.config.service_version,
68 environment = %self.config.environment,
69 "Telemetry provider initialized"
70 );
71
72 Ok(())
73 }
74
75 pub async fn create_trace(&self, operation: &str, _parent: Option<&TraceContext>) -> TraceContext {
77 let correlation_id = generate_correlation_id();
78
79 let trace_context = TraceContext::new(operation.to_string(), correlation_id.clone());
80
81 self.active_traces.write().await.insert(correlation_id, trace_context.clone());
83
84 trace_context
85 }
86
87 pub async fn get_trace(&self, correlation_id: &CorrelationId) -> Option<TraceContext> {
89 self.active_traces.read().await.get(correlation_id).cloned()
90 }
91
92 pub async fn end_trace(&self, trace: &TraceContext) {
94 self.active_traces.write().await.remove(&trace.correlation_id);
95
96 let duration = trace.start_time.elapsed();
97 tracing::debug!(
98 operation = %trace.operation,
99 correlation_id = %trace.correlation_id,
100 duration_ms = duration.as_millis(),
101 "Trace completed"
102 );
103 }
104
105 pub async fn shutdown(&self) -> Result<()> {
107 self.active_traces.write().await.clear();
109
110 tracing::info!("Telemetry provider shut down successfully");
111 Ok(())
112 }
113}
114
115#[derive(Debug, Clone)]
117pub struct TraceContext {
118 pub operation: String,
119 pub correlation_id: CorrelationId,
120 pub start_time: std::time::Instant,
121 pub attributes: HashMap<String, String>,
122 pub trace_id: Option<String>,
123 pub span_id: Option<String>,
124}
125
126impl TraceContext {
127 pub fn new(operation: String, correlation_id: CorrelationId) -> Self {
128 Self {
129 operation,
130 correlation_id,
131 start_time: std::time::Instant::now(),
132 attributes: HashMap::new(),
133 trace_id: Some(uuid::Uuid::new_v4().to_string()),
134 span_id: Some(uuid::Uuid::new_v4().to_string()),
135 }
136 }
137
138 pub fn add_attribute(&mut self, key: &str, value: &str) {
140 self.attributes.insert(key.to_string(), value.to_string());
141
142 tracing::debug!(
143 trace_id = ?self.trace_id,
144 span_id = ?self.span_id,
145 correlation_id = %self.correlation_id,
146 key = key,
147 value = value,
148 "Added trace attribute"
149 );
150 }
151
152 pub fn add_event(&self, name: &str, attributes: HashMap<String, String>) {
154 tracing::info!(
155 trace_id = ?self.trace_id,
156 span_id = ?self.span_id,
157 correlation_id = %self.correlation_id,
158 event_name = name,
159 ?attributes,
160 "Trace event"
161 );
162 }
163
164 pub fn record_error(&self, error: &dyn std::error::Error) {
166 tracing::error!(
167 trace_id = ?self.trace_id,
168 span_id = ?self.span_id,
169 correlation_id = %self.correlation_id,
170 error = %error,
171 "Trace error recorded"
172 );
173 }
174
175 pub fn elapsed(&self) -> std::time::Duration {
177 self.start_time.elapsed()
178 }
179}
180
181#[derive(Debug)]
183pub struct TracingService {
184 provider: Arc<TelemetryProvider>,
185}
186
187impl TracingService {
188 pub fn new(provider: Arc<TelemetryProvider>) -> Self {
189 Self { provider }
190 }
191
192 pub async fn start_trace(&self, operation: &str) -> TraceContext {
194 self.provider.create_trace(operation, None).await
195 }
196
197 pub async fn start_child_trace(&self, operation: &str, parent: &TraceContext) -> TraceContext {
199 self.provider.create_trace(operation, Some(parent)).await
200 }
201
202 pub async fn end_trace(&self, trace: TraceContext) {
204 self.provider.end_trace(&trace).await;
205 }
206}
207
208#[derive(Debug, Clone, Serialize, Deserialize)]
210pub struct EventTrace {
211 pub event_id: Uuid,
212 pub trace_id: String,
213 pub span_id: String,
214 pub operation: String,
215 pub correlation_id: CorrelationId,
216 pub timestamp: chrono::DateTime<chrono::Utc>,
217 pub duration_ns: Option<u64>,
218 pub attributes: HashMap<String, String>,
219 pub status: TraceStatus,
220}
221
222#[derive(Debug, Clone, Serialize, Deserialize)]
223pub enum TraceStatus {
224 Ok,
225 Error(String),
226 Cancelled,
227}
228
229pub struct SpanBuilder {
231 operation: String,
232 attributes: HashMap<String, String>,
233 parent: Option<TraceContext>,
234}
235
236impl SpanBuilder {
237 pub fn new(operation: &str) -> Self {
238 Self {
239 operation: operation.to_string(),
240 attributes: HashMap::new(),
241 parent: None,
242 }
243 }
244
245 pub fn with_attribute(mut self, key: &str, value: &str) -> Self {
246 self.attributes.insert(key.to_string(), value.to_string());
247 self
248 }
249
250 pub fn with_parent(mut self, parent: TraceContext) -> Self {
251 self.parent = Some(parent);
252 self
253 }
254
255 pub async fn start(self, provider: &TelemetryProvider) -> TraceContext {
256 let mut trace = provider.create_trace(&self.operation, self.parent.as_ref()).await;
257
258 for (key, value) in self.attributes {
259 trace.add_attribute(&key, &value);
260 }
261
262 trace
263 }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Default configuration with the Jaeger endpoint disabled, shared by trace tests.
    fn test_config() -> ObservabilityConfig {
        ObservabilityConfig {
            jaeger_endpoint: None,
            ..ObservabilityConfig::default()
        }
    }

    #[tokio::test]
    async fn test_telemetry_provider_creation() {
        let config = ObservabilityConfig::default();
        let provider = TelemetryProvider::new(&config).await.unwrap();
        assert_eq!(provider.config.service_name, "eventuali");
    }

    #[tokio::test]
    async fn test_trace_context_creation() {
        let provider = TelemetryProvider::new(&test_config()).await.unwrap();

        let trace = provider.create_trace("test_operation", None).await;
        assert_eq!(trace.operation, "test_operation");
        assert!(!trace.correlation_id.to_string().is_empty());
    }

    #[tokio::test]
    async fn test_trace_attributes() {
        let provider = TelemetryProvider::new(&test_config()).await.unwrap();

        let mut trace = provider.create_trace("test_operation", None).await;
        trace.add_attribute("test_key", "test_value");

        assert_eq!(trace.attributes.get("test_key"), Some(&"test_value".to_string()));
    }

    /// A created trace is retrievable until it is ended.
    #[tokio::test]
    async fn test_trace_lifecycle() {
        let provider = TelemetryProvider::new(&test_config()).await.unwrap();
        let trace = provider.create_trace("lifecycle", None).await;

        let registered = provider.get_trace(&trace.correlation_id).await;
        assert_eq!(registered.map(|t| t.operation), Some("lifecycle".to_string()));

        provider.end_trace(&trace).await;
        assert!(provider.get_trace(&trace.correlation_id).await.is_none());
    }

    /// Shutting down drops every active trace.
    #[tokio::test]
    async fn test_shutdown_clears_active_traces() {
        let provider = TelemetryProvider::new(&test_config()).await.unwrap();
        let first = provider.create_trace("first", None).await;
        let second = provider.create_trace("second", None).await;

        provider.shutdown().await.unwrap();

        assert!(provider.get_trace(&first.correlation_id).await.is_none());
        assert!(provider.get_trace(&second.correlation_id).await.is_none());
    }

    /// Attributes given to the builder end up on the started trace.
    #[tokio::test]
    async fn test_span_builder_applies_attributes() {
        let provider = TelemetryProvider::new(&test_config()).await.unwrap();

        let trace = SpanBuilder::new("built")
            .with_attribute("k", "v")
            .start(&provider)
            .await;

        assert_eq!(trace.operation, "built");
        assert_eq!(trace.attributes.get("k").map(String::as_str), Some("v"));
    }
}