Skip to main content

camel_core/shared/observability/adapters/
tracer.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5use std::time::Instant;
6
7use opentelemetry::trace::{SpanKind, Status, TraceContextExt, Tracer};
8use opentelemetry::{Context as OtelContext, KeyValue, global};
9use tower::Service;
10use tower::ServiceExt;
11use tracing::Instrument;
12
13use crate::shared::observability::domain::DetailLevel;
14use camel_api::metrics::MetricsCollector;
15use camel_api::{Body, BoxProcessor, CamelError, Exchange};
16
17/// RAII guard that ensures an OTel span is ended when dropped.
18///
19/// This prevents span leaks if the inner processor panics or returns early.
20struct SpanEndGuard(OtelContext);
21
22impl Drop for SpanEndGuard {
23    fn drop(&mut self) {
24        self.0.span().end();
25    }
26}
27
28/// Returns a human-readable name for the body type variant.
29fn body_type_name(body: &Body) -> &'static str {
30    match body {
31        Body::Empty => "empty",
32        Body::Bytes(_) => "bytes",
33        Body::Text(_) => "text",
34        Body::Json(_) => "json",
35        Body::Xml(_) => "xml",
36        Body::Stream(_) => "stream",
37    }
38}
39
40/// A processor wrapper that emits tracing spans for each step.
41///
42/// This processor wraps another processor and adds distributed tracing by:
43/// 1. Starting a native OpenTelemetry span for each exchange
44/// 2. Propagating the OTel context through `exchange.otel_context`
45/// 3. Recording errors and status on the span
46///
47/// When no OTel provider is configured (noop provider), spans are no-ops with minimal overhead.
48pub struct TracingProcessor {
49    inner: BoxProcessor,
50    route_id: String,
51    step_id: String,
52    step_index: usize,
53    detail_level: DetailLevel,
54    metrics: Option<Arc<dyn MetricsCollector>>,
55}
56
57impl TracingProcessor {
58    /// Wrap a processor with tracing.
59    pub fn new(
60        inner: BoxProcessor,
61        route_id: String,
62        step_index: usize,
63        detail_level: DetailLevel,
64        metrics: Option<Arc<dyn MetricsCollector>>,
65    ) -> Self {
66        Self {
67            inner,
68            route_id,
69            step_id: format!("step-{}", step_index),
70            step_index,
71            detail_level,
72            metrics,
73        }
74    }
75
76    /// Build the span name from route_id and step_id.
77    fn span_name(&self) -> String {
78        format!("{}/{}", self.route_id, self.step_id)
79    }
80}
81
82impl Service<Exchange> for TracingProcessor {
83    type Response = Exchange;
84    type Error = CamelError;
85    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
86
87    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
88        self.inner.poll_ready(cx)
89    }
90
91    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
92        let start = Instant::now();
93        let span_name = self.span_name();
94
95        // Get the global tracer (noop if no provider is configured)
96        let tracer = global::tracer("camel-core");
97
98        // Extract parent context from exchange.otel_context
99        let parent_cx = exchange.otel_context.clone();
100
101        // Build span attributes
102        let mut attributes = vec![
103            KeyValue::new("correlation_id", exchange.correlation_id().to_string()),
104            KeyValue::new("route_id", self.route_id.clone()),
105            KeyValue::new("step_id", self.step_id.clone()),
106            KeyValue::new("step_index", self.step_index as i64),
107        ];
108
109        if self.detail_level >= DetailLevel::Medium {
110            attributes.push(KeyValue::new(
111                "headers_count",
112                exchange.input.headers.len() as i64,
113            ));
114            attributes.push(KeyValue::new(
115                "body_type",
116                body_type_name(&exchange.input.body),
117            ));
118            attributes.push(KeyValue::new("has_error", exchange.has_error()));
119        }
120
121        // Start a new span as a child of the parent context
122        let span = tracer
123            .span_builder(span_name.clone())
124            .with_kind(SpanKind::Internal)
125            .with_attributes(attributes)
126            .start_with_context(&tracer, &parent_cx);
127
128        // Create new context with this span as the active span
129        let cx = OtelContext::current_with_span(span);
130
131        // Store back into exchange so downstream processors inherit this context
132        exchange.otel_context = cx.clone();
133
134        // Also create a tracing span for local dev logging
135        let tracing_span = tracing::info_span!(
136            target: "camel_tracer",
137            "step",
138            correlation_id = %exchange.correlation_id(),
139            route_id = %self.route_id,
140            step_id = %self.step_id,
141            step_index = self.step_index,
142            timestamp = %chrono::Utc::now().to_rfc3339(),
143            duration_ms = tracing::field::Empty,
144            status = tracing::field::Empty,
145            headers_count = tracing::field::Empty,
146            body_type = tracing::field::Empty,
147            has_error = tracing::field::Empty,
148            output_body_type = tracing::field::Empty,
149            header_0 = tracing::field::Empty,
150            header_1 = tracing::field::Empty,
151            header_2 = tracing::field::Empty,
152            error = tracing::field::Empty,
153            error_type = tracing::field::Empty,
154        );
155
156        if self.detail_level >= DetailLevel::Medium {
157            tracing_span.record("headers_count", exchange.input.headers.len() as u64);
158            tracing_span.record("body_type", body_type_name(&exchange.input.body));
159            tracing_span.record("has_error", exchange.has_error());
160        }
161
162        if self.detail_level >= DetailLevel::Full {
163            for (i, (k, v)) in exchange.input.headers.iter().take(3).enumerate() {
164                tracing_span.record(format!("header_{i}").as_str(), format!("{k}={v:?}"));
165            }
166        }
167
168        let mut inner = self.inner.clone();
169        let detail_level = self.detail_level.clone();
170        let metrics = self.metrics.clone();
171        let route_id = self.route_id.clone();
172
173        Box::pin(
174            async move {
175                // Note: ContextGuard is not Send (it uses thread-local storage), so we cannot
176                // hold it across await points in an async fn. Instead, we propagate the OTel
177                // context through exchange.otel_context, which is Send + Sync.
178
179                // Create guard to ensure span is ended even on panic
180                let _guard = SpanEndGuard(cx.clone());
181
182                let result = inner.ready().await?.call(exchange).await;
183
184                let duration = start.elapsed();
185                let duration_ms = duration.as_millis() as u64;
186                tracing::Span::current().record("duration_ms", duration_ms);
187
188                // Record duration on OTel span
189                cx.span()
190                    .set_attribute(KeyValue::new("duration_ms", duration_ms as i64));
191
192                // Record metrics if collector is present
193                if let Some(ref metrics) = metrics {
194                    metrics.record_exchange_duration(&route_id, duration);
195                    metrics.increment_exchanges(&route_id);
196
197                    if let Err(e) = &result {
198                        metrics.increment_errors(&route_id, &e.to_string());
199                    }
200                }
201
202                match &result {
203                    Ok(ex) => {
204                        tracing::Span::current().record("status", "success");
205                        cx.span().set_status(Status::Ok);
206
207                        if detail_level >= DetailLevel::Medium {
208                            tracing::Span::current()
209                                .record("output_body_type", body_type_name(&ex.input.body));
210                            cx.span().set_attribute(KeyValue::new(
211                                "output_body_type",
212                                body_type_name(&ex.input.body),
213                            ));
214                        }
215                    }
216                    Err(e) => {
217                        tracing::Span::current().record("status", "error");
218                        tracing::Span::current().record("error", e.to_string());
219                        tracing::Span::current()
220                            .record("error_type", std::any::type_name::<CamelError>());
221
222                        // Record error on OTel span
223                        cx.span().set_status(Status::error(e.to_string()));
224                        cx.span().set_attribute(KeyValue::new(
225                            "error_type",
226                            std::any::type_name::<CamelError>(),
227                        ));
228                    }
229                }
230
231                // Span is ended by _guard when it drops here
232                result
233            }
234            .instrument(tracing_span),
235        )
236    }
237}
238
239impl Clone for TracingProcessor {
240    fn clone(&self) -> Self {
241        Self {
242            inner: self.inner.clone(),
243            route_id: self.route_id.clone(),
244            step_id: self.step_id.clone(),
245            step_index: self.step_index,
246            detail_level: self.detail_level.clone(),
247            metrics: self.metrics.clone(),
248        }
249    }
250}
251
#[cfg(test)]
mod tests {
    //! Tests for TracingProcessor.
    //!
    //! These tests use the noop OTel provider, which means:
    //! - Spans are created but not exported
    //! - A span only has a valid span context when it inherits one from a valid parent
    //! - Error recording on spans cannot be verified
    //!
    //! Full span hierarchy verification (parent span ID, error recording) requires an
    //! integration test with a real exporter, which will be covered in Task 11
    //! (integration tests).

    use super::*;
    use camel_api::{BoxProcessorExt, IdentityProcessor, Message, Value};
    use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState};
    use tower::ServiceExt;

    /// Builds a `TracingProcessor` around an `IdentityProcessor` with no metrics collector.
    fn identity_tracer(
        route_id: &str,
        step_index: usize,
        detail_level: DetailLevel,
    ) -> TracingProcessor {
        TracingProcessor::new(
            BoxProcessor::new(IdentityProcessor),
            route_id.to_string(),
            step_index,
            detail_level,
            None,
        )
    }

    /// Waits for the processor to become ready, then sends one exchange through it.
    async fn process(
        tracer: &mut TracingProcessor,
        exchange: Exchange,
    ) -> Result<Exchange, CamelError> {
        tracer.ready().await.unwrap().call(exchange).await
    }

    #[tokio::test]
    async fn test_tracing_processor_minimal() {
        let mut tracer = identity_tracer("test-route", 0, DetailLevel::Minimal);

        let result = process(&mut tracer, Exchange::new(Message::default())).await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_tracing_processor_medium_detail() {
        let mut tracer = identity_tracer("test-route", 0, DetailLevel::Medium);

        let result = process(&mut tracer, Exchange::new(Message::default())).await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_tracing_processor_full_detail() {
        let mut tracer = identity_tracer("test-route", 0, DetailLevel::Full);

        let mut exchange = Exchange::new(Message::default());
        exchange
            .input
            .headers
            .insert("test".to_string(), Value::String("value".into()));

        let result = process(&mut tracer, exchange).await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_tracing_processor_clone() {
        let tracer = identity_tracer("test-route", 1, DetailLevel::Minimal);

        let mut cloned = tracer.clone();
        let result = process(&mut cloned, Exchange::new(Message::default())).await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_tracing_processor_propagates_otel_context() {
        let mut tracer = identity_tracer("test-route", 0, DetailLevel::Minimal);

        // Start with an empty exchange (default context)
        let exchange = Exchange::new(Message::default());
        assert!(
            !exchange.otel_context.span().span_context().is_valid(),
            "Initial context should have invalid span"
        );

        let output_exchange = process(&mut tracer, exchange).await.unwrap();

        // Without a parent, the validity of the resulting span context depends on the
        // installed provider, so this only checks that the context is still readable.
        let _ = output_exchange
            .otel_context
            .span()
            .span_context()
            .is_valid();
    }

    #[tokio::test]
    async fn test_tracing_processor_with_parent_context() {
        let mut tracer = identity_tracer("test-route", 0, DetailLevel::Minimal);

        // Create a parent span context
        let trace_id = TraceId::from_hex("12345678901234567890123456789012").unwrap();
        let span_id = SpanId::from_hex("1234567890123456").unwrap();
        let parent_span_context = SpanContext::new(
            trace_id,
            span_id,
            TraceFlags::SAMPLED,
            true, // is_remote
            TraceState::default(),
        );

        // Create exchange with parent context
        let mut exchange = Exchange::new(Message::default());
        exchange.otel_context = OtelContext::new().with_remote_span_context(parent_span_context);

        // Verify parent context is set
        assert!(
            exchange.otel_context.span().span_context().is_valid(),
            "Parent context should be valid"
        );

        let output_exchange = process(&mut tracer, exchange).await.unwrap();

        // A child span stays in its parent's trace. This holds for the noop tracer too,
        // which propagates a valid parent span context onto the span it creates.
        let output_span = output_exchange.otel_context.span();
        assert_eq!(
            output_span.span_context().trace_id(),
            trace_id,
            "exchange.otel_context should stay in the parent's trace"
        );
    }

    #[tokio::test]
    async fn test_tracing_processor_records_error() {
        // Create a processor that always fails
        let failing_processor = BoxProcessor::from_fn(|_ex: Exchange| async move {
            Err(CamelError::ProcessorError("intentional test error".into()))
        });

        let mut tracer = TracingProcessor::new(
            failing_processor,
            "test-route".to_string(),
            0,
            DetailLevel::Minimal,
            None,
        );

        let result = process(&mut tracer, Exchange::new(Message::default())).await;

        // Verify the error is correctly propagated
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("intentional test error"));

        // Note: With noop provider, we cannot verify that the error was recorded on the span.
        // That requires an integration test with a real exporter, which will be covered in
        // Task 11 (integration tests).
    }

    #[tokio::test]
    async fn test_tracing_processor_span_name_format() {
        let tracer = identity_tracer("my-route", 5, DetailLevel::Minimal);

        // Span name should be "route_id/step_id"
        assert_eq!(tracer.span_name(), "my-route/step-5");
    }

    #[tokio::test]
    async fn test_tracing_processor_chained_propagation() {
        // Test that an exchange can be passed through several tracing processors in turn
        let mut tracer1 = identity_tracer("route1", 0, DetailLevel::Minimal);
        let mut tracer2 = identity_tracer("route2", 1, DetailLevel::Minimal);

        let exchange1 = process(&mut tracer1, Exchange::new(Message::default()))
            .await
            .unwrap();

        // Pass the exchange through second processor
        let exchange2 = process(&mut tracer2, exchange1).await.unwrap();

        // The context is still attached after both steps
        let _ = exchange2.otel_context;
    }
}