1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Instant;
5
6use opentelemetry::trace::{SpanKind, Status, TraceContextExt, Tracer};
7use opentelemetry::{Context as OtelContext, KeyValue, global};
8use tower::Service;
9use tower::ServiceExt;
10use tracing::Instrument;
11
12use crate::config::DetailLevel;
13use camel_api::{Body, BoxProcessor, CamelError, Exchange};
14
15struct SpanEndGuard(OtelContext);
19
20impl Drop for SpanEndGuard {
21 fn drop(&mut self) {
22 self.0.span().end();
23 }
24}
25
26fn body_type_name(body: &Body) -> &'static str {
28 match body {
29 Body::Empty => "empty",
30 Body::Bytes(_) => "bytes",
31 Body::Text(_) => "text",
32 Body::Json(_) => "json",
33 Body::Stream(_) => "stream",
34 }
35}
36
37pub struct TracingProcessor {
46 inner: BoxProcessor,
47 route_id: String,
48 step_id: String,
49 step_index: usize,
50 detail_level: DetailLevel,
51}
52
53impl TracingProcessor {
54 pub fn new(
56 inner: BoxProcessor,
57 route_id: String,
58 step_index: usize,
59 detail_level: DetailLevel,
60 ) -> Self {
61 Self {
62 inner,
63 route_id,
64 step_id: format!("step-{}", step_index),
65 step_index,
66 detail_level,
67 }
68 }
69
70 fn span_name(&self) -> String {
72 format!("{}/{}", self.route_id, self.step_id)
73 }
74}
75
76impl Service<Exchange> for TracingProcessor {
77 type Response = Exchange;
78 type Error = CamelError;
79 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
80
81 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
82 self.inner.poll_ready(cx)
83 }
84
85 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
86 let start = Instant::now();
87 let span_name = self.span_name();
88
89 let tracer = global::tracer("camel-core");
91
92 let parent_cx = exchange.otel_context.clone();
94
95 let mut attributes = vec![
97 KeyValue::new("correlation_id", exchange.correlation_id().to_string()),
98 KeyValue::new("route_id", self.route_id.clone()),
99 KeyValue::new("step_id", self.step_id.clone()),
100 KeyValue::new("step_index", self.step_index as i64),
101 ];
102
103 if self.detail_level >= DetailLevel::Medium {
104 attributes.push(KeyValue::new(
105 "headers_count",
106 exchange.input.headers.len() as i64,
107 ));
108 attributes.push(KeyValue::new(
109 "body_type",
110 body_type_name(&exchange.input.body),
111 ));
112 attributes.push(KeyValue::new("has_error", exchange.has_error()));
113 }
114
115 let span = tracer
117 .span_builder(span_name.clone())
118 .with_kind(SpanKind::Internal)
119 .with_attributes(attributes)
120 .start_with_context(&tracer, &parent_cx);
121
122 let cx = OtelContext::current_with_span(span);
124
125 exchange.otel_context = cx.clone();
127
128 let tracing_span = tracing::info_span!(
130 target: "camel_tracer",
131 "step",
132 correlation_id = %exchange.correlation_id(),
133 route_id = %self.route_id,
134 step_id = %self.step_id,
135 step_index = self.step_index,
136 timestamp = %chrono::Utc::now().to_rfc3339(),
137 duration_ms = tracing::field::Empty,
138 status = tracing::field::Empty,
139 headers_count = tracing::field::Empty,
140 body_type = tracing::field::Empty,
141 has_error = tracing::field::Empty,
142 output_body_type = tracing::field::Empty,
143 header_0 = tracing::field::Empty,
144 header_1 = tracing::field::Empty,
145 header_2 = tracing::field::Empty,
146 error = tracing::field::Empty,
147 error_type = tracing::field::Empty,
148 );
149
150 if self.detail_level >= DetailLevel::Medium {
151 tracing_span.record("headers_count", exchange.input.headers.len() as u64);
152 tracing_span.record("body_type", body_type_name(&exchange.input.body));
153 tracing_span.record("has_error", exchange.has_error());
154 }
155
156 if self.detail_level >= DetailLevel::Full {
157 for (i, (k, v)) in exchange.input.headers.iter().take(3).enumerate() {
158 tracing_span.record(format!("header_{i}").as_str(), format!("{k}={v:?}"));
159 }
160 }
161
162 let mut inner = self.inner.clone();
163 let detail_level = self.detail_level.clone();
164
165 Box::pin(
166 async move {
167 let _guard = SpanEndGuard(cx.clone());
173
174 let result = inner.ready().await?.call(exchange).await;
175
176 let duration_ms = start.elapsed().as_millis() as u64;
177 tracing::Span::current().record("duration_ms", duration_ms);
178
179 cx.span()
181 .set_attribute(KeyValue::new("duration_ms", duration_ms as i64));
182
183 match &result {
184 Ok(ex) => {
185 tracing::Span::current().record("status", "success");
186 cx.span().set_status(Status::Ok);
187
188 if detail_level >= DetailLevel::Medium {
189 tracing::Span::current()
190 .record("output_body_type", body_type_name(&ex.input.body));
191 cx.span().set_attribute(KeyValue::new(
192 "output_body_type",
193 body_type_name(&ex.input.body),
194 ));
195 }
196 }
197 Err(e) => {
198 tracing::Span::current().record("status", "error");
199 tracing::Span::current().record("error", e.to_string());
200 tracing::Span::current()
201 .record("error_type", std::any::type_name::<CamelError>());
202
203 cx.span().set_status(Status::error(e.to_string()));
205 cx.span().set_attribute(KeyValue::new(
206 "error_type",
207 std::any::type_name::<CamelError>(),
208 ));
209 }
210 }
211
212 result
214 }
215 .instrument(tracing_span),
216 )
217 }
218}
219
220impl Clone for TracingProcessor {
221 fn clone(&self) -> Self {
222 Self {
223 inner: self.inner.clone(),
224 route_id: self.route_id.clone(),
225 step_id: self.step_id.clone(),
226 step_index: self.step_index,
227 detail_level: self.detail_level.clone(),
228 }
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{BoxProcessorExt, IdentityProcessor, Message, Value};
    use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState};
    use tower::ServiceExt;

    /// Builds a `TracingProcessor` around a pass-through inner processor.
    fn identity_tracer(
        route_id: &str,
        step_index: usize,
        detail_level: DetailLevel,
    ) -> TracingProcessor {
        TracingProcessor::new(
            BoxProcessor::new(IdentityProcessor),
            route_id.to_string(),
            step_index,
            detail_level,
        )
    }

    /// A call succeeds at the lowest detail level.
    #[tokio::test]
    async fn test_tracing_processor_minimal() {
        let mut tracer = identity_tracer("test-route", 0, DetailLevel::Minimal);

        let exchange = Exchange::new(Message::default());
        let result = tracer.ready().await.unwrap().call(exchange).await;

        assert!(result.is_ok());
    }

    /// A call succeeds when the medium-detail attributes are recorded.
    #[tokio::test]
    async fn test_tracing_processor_medium_detail() {
        let mut tracer = identity_tracer("test-route", 0, DetailLevel::Medium);

        let exchange = Exchange::new(Message::default());
        let result = tracer.ready().await.unwrap().call(exchange).await;

        assert!(result.is_ok());
    }

    /// A call succeeds when headers are recorded at full detail.
    #[tokio::test]
    async fn test_tracing_processor_full_detail() {
        let mut tracer = identity_tracer("test-route", 0, DetailLevel::Full);

        let mut exchange = Exchange::new(Message::default());
        exchange
            .input
            .headers
            .insert("test".to_string(), Value::String("value".into()));

        let result = tracer.ready().await.unwrap().call(exchange).await;

        assert!(result.is_ok());
    }

    /// A cloned processor is usable on its own.
    #[tokio::test]
    async fn test_tracing_processor_clone() {
        let tracer = identity_tracer("test-route", 1, DetailLevel::Minimal);

        let mut cloned = tracer.clone();
        let exchange = Exchange::new(Message::default());
        let result = cloned.ready().await.unwrap().call(exchange).await;

        assert!(result.is_ok());
    }

    /// An exchange that starts without a valid span still flows through.
    #[tokio::test]
    async fn test_tracing_processor_propagates_otel_context() {
        let mut tracer = identity_tracer("test-route", 0, DetailLevel::Minimal);

        let exchange = Exchange::new(Message::default());
        assert!(
            !exchange.otel_context.span().span_context().is_valid(),
            "Initial context should have invalid span"
        );

        let result = tracer.ready().await.unwrap().call(exchange).await;

        // No validity assertion on the output: without an installed tracer
        // provider the span created for a parentless exchange is not valid.
        let output_exchange = result.unwrap();
        let _ = output_exchange.otel_context;
    }

    /// The span created for the step stays in the parent's trace.
    #[tokio::test]
    async fn test_tracing_processor_with_parent_context() {
        let mut tracer = identity_tracer("test-route", 0, DetailLevel::Minimal);

        let trace_id = TraceId::from_hex("12345678901234567890123456789012").unwrap();
        let span_id = SpanId::from_hex("1234567890123456").unwrap();
        let parent_span_context = SpanContext::new(
            trace_id,
            span_id,
            TraceFlags::SAMPLED,
            true,
            TraceState::default(),
        );

        let mut exchange = Exchange::new(Message::default());
        exchange.otel_context = OtelContext::new().with_remote_span_context(parent_span_context);

        assert!(
            exchange.otel_context.span().span_context().is_valid(),
            "Parent context should be valid"
        );

        let result = tracer.ready().await.unwrap().call(exchange).await;
        let output_exchange = result.unwrap();

        let output_span = output_exchange.otel_context.span();
        let output_span_context = output_span.span_context();

        // Comparing addresses of span contexts can never fail, so check
        // observable properties instead: the step's span context must be
        // valid and must belong to the parent's trace.
        assert!(
            output_span_context.is_valid(),
            "exchange.otel_context should carry a valid span context after the step"
        );
        assert_eq!(
            output_span_context.trace_id(),
            trace_id,
            "the step span should inherit the parent's trace id"
        );
    }

    /// Errors from the inner processor are returned unchanged.
    #[tokio::test]
    async fn test_tracing_processor_records_error() {
        let failing_processor = BoxProcessor::from_fn(|_ex: Exchange| async move {
            Err(CamelError::ProcessorError("intentional test error".into()))
        });

        let mut tracer = TracingProcessor::new(
            failing_processor,
            "test-route".to_string(),
            0,
            DetailLevel::Minimal,
        );

        let exchange = Exchange::new(Message::default());
        let result = tracer.ready().await.unwrap().call(exchange).await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("intentional test error"));
    }

    /// Span names have the form `<route_id>/step-<index>`.
    // Purely synchronous, so no async runtime is needed.
    #[test]
    fn test_tracing_processor_span_name_format() {
        let tracer = identity_tracer("my-route", 5, DetailLevel::Minimal);

        assert_eq!(tracer.span_name(), "my-route/step-5");
    }

    /// The output exchange of one step can be fed into the next step.
    #[tokio::test]
    async fn test_tracing_processor_chained_propagation() {
        let mut tracer1 = identity_tracer("route1", 0, DetailLevel::Minimal);
        let mut tracer2 = identity_tracer("route2", 1, DetailLevel::Minimal);

        let exchange = Exchange::new(Message::default());
        let result1 = tracer1.ready().await.unwrap().call(exchange).await;
        let exchange1 = result1.unwrap();

        let result2 = tracer2.ready().await.unwrap().call(exchange1).await;
        let exchange2 = result2.unwrap();

        let _ = exchange2.otel_context;
    }
}