Skip to main content

camel_core/shared/observability/adapters/
tracer.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5use std::time::Instant;
6
7use opentelemetry::trace::{SpanKind, Status, TraceContextExt, Tracer};
8use opentelemetry::{Context as OtelContext, KeyValue, global};
9use tower::Service;
10use tower::ServiceExt;
11use tracing::Instrument;
12
13use crate::shared::observability::domain::DetailLevel;
14use camel_api::metrics::MetricsCollector;
15use camel_api::{Body, BoxProcessor, CamelError, Exchange};
16
17/// RAII guard that ensures an OTel span is ended when dropped.
18///
19/// This prevents span leaks if the inner processor panics or returns early.
20struct SpanEndGuard(OtelContext);
21
22impl Drop for SpanEndGuard {
23    fn drop(&mut self) {
24        self.0.span().end();
25    }
26}
27
28/// Returns a human-readable name for the body type variant.
29fn body_type_name(body: &Body) -> &'static str {
30    match body {
31        Body::Empty => "empty",
32        Body::Bytes(_) => "bytes",
33        Body::Text(_) => "text",
34        Body::Json(_) => "json",
35        Body::Xml(_) => "xml",
36        Body::Stream(_) => "stream",
37    }
38}
39
/// A processor wrapper that emits tracing spans for each step.
///
/// This processor wraps another processor and adds distributed tracing by:
/// 1. Starting a native OpenTelemetry span for each exchange
/// 2. Propagating the OTel context through `exchange.otel_context`
/// 3. Recording errors and status on the span
///
/// In addition to the OTel span, a `tracing` span (target `camel_tracer`) is
/// created per exchange, and exchange duration / count / error metrics are
/// reported when a metrics collector is configured.
///
/// When no OTel provider is configured (noop provider), spans are no-ops with minimal overhead.
pub struct TracingProcessor {
    /// The wrapped processor that does the actual work for this step.
    inner: BoxProcessor,
    /// Route identifier; recorded on both spans and used as the metrics label.
    route_id: String,
    /// Step identifier, derived in `new` as `step-<step_index>`.
    step_id: String,
    /// OTel span name, derived in `new` as `<route_id>:<step_id>`.
    span_name: String,
    /// Index of this step, as passed to `new`.
    step_index: usize,
    /// Controls which optional fields are recorded (compared against
    /// `DetailLevel::Medium` and `DetailLevel::Full`).
    detail_level: DetailLevel,
    /// Optional collector for exchange duration, exchange count and error count.
    metrics: Option<Arc<dyn MetricsCollector>>,
}
57
58impl TracingProcessor {
59    /// Wrap a processor with tracing.
60    pub fn new(
61        inner: BoxProcessor,
62        route_id: String,
63        step_index: usize,
64        detail_level: DetailLevel,
65        metrics: Option<Arc<dyn MetricsCollector>>,
66    ) -> Self {
67        let step_id = format!("step-{}", step_index);
68        let span_name = format!("{route_id}:{step_id}");
69        Self {
70            inner,
71            route_id,
72            step_id,
73            span_name,
74            step_index,
75            detail_level,
76            metrics,
77        }
78    }
79}
80
81impl Service<Exchange> for TracingProcessor {
82    type Response = Exchange;
83    type Error = CamelError;
84    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
85
86    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87        self.inner.poll_ready(cx)
88    }
89
90    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
91        let start = Instant::now();
92        let span_name = self.span_name.clone();
93
94        // Get the global tracer (noop if no provider is configured)
95        let tracer = global::tracer("camel-core");
96
97        // Extract parent context from exchange.otel_context
98        let parent_cx = exchange.otel_context.clone();
99
100        // Build span attributes
101        let mut attributes = [
102            KeyValue::new("messaging.system", "camel"),
103            KeyValue::new("correlation_id", exchange.correlation_id().to_string()),
104            KeyValue::new("route_id", self.route_id.clone()),
105            KeyValue::new("step_id", self.step_id.clone()),
106            KeyValue::new("step_index", self.step_index as i64),
107            KeyValue::new("headers_count", 0i64),
108            KeyValue::new("body_type", ""),
109            KeyValue::new("has_error", false),
110        ];
111        let mut attr_count = 5;
112
113        if self.detail_level >= DetailLevel::Medium {
114            attributes[5] = KeyValue::new("headers_count", exchange.input.headers.len() as i64);
115            attributes[6] = KeyValue::new("body_type", body_type_name(&exchange.input.body));
116            attributes[7] = KeyValue::new("has_error", exchange.has_error());
117            attr_count = 8;
118        }
119
120        // Start a new span as a child of the parent context
121        let span = tracer
122            .span_builder(span_name)
123            .with_kind(SpanKind::Internal)
124            .with_attributes(attributes[..attr_count].iter().cloned())
125            .start_with_context(&tracer, &parent_cx);
126
127        // Create new context with this span as the active span
128        let cx = OtelContext::current_with_span(span);
129
130        // Store back into exchange so downstream processors inherit this context
131        exchange.otel_context = cx.clone();
132
133        // Also create a tracing span for local dev logging
134        let tracing_span = tracing::info_span!(
135            target: "camel_tracer",
136            "step",
137            correlation_id = %exchange.correlation_id(),
138            route_id = %self.route_id,
139            step_id = %self.step_id,
140            step_index = self.step_index,
141            duration_ms = tracing::field::Empty,
142            status = tracing::field::Empty,
143            headers_count = tracing::field::Empty,
144            body_type = tracing::field::Empty,
145            has_error = tracing::field::Empty,
146            output_body_type = tracing::field::Empty,
147            header_0 = tracing::field::Empty,
148            header_1 = tracing::field::Empty,
149            header_2 = tracing::field::Empty,
150            error = tracing::field::Empty,
151            error_type = tracing::field::Empty,
152        );
153
154        if self.detail_level >= DetailLevel::Medium {
155            tracing_span.record("headers_count", exchange.input.headers.len() as u64);
156            tracing_span.record("body_type", body_type_name(&exchange.input.body));
157            tracing_span.record("has_error", exchange.has_error());
158        }
159
160        if self.detail_level >= DetailLevel::Full {
161            let headers: Vec<_> = exchange.input.headers.iter().take(3).collect();
162            if let Some((k, v)) = headers.first() {
163                tracing_span.record("header_0", format!("{k}={v:?}"));
164            }
165            if let Some((k, v)) = headers.get(1) {
166                tracing_span.record("header_1", format!("{k}={v:?}"));
167            }
168            if let Some((k, v)) = headers.get(2) {
169                tracing_span.record("header_2", format!("{k}={v:?}"));
170            }
171        }
172
173        let mut inner = self.inner.clone();
174        let detail_level = self.detail_level.clone();
175        let metrics = self.metrics.clone();
176        let route_id = self.route_id.clone();
177
178        Box::pin(
179            async move {
180                // Note: ContextGuard is not Send (it uses thread-local storage), so we cannot
181                // hold it across await points in an async fn. Instead, we propagate the OTel
182                // context through exchange.otel_context, which is Send + Sync.
183
184                // Create guard to ensure span is ended even on panic
185                let _guard = SpanEndGuard(cx.clone());
186
187                let result = inner.ready().await?.call(exchange).await;
188
189                let duration = start.elapsed();
190                let duration_ms = duration.as_millis() as u64;
191                tracing::Span::current().record("duration_ms", duration_ms);
192
193                // Record duration on OTel span
194                cx.span()
195                    .set_attribute(KeyValue::new("duration_ms", duration_ms as i64));
196
197                // Record metrics if collector is present
198                if let Some(ref metrics) = metrics {
199                    metrics.record_exchange_duration(&route_id, duration);
200                    metrics.increment_exchanges(&route_id);
201
202                    if let Err(e) = &result {
203                        metrics.increment_errors(&route_id, e.classify());
204                    }
205                }
206
207                match &result {
208                    Ok(ex) => {
209                        tracing::Span::current().record("status", "success");
210                        cx.span().set_status(Status::Ok);
211
212                        if detail_level >= DetailLevel::Medium {
213                            tracing::Span::current()
214                                .record("output_body_type", body_type_name(&ex.input.body));
215                            cx.span().set_attribute(KeyValue::new(
216                                "output_body_type",
217                                body_type_name(&ex.input.body),
218                            ));
219                        }
220                    }
221                    Err(e) => {
222                        let error_class = e.classify();
223                        cx.span().set_status(Status::error(e.to_string()));
224                        cx.span().add_event(
225                            "error",
226                            vec![
227                                KeyValue::new("error.type", error_class.to_string()),
228                                KeyValue::new("error.message", e.to_string()),
229                            ],
230                        );
231                        tracing::Span::current().record("status", "error");
232                        tracing::Span::current().record("error", e.to_string());
233                        tracing::Span::current().record("error_type", error_class);
234                    }
235                }
236
237                // Span is ended by _guard when it drops here
238                result
239            }
240            .instrument(tracing_span),
241        )
242    }
243}
244
245impl Clone for TracingProcessor {
246    fn clone(&self) -> Self {
247        Self {
248            inner: self.inner.clone(),
249            route_id: self.route_id.clone(),
250            step_id: self.step_id.clone(),
251            span_name: self.span_name.clone(),
252            step_index: self.step_index,
253            detail_level: self.detail_level.clone(),
254            metrics: self.metrics.clone(),
255        }
256    }
257}
258
#[cfg(test)]
mod tests {
    //! Tests for TracingProcessor.
    //!
    //! These tests use the noop OTel provider, which means:
    //! - Spans are created but not exported
    //! - Span contexts may not have valid trace/span IDs
    //! - Error recording on spans cannot be verified
    //!
    //! Full span hierarchy verification (trace ID matching, parent span ID, error recording)
    //! requires an integration test with a real exporter, which will be covered in Task 11
    //! (integration tests).

    use super::*;
    use camel_api::{BoxProcessorExt, IdentityProcessor, Message, Value};
    use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState};
    use tower::ServiceExt;

    /// Builds a `TracingProcessor` around `IdentityProcessor` with no metrics collector.
    fn identity_tracer(
        route_id: &str,
        step_index: usize,
        detail_level: DetailLevel,
    ) -> TracingProcessor {
        TracingProcessor::new(
            BoxProcessor::new(IdentityProcessor),
            route_id.to_string(),
            step_index,
            detail_level,
            None,
        )
    }

    /// Minimal detail level: the exchange passes through successfully.
    #[tokio::test]
    async fn test_tracing_processor_minimal() {
        let mut tracer = identity_tracer("test-route", 0, DetailLevel::Minimal);

        let exchange = Exchange::new(Message::default());
        let result = tracer.ready().await.unwrap().call(exchange).await;

        assert!(result.is_ok());
    }

    /// Medium detail level: the exchange passes through successfully.
    #[tokio::test]
    async fn test_tracing_processor_medium_detail() {
        let mut tracer = identity_tracer("test-route", 0, DetailLevel::Medium);

        let exchange = Exchange::new(Message::default());
        let result = tracer.ready().await.unwrap().call(exchange).await;

        assert!(result.is_ok());
    }

    /// Full detail level with a header present: the exchange passes through successfully.
    #[tokio::test]
    async fn test_tracing_processor_full_detail() {
        let mut tracer = identity_tracer("test-route", 0, DetailLevel::Full);

        let mut exchange = Exchange::new(Message::default());
        exchange
            .input
            .headers
            .insert("test".to_string(), Value::String("value".into()));

        let result = tracer.ready().await.unwrap().call(exchange).await;

        assert!(result.is_ok());
    }

    /// A cloned processor is independently usable.
    #[tokio::test]
    async fn test_tracing_processor_clone() {
        let tracer = identity_tracer("test-route", 1, DetailLevel::Minimal);

        let mut cloned = tracer.clone();
        let exchange = Exchange::new(Message::default());
        let result = cloned.ready().await.unwrap().call(exchange).await;
        assert!(result.is_ok());
    }

    /// An exchange starting without a span still comes out with a usable context.
    #[tokio::test]
    async fn test_tracing_processor_propagates_otel_context() {
        let mut tracer = identity_tracer("test-route", 0, DetailLevel::Minimal);

        // Start with an empty exchange (default context)
        let exchange = Exchange::new(Message::default());
        assert!(
            !exchange.otel_context.span().span_context().is_valid(),
            "Initial context should have invalid span"
        );

        let result = tracer.ready().await.unwrap().call(exchange).await;

        // After processing, the exchange carries the context set by the processor.
        let output_exchange = result.unwrap();

        // Note: With noop provider and no parent, the span context may still be
        // invalid, so only check that the context can be read back.
        let _span_context = output_exchange.otel_context.span().span_context();
    }

    /// A span started under a valid parent context stays in the parent's trace.
    #[tokio::test]
    async fn test_tracing_processor_with_parent_context() {
        let mut tracer = identity_tracer("test-route", 0, DetailLevel::Minimal);

        // Create a parent span context
        let trace_id = TraceId::from_hex("12345678901234567890123456789012").unwrap();
        let span_id = SpanId::from_hex("1234567890123456").unwrap();
        let parent_span_context = SpanContext::new(
            trace_id,
            span_id,
            TraceFlags::SAMPLED,
            true, // is_remote
            TraceState::default(),
        );

        // Create exchange with parent context
        let mut exchange = Exchange::new(Message::default());
        exchange.otel_context = OtelContext::new().with_remote_span_context(parent_span_context);

        // Verify parent context is set
        assert!(
            exchange.otel_context.span().span_context().is_valid(),
            "Parent context should be valid"
        );

        let result = tracer.ready().await.unwrap().call(exchange).await;

        let output_exchange = result.unwrap();

        // The child span must belong to the parent's trace. This is expected to
        // hold with the noop provider (which carries the parent span context
        // over) as well as with a real SDK provider (child spans inherit the
        // trace ID). Comparing addresses of span contexts, as an earlier
        // version of this test did, can never fail and verifies nothing.
        let output_span = output_exchange.otel_context.span();
        assert_eq!(
            output_span.span_context().trace_id(),
            trace_id,
            "exchange.otel_context should stay within the parent's trace"
        );
    }

    /// Errors from the inner processor are returned unchanged.
    #[tokio::test]
    async fn test_tracing_processor_records_error() {
        // Create a processor that always fails
        let failing_processor = BoxProcessor::from_fn(|_ex: Exchange| async move {
            Err(CamelError::ProcessorError("intentional test error".into()))
        });

        let mut tracer = TracingProcessor::new(
            failing_processor,
            "test-route".to_string(),
            0,
            DetailLevel::Minimal,
            None,
        );

        let exchange = Exchange::new(Message::default());
        let result = tracer.ready().await.unwrap().call(exchange).await;

        // Verify the error is correctly propagated
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("intentional test error"));

        // Note: With noop provider, we cannot verify that the error was recorded on the span.
        // Full span hierarchy verification (trace ID matching, parent span ID, error recording)
        // requires an integration test with a real exporter, which will be covered in Task 11
        // (integration tests).
    }

    /// The span name is `<route_id>:step-<step_index>`.
    #[tokio::test]
    async fn test_tracing_processor_span_name_format() {
        let tracer = identity_tracer("my-route", 5, DetailLevel::Minimal);

        assert_eq!(tracer.span_name, "my-route:step-5");
    }

    /// An exchange can be passed through two tracing processors in sequence.
    #[tokio::test]
    async fn test_tracing_processor_chained_propagation() {
        let mut tracer1 = identity_tracer("route1", 0, DetailLevel::Minimal);
        let mut tracer2 = identity_tracer("route2", 1, DetailLevel::Minimal);

        let exchange = Exchange::new(Message::default());
        let result1 = tracer1.ready().await.unwrap().call(exchange).await;
        let exchange1 = result1.unwrap();

        // Pass the exchange through second processor
        let result2 = tracer2.ready().await.unwrap().call(exchange1).await;
        let exchange2 = result2.unwrap();

        // Both processors ran; the resulting context is still accessible.
        let _ = exchange2.otel_context;
    }
}