Skip to main content

camel_core/
tracer.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Instant;
5
6use opentelemetry::trace::{SpanKind, Status, TraceContextExt, Tracer};
7use opentelemetry::{Context as OtelContext, KeyValue, global};
8use tower::Service;
9use tower::ServiceExt;
10use tracing::Instrument;
11
12use crate::config::DetailLevel;
13use camel_api::{Body, BoxProcessor, CamelError, Exchange};
14
15/// RAII guard that ensures an OTel span is ended when dropped.
16///
17/// This prevents span leaks if the inner processor panics or returns early.
18struct SpanEndGuard(OtelContext);
19
20impl Drop for SpanEndGuard {
21    fn drop(&mut self) {
22        self.0.span().end();
23    }
24}
25
26/// Returns a human-readable name for the body type variant.
27fn body_type_name(body: &Body) -> &'static str {
28    match body {
29        Body::Empty => "empty",
30        Body::Bytes(_) => "bytes",
31        Body::Text(_) => "text",
32        Body::Json(_) => "json",
33        Body::Stream(_) => "stream",
34    }
35}
36
37/// A processor wrapper that emits tracing spans for each step.
38///
39/// This processor wraps another processor and adds distributed tracing by:
40/// 1. Starting a native OpenTelemetry span for each exchange
41/// 2. Propagating the OTel context through `exchange.otel_context`
42/// 3. Recording errors and status on the span
43///
44/// When no OTel provider is configured (noop provider), spans are no-ops with minimal overhead.
45pub struct TracingProcessor {
46    inner: BoxProcessor,
47    route_id: String,
48    step_id: String,
49    step_index: usize,
50    detail_level: DetailLevel,
51}
52
53impl TracingProcessor {
54    /// Wrap a processor with tracing.
55    pub fn new(
56        inner: BoxProcessor,
57        route_id: String,
58        step_index: usize,
59        detail_level: DetailLevel,
60    ) -> Self {
61        Self {
62            inner,
63            route_id,
64            step_id: format!("step-{}", step_index),
65            step_index,
66            detail_level,
67        }
68    }
69
70    /// Build the span name from route_id and step_id.
71    fn span_name(&self) -> String {
72        format!("{}/{}", self.route_id, self.step_id)
73    }
74}
75
76impl Service<Exchange> for TracingProcessor {
77    type Response = Exchange;
78    type Error = CamelError;
79    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
80
81    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
82        self.inner.poll_ready(cx)
83    }
84
85    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
86        let start = Instant::now();
87        let span_name = self.span_name();
88
89        // Get the global tracer (noop if no provider is configured)
90        let tracer = global::tracer("camel-core");
91
92        // Extract parent context from exchange.otel_context
93        let parent_cx = exchange.otel_context.clone();
94
95        // Build span attributes
96        let mut attributes = vec![
97            KeyValue::new("correlation_id", exchange.correlation_id().to_string()),
98            KeyValue::new("route_id", self.route_id.clone()),
99            KeyValue::new("step_id", self.step_id.clone()),
100            KeyValue::new("step_index", self.step_index as i64),
101        ];
102
103        if self.detail_level >= DetailLevel::Medium {
104            attributes.push(KeyValue::new(
105                "headers_count",
106                exchange.input.headers.len() as i64,
107            ));
108            attributes.push(KeyValue::new(
109                "body_type",
110                body_type_name(&exchange.input.body),
111            ));
112            attributes.push(KeyValue::new("has_error", exchange.has_error()));
113        }
114
115        // Start a new span as a child of the parent context
116        let span = tracer
117            .span_builder(span_name.clone())
118            .with_kind(SpanKind::Internal)
119            .with_attributes(attributes)
120            .start_with_context(&tracer, &parent_cx);
121
122        // Create new context with this span as the active span
123        let cx = OtelContext::current_with_span(span);
124
125        // Store back into exchange so downstream processors inherit this context
126        exchange.otel_context = cx.clone();
127
128        // Also create a tracing span for local dev logging
129        let tracing_span = tracing::info_span!(
130            target: "camel_tracer",
131            "step",
132            correlation_id = %exchange.correlation_id(),
133            route_id = %self.route_id,
134            step_id = %self.step_id,
135            step_index = self.step_index,
136            timestamp = %chrono::Utc::now().to_rfc3339(),
137            duration_ms = tracing::field::Empty,
138            status = tracing::field::Empty,
139            headers_count = tracing::field::Empty,
140            body_type = tracing::field::Empty,
141            has_error = tracing::field::Empty,
142            output_body_type = tracing::field::Empty,
143            header_0 = tracing::field::Empty,
144            header_1 = tracing::field::Empty,
145            header_2 = tracing::field::Empty,
146            error = tracing::field::Empty,
147            error_type = tracing::field::Empty,
148        );
149
150        if self.detail_level >= DetailLevel::Medium {
151            tracing_span.record("headers_count", exchange.input.headers.len() as u64);
152            tracing_span.record("body_type", body_type_name(&exchange.input.body));
153            tracing_span.record("has_error", exchange.has_error());
154        }
155
156        if self.detail_level >= DetailLevel::Full {
157            for (i, (k, v)) in exchange.input.headers.iter().take(3).enumerate() {
158                tracing_span.record(format!("header_{i}").as_str(), format!("{k}={v:?}"));
159            }
160        }
161
162        let mut inner = self.inner.clone();
163        let detail_level = self.detail_level.clone();
164
165        Box::pin(
166            async move {
167                // Note: ContextGuard is not Send (it uses thread-local storage), so we cannot
168                // hold it across await points in an async fn. Instead, we propagate the OTel
169                // context through exchange.otel_context, which is Send + Sync.
170
171                // Create guard to ensure span is ended even on panic
172                let _guard = SpanEndGuard(cx.clone());
173
174                let result = inner.ready().await?.call(exchange).await;
175
176                let duration_ms = start.elapsed().as_millis() as u64;
177                tracing::Span::current().record("duration_ms", duration_ms);
178
179                // Record duration on OTel span
180                cx.span()
181                    .set_attribute(KeyValue::new("duration_ms", duration_ms as i64));
182
183                match &result {
184                    Ok(ex) => {
185                        tracing::Span::current().record("status", "success");
186                        cx.span().set_status(Status::Ok);
187
188                        if detail_level >= DetailLevel::Medium {
189                            tracing::Span::current()
190                                .record("output_body_type", body_type_name(&ex.input.body));
191                            cx.span().set_attribute(KeyValue::new(
192                                "output_body_type",
193                                body_type_name(&ex.input.body),
194                            ));
195                        }
196                    }
197                    Err(e) => {
198                        tracing::Span::current().record("status", "error");
199                        tracing::Span::current().record("error", e.to_string());
200                        tracing::Span::current()
201                            .record("error_type", std::any::type_name::<CamelError>());
202
203                        // Record error on OTel span
204                        cx.span().set_status(Status::error(e.to_string()));
205                        cx.span().set_attribute(KeyValue::new(
206                            "error_type",
207                            std::any::type_name::<CamelError>(),
208                        ));
209                    }
210                }
211
212                // Span is ended by _guard when it drops here
213                result
214            }
215            .instrument(tracing_span),
216        )
217    }
218}
219
220impl Clone for TracingProcessor {
221    fn clone(&self) -> Self {
222        Self {
223            inner: self.inner.clone(),
224            route_id: self.route_id.clone(),
225            step_id: self.step_id.clone(),
226            step_index: self.step_index,
227            detail_level: self.detail_level.clone(),
228        }
229    }
230}
231
#[cfg(test)]
mod tests {
    //! Tests for TracingProcessor.
    //!
    //! These tests run against the global OTel provider, which is the noop provider
    //! unless one has been installed. That means:
    //! - Spans are created but not exported
    //! - Span contexts may not have valid trace/span IDs
    //! - Error recording on spans cannot be verified
    //!
    //! Assertions are therefore limited to properties that hold for both the noop
    //! provider and a real SDK provider, e.g. a step span staying in its parent's trace.
    //! Full span hierarchy verification (parent span ID, error recording) requires an
    //! integration test with a real exporter, which will be covered in Task 11
    //! (integration tests).

    use super::*;
    use camel_api::{BoxProcessorExt, IdentityProcessor, Message, Value};
    use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState};
    use tower::ServiceExt;

    #[tokio::test]
    async fn test_tracing_processor_minimal() {
        let inner = BoxProcessor::new(IdentityProcessor);
        let mut tracer =
            TracingProcessor::new(inner, "test-route".to_string(), 0, DetailLevel::Minimal);

        let exchange = Exchange::new(Message::default());
        let result = tracer.ready().await.unwrap().call(exchange).await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_tracing_processor_medium_detail() {
        let inner = BoxProcessor::new(IdentityProcessor);
        let mut tracer =
            TracingProcessor::new(inner, "test-route".to_string(), 0, DetailLevel::Medium);

        let exchange = Exchange::new(Message::default());
        let result = tracer.ready().await.unwrap().call(exchange).await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_tracing_processor_full_detail() {
        let inner = BoxProcessor::new(IdentityProcessor);
        let mut tracer =
            TracingProcessor::new(inner, "test-route".to_string(), 0, DetailLevel::Full);

        let mut exchange = Exchange::new(Message::default());
        exchange
            .input
            .headers
            .insert("test".to_string(), Value::String("value".into()));

        let result = tracer.ready().await.unwrap().call(exchange).await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_tracing_processor_clone() {
        let inner = BoxProcessor::new(IdentityProcessor);
        let tracer =
            TracingProcessor::new(inner, "test-route".to_string(), 1, DetailLevel::Minimal);

        let mut cloned = tracer.clone();
        let exchange = Exchange::new(Message::default());
        let result = cloned.ready().await.unwrap().call(exchange).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_tracing_processor_propagates_otel_context() {
        let inner = BoxProcessor::new(IdentityProcessor);
        let mut tracer =
            TracingProcessor::new(inner, "test-route".to_string(), 0, DetailLevel::Minimal);

        // Start with an empty exchange (default context)
        let exchange = Exchange::new(Message::default());
        assert!(
            !exchange.otel_context.span().span_context().is_valid(),
            "Initial context should have invalid span"
        );

        let result = tracer.ready().await.unwrap().call(exchange).await;
        let output_exchange = result.unwrap();

        // Whatever provider is installed, the processor must have attached its step span
        // to the exchange's context. With the noop provider that span's context may still
        // be invalid, so validity is deliberately not asserted here.
        assert!(
            output_exchange.otel_context.has_active_span(),
            "exchange.otel_context should carry the step span"
        );
    }

    #[tokio::test]
    async fn test_tracing_processor_with_parent_context() {
        let inner = BoxProcessor::new(IdentityProcessor);
        let mut tracer =
            TracingProcessor::new(inner, "test-route".to_string(), 0, DetailLevel::Minimal);

        // Create a remote parent span context
        let trace_id = TraceId::from_hex("12345678901234567890123456789012").unwrap();
        let span_id = SpanId::from_hex("1234567890123456").unwrap();
        let parent_span_context = SpanContext::new(
            trace_id,
            span_id,
            TraceFlags::SAMPLED,
            true, // is_remote
            TraceState::default(),
        );

        // Create exchange with parent context
        let mut exchange = Exchange::new(Message::default());
        exchange.otel_context = OtelContext::new().with_remote_span_context(parent_span_context);

        // Verify parent context is set
        assert!(
            exchange.otel_context.span().span_context().is_valid(),
            "Parent context should be valid"
        );

        let result = tracer.ready().await.unwrap().call(exchange).await;
        let output_exchange = result.unwrap();

        // The step span must belong to the parent's trace. This holds for the noop
        // tracer (which hands back the parent's span context) as well as for an SDK
        // tracer (which starts a child span in the same trace).
        let output_trace_id = output_exchange
            .otel_context
            .span()
            .span_context()
            .trace_id();
        assert_eq!(
            output_trace_id, trace_id,
            "exchange.otel_context should stay in the parent's trace"
        );
    }

    #[tokio::test]
    async fn test_tracing_processor_records_error() {
        // Create a processor that always fails
        let failing_processor = BoxProcessor::from_fn(|_ex: Exchange| async move {
            Err(CamelError::ProcessorError("intentional test error".into()))
        });

        let mut tracer = TracingProcessor::new(
            failing_processor,
            "test-route".to_string(),
            0,
            DetailLevel::Minimal,
        );

        let exchange = Exchange::new(Message::default());
        let result = tracer.ready().await.unwrap().call(exchange).await;

        // Verify the error is correctly propagated
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("intentional test error"));

        // Note: With noop provider, we cannot verify that the error was recorded on the span.
        // Full span hierarchy verification (trace ID matching, parent span ID, error recording)
        // requires an integration test with a real exporter, which will be covered in Task 11
        // (integration tests).
    }

    #[tokio::test]
    async fn test_tracing_processor_span_name_format() {
        let inner = BoxProcessor::new(IdentityProcessor);
        let tracer = TracingProcessor::new(inner, "my-route".to_string(), 5, DetailLevel::Minimal);

        // Span name should be "route_id/step_id"
        assert_eq!(tracer.span_name(), "my-route/step-5");
    }

    #[tokio::test]
    async fn test_tracing_processor_chained_propagation() {
        // Test that multiple processors in a chain properly propagate context
        let processor1 = BoxProcessor::new(IdentityProcessor);
        let mut tracer1 =
            TracingProcessor::new(processor1, "route1".to_string(), 0, DetailLevel::Minimal);

        let processor2 = BoxProcessor::new(IdentityProcessor);
        let mut tracer2 =
            TracingProcessor::new(processor2, "route2".to_string(), 1, DetailLevel::Minimal);

        let exchange = Exchange::new(Message::default());
        let exchange1 = tracer1.ready().await.unwrap().call(exchange).await.unwrap();
        let first_trace_id = exchange1.otel_context.span().span_context().trace_id();

        // Pass the exchange through second processor
        let exchange2 = tracer2.ready().await.unwrap().call(exchange1).await.unwrap();
        let second_trace_id = exchange2.otel_context.span().span_context().trace_id();

        // The second step is started from the first step's context, so both steps
        // must report the same trace.
        assert_eq!(
            first_trace_id, second_trace_id,
            "chained steps should share one trace"
        );
    }
}