1use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
/// Settings that control which observability features are active.
#[derive(Debug, Clone)]
pub struct ObservabilityConfig {
    /// Whether tracing is requested.
    ///
    /// NOTE(review): nothing visible in this module reads this flag —
    /// `Observability::start_span` returns a span regardless. Confirm whether
    /// other modules consult it.
    pub enable_tracing: bool,
    /// When `false`, `MetricsRecorder` ignores calls to its `record_*` methods.
    pub enable_metrics: bool,
    /// Trace sampling rate; `Default` sets it to `1.0`.
    ///
    /// NOTE(review): presumably a fraction in `[0.0, 1.0]`, but it is neither
    /// validated nor read by the code visible here — confirm the intended range.
    pub trace_sample_rate: f64,
    /// Name identifying this service; `Default` sets it to `"llmkit"`.
    pub service_name: String,
}
34
35impl Default for ObservabilityConfig {
36 fn default() -> Self {
37 Self {
38 enable_tracing: true,
39 enable_metrics: true,
40 trace_sample_rate: 1.0,
41 service_name: "llmkit".to_string(),
42 }
43 }
44}
45
/// A timed unit of work, identified by a request id and annotated with metadata.
#[derive(Debug, Clone)]
pub struct RequestSpan {
    /// Identifier of this span; `RequestSpan::new` fills it with a random UUID v4 string.
    pub request_id: String,
    /// Identifier of the enclosing span, if any; `RequestSpan::new` leaves it `None`.
    pub parent_span_id: Option<String>,
    /// Name of the operation being timed.
    pub operation: String,
    /// Moment the span was created; `elapsed`/`elapsed_ms` measure from here.
    pub start_time: Instant,
    /// Key/value annotations in insertion order (duplicate keys are not merged).
    pub metadata: Vec<(String, String)>,
}
60
61impl RequestSpan {
62 pub fn new(operation: impl Into<String>) -> Self {
64 Self {
65 request_id: uuid::Uuid::new_v4().to_string(),
66 parent_span_id: None,
67 operation: operation.into(),
68 start_time: Instant::now(),
69 metadata: Vec::new(),
70 }
71 }
72
73 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
75 self.metadata.push((key.into(), value.into()));
76 self
77 }
78
79 pub fn elapsed(&self) -> Duration {
81 self.start_time.elapsed()
82 }
83
84 pub fn elapsed_ms(&self) -> f64 {
86 self.elapsed().as_secs_f64() * 1000.0
87 }
88}
89
/// Request/error/latency counters backed by atomics.
///
/// Each counter lives behind an `Arc`, and the `Clone` impl shares those `Arc`s,
/// so every clone records into and reads from the same totals.
#[derive(Debug)]
pub struct MetricsRecorder {
    /// Number of recorded requests, successes and errors combined.
    total_requests: Arc<AtomicU64>,
    /// Number of recorded requests that were errors.
    total_errors: Arc<AtomicU64>,
    /// Accumulated latency of recorded requests, in milliseconds. How the value is
    /// encoded inside the atomic is an implementation detail of the `impl` block.
    total_latency_ms: Arc<AtomicU64>,
    /// Settings; `enable_metrics` gates whether recording happens at all.
    config: ObservabilityConfig,
}
102
103impl MetricsRecorder {
104 pub fn new(config: ObservabilityConfig) -> Self {
106 Self {
107 total_requests: Arc::new(AtomicU64::new(0)),
108 total_errors: Arc::new(AtomicU64::new(0)),
109 total_latency_ms: Arc::new(AtomicU64::new(0)),
110 config,
111 }
112 }
113
114 pub fn record_success(&self, latency_ms: f64) {
116 if !self.config.enable_metrics {
117 return;
118 }
119
120 self.total_requests.fetch_add(1, Ordering::Relaxed);
121 self.total_latency_ms
122 .fetch_add(latency_ms as u64, Ordering::Relaxed);
123 }
124
125 pub fn record_error(&self, latency_ms: f64) {
127 if !self.config.enable_metrics {
128 return;
129 }
130
131 self.total_requests.fetch_add(1, Ordering::Relaxed);
132 self.total_errors.fetch_add(1, Ordering::Relaxed);
133 self.total_latency_ms
134 .fetch_add(latency_ms as u64, Ordering::Relaxed);
135 }
136
137 pub fn snapshot(&self) -> MetricsSnapshot {
139 let total_requests = self.total_requests.load(Ordering::Acquire);
140 let total_errors = self.total_errors.load(Ordering::Acquire);
141 let total_latency_ms = self.total_latency_ms.load(Ordering::Acquire);
142
143 let error_rate = if total_requests > 0 {
144 total_errors as f64 / total_requests as f64
145 } else {
146 0.0
147 };
148
149 let avg_latency_ms = if total_requests > 0 {
150 total_latency_ms as f64 / total_requests as f64
151 } else {
152 0.0
153 };
154
155 MetricsSnapshot {
156 total_requests,
157 total_errors,
158 error_rate,
159 average_latency_ms: avg_latency_ms,
160 }
161 }
162
163 pub fn reset(&self) {
165 self.total_requests.store(0, Ordering::Release);
166 self.total_errors.store(0, Ordering::Release);
167 self.total_latency_ms.store(0, Ordering::Release);
168 }
169}
170
171impl Clone for MetricsRecorder {
172 fn clone(&self) -> Self {
173 Self {
174 total_requests: Arc::clone(&self.total_requests),
175 total_errors: Arc::clone(&self.total_errors),
176 total_latency_ms: Arc::clone(&self.total_latency_ms),
177 config: self.config.clone(),
178 }
179 }
180}
181
/// Point-in-time copy of the counters held by a `MetricsRecorder`.
#[derive(Debug, Clone, Copy)]
pub struct MetricsSnapshot {
    /// Number of recorded requests, successes and errors combined.
    pub total_requests: u64,
    /// Number of recorded requests that were errors.
    pub total_errors: u64,
    /// `total_errors / total_requests`, or `0.0` when no requests were recorded.
    pub error_rate: f64,
    /// Mean recorded latency in milliseconds, or `0.0` when no requests were recorded.
    pub average_latency_ms: f64,
}
194
/// Identifiers and attached key/value data for a trace.
#[derive(Debug, Clone)]
pub struct TracingContext {
    /// Trace identifier; `Default` generates a random UUID v4 string.
    pub trace_id: String,
    /// Span identifier; `Default` generates a separate random UUID v4 string.
    pub span_id: String,
    /// Identifier of the parent span; `Default` leaves it `None`.
    pub parent_span_id: Option<String>,
    /// Key/value pairs attached to the context; `Default` leaves it empty.
    ///
    /// NOTE(review): presumably intended for propagation across service
    /// boundaries; nothing visible in this module propagates it — confirm.
    pub baggage: Vec<(String, String)>,
}
207
208impl Default for TracingContext {
209 fn default() -> Self {
210 Self {
211 trace_id: uuid::Uuid::new_v4().to_string(),
212 span_id: uuid::Uuid::new_v4().to_string(),
213 parent_span_id: None,
214 baggage: Vec::new(),
215 }
216 }
217}
218
/// Facade that bundles an `ObservabilityConfig` with a `MetricsRecorder`.
#[derive(Debug)]
pub struct Observability {
    /// Configuration supplied at construction.
    config: ObservabilityConfig,
    /// Recorder that accumulates request counts, error counts and latencies.
    metrics: MetricsRecorder,
}
227
228impl Observability {
229 pub fn new(config: ObservabilityConfig) -> Self {
231 Self {
232 metrics: MetricsRecorder::new(config.clone()),
233 config,
234 }
235 }
236
237 pub fn start_span(&self, operation: impl Into<String>) -> RequestSpan {
239 RequestSpan::new(operation)
240 }
241
242 pub fn record_request(&self, span: &RequestSpan, success: bool) {
244 let latency_ms = span.elapsed_ms();
245
246 if success {
247 self.metrics.record_success(latency_ms);
248 } else {
249 self.metrics.record_error(latency_ms);
250 }
251 }
252
253 pub fn metrics(&self) -> MetricsSnapshot {
255 self.metrics.snapshot()
256 }
257
258 pub fn reset_metrics(&self) {
260 self.metrics.reset();
261 }
262
263 pub fn create_context(&self) -> TracingContext {
265 TracingContext::default()
266 }
267
268 pub fn config(&self) -> &ObservabilityConfig {
270 &self.config
271 }
272}
273
274impl Default for Observability {
275 fn default() -> Self {
276 Self::new(ObservabilityConfig::default())
277 }
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_request_span_creation() {
        let span = RequestSpan::new("test_operation");
        assert_eq!(span.operation, "test_operation");
        assert!(!span.request_id.is_empty());
        assert!(span.parent_span_id.is_none());
        assert!(span.metadata.is_empty());
        assert!(span.elapsed_ms() >= 0.0);
    }

    #[test]
    fn test_request_span_metadata() {
        let span = RequestSpan::new("test")
            .with_metadata("provider", "openai")
            .with_metadata("model", "gpt-4");

        // Entries keep their insertion order.
        assert_eq!(
            span.metadata,
            vec![
                ("provider".to_string(), "openai".to_string()),
                ("model".to_string(), "gpt-4".to_string()),
            ]
        );
    }

    #[test]
    fn test_metrics_recorder_success() {
        let recorder = MetricsRecorder::new(ObservabilityConfig::default());

        recorder.record_success(50.0);
        recorder.record_success(75.0);

        let snapshot = recorder.snapshot();
        assert_eq!(snapshot.total_requests, 2);
        assert_eq!(snapshot.total_errors, 0);
        assert_eq!(snapshot.error_rate, 0.0);
        assert!((snapshot.average_latency_ms - 62.5).abs() < 0.1);
    }

    #[test]
    fn test_metrics_recorder_errors() {
        let recorder = MetricsRecorder::new(ObservabilityConfig::default());

        recorder.record_success(50.0);
        recorder.record_error(100.0);
        recorder.record_error(150.0);

        let snapshot = recorder.snapshot();
        assert_eq!(snapshot.total_requests, 3);
        assert_eq!(snapshot.total_errors, 2);
        assert!((snapshot.error_rate - 2.0 / 3.0).abs() < 0.01);
        assert!((snapshot.average_latency_ms - 100.0).abs() < 0.1);
    }

    #[test]
    fn test_metrics_recorder_disabled() {
        let config = ObservabilityConfig {
            enable_metrics: false,
            ..Default::default()
        };

        let recorder = MetricsRecorder::new(config);
        recorder.record_success(50.0);
        recorder.record_success(75.0);
        recorder.record_error(100.0);

        let snapshot = recorder.snapshot();
        assert_eq!(snapshot.total_requests, 0);
        assert_eq!(snapshot.total_errors, 0);
        assert_eq!(snapshot.average_latency_ms, 0.0);
    }

    #[test]
    fn test_metrics_recorder_clone() {
        let recorder1 = MetricsRecorder::new(ObservabilityConfig::default());
        let recorder2 = recorder1.clone();

        // Clones share counters, so writes through either handle are visible via both.
        recorder1.record_success(50.0);
        recorder2.record_error(10.0);

        assert_eq!(recorder1.snapshot().total_requests, 2);
        assert_eq!(recorder2.snapshot().total_requests, 2);
        assert_eq!(recorder1.snapshot().total_errors, 1);
    }

    #[test]
    fn test_metrics_recorder_reset() {
        let recorder = MetricsRecorder::new(ObservabilityConfig::default());

        recorder.record_success(50.0);
        recorder.record_error(75.0);
        assert_eq!(recorder.snapshot().total_requests, 2);

        recorder.reset();
        let snapshot = recorder.snapshot();
        assert_eq!(snapshot.total_requests, 0);
        assert_eq!(snapshot.total_errors, 0);
        assert_eq!(snapshot.average_latency_ms, 0.0);
    }

    #[test]
    fn test_observability_integration() {
        let obs = Observability::default();
        let span = obs.start_span("test_operation");

        thread::sleep(Duration::from_millis(10));
        obs.record_request(&span, true);

        let metrics = obs.metrics();
        assert_eq!(metrics.total_requests, 1);
        assert_eq!(metrics.total_errors, 0);
        assert!(metrics.average_latency_ms >= 10.0);

        obs.reset_metrics();
        assert_eq!(obs.metrics().total_requests, 0);
    }

    #[test]
    fn test_tracing_context() {
        let ctx = TracingContext::default();
        assert!(!ctx.trace_id.is_empty());
        assert!(!ctx.span_id.is_empty());
        assert_ne!(ctx.trace_id, ctx.span_id);
        assert!(ctx.parent_span_id.is_none());
        assert!(ctx.baggage.is_empty());
    }

    #[test]
    fn test_observability_disabled_tracing() {
        let config = ObservabilityConfig {
            enable_tracing: false,
            ..Default::default()
        };

        let obs = Observability::new(config);
        assert!(!obs.config().enable_tracing);

        // Spans are still handed out when tracing is disabled.
        let span = obs.start_span("test");
        assert_eq!(span.operation, "test");
    }

    #[test]
    fn test_concurrent_metrics() {
        // Real OS threads rather than `#[tokio::test]`, whose default runtime is
        // single-threaded and so never contends on the atomic counters.
        let recorder = MetricsRecorder::new(ObservabilityConfig::default());

        let workers: Vec<_> = (0..10u32)
            .map(|i| {
                let rec = recorder.clone();
                thread::spawn(move || {
                    for j in 0..10u32 {
                        let latency = f64::from(i * 10 + j) * 1.5;
                        if (i + j) % 3 == 0 {
                            rec.record_error(latency);
                        } else {
                            rec.record_success(latency);
                        }
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().expect("metrics worker thread panicked");
        }

        let snapshot = recorder.snapshot();
        assert_eq!(snapshot.total_requests, 100);
        // Number of pairs (i, j) in 0..10 x 0..10 whose sum is divisible by 3.
        assert_eq!(snapshot.total_errors, 34);
        assert!(snapshot.average_latency_ms > 0.0);
    }
}