1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5use std::time::Instant;
6
7use opentelemetry::trace::{SpanKind, Status, TraceContextExt, Tracer};
8use opentelemetry::{Context as OtelContext, KeyValue, global};
9use tower::Service;
10use tower::ServiceExt;
11use tracing::Instrument;
12
13use crate::shared::observability::domain::DetailLevel;
14use camel_api::metrics::MetricsCollector;
15use camel_api::{Body, BoxProcessor, CamelError, Exchange};
16
17struct SpanEndGuard(OtelContext);
21
22impl Drop for SpanEndGuard {
23 fn drop(&mut self) {
24 self.0.span().end();
25 }
26}
27
28fn body_type_name(body: &Body) -> &'static str {
30 match body {
31 Body::Empty => "empty",
32 Body::Bytes(_) => "bytes",
33 Body::Text(_) => "text",
34 Body::Json(_) => "json",
35 Body::Xml(_) => "xml",
36 Body::Stream(_) => "stream",
37 }
38}
39
40pub struct TracingProcessor {
49 inner: BoxProcessor,
50 route_id: String,
51 step_id: String,
52 span_name: String,
53 step_index: usize,
54 detail_level: DetailLevel,
55 metrics: Option<Arc<dyn MetricsCollector>>,
56}
57
58impl TracingProcessor {
59 pub fn new(
61 inner: BoxProcessor,
62 route_id: String,
63 step_index: usize,
64 detail_level: DetailLevel,
65 metrics: Option<Arc<dyn MetricsCollector>>,
66 ) -> Self {
67 let step_id = format!("step-{}", step_index);
68 let span_name = format!("{route_id}:{step_id}");
69 Self {
70 inner,
71 route_id,
72 step_id,
73 span_name,
74 step_index,
75 detail_level,
76 metrics,
77 }
78 }
79}
80
81impl Service<Exchange> for TracingProcessor {
82 type Response = Exchange;
83 type Error = CamelError;
84 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
85
86 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87 self.inner.poll_ready(cx)
88 }
89
90 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
91 let start = Instant::now();
92 let span_name = self.span_name.clone();
93
94 let tracer = global::tracer("camel-core");
96
97 let parent_cx = exchange.otel_context.clone();
99
100 let mut attributes = [
102 KeyValue::new("messaging.system", "camel"),
103 KeyValue::new("correlation_id", exchange.correlation_id().to_string()),
104 KeyValue::new("route_id", self.route_id.clone()),
105 KeyValue::new("step_id", self.step_id.clone()),
106 KeyValue::new("step_index", self.step_index as i64),
107 KeyValue::new("headers_count", 0i64),
108 KeyValue::new("body_type", ""),
109 KeyValue::new("has_error", false),
110 ];
111 let mut attr_count = 5;
112
113 if self.detail_level >= DetailLevel::Medium {
114 attributes[5] = KeyValue::new("headers_count", exchange.input.headers.len() as i64);
115 attributes[6] = KeyValue::new("body_type", body_type_name(&exchange.input.body));
116 attributes[7] = KeyValue::new("has_error", exchange.has_error());
117 attr_count = 8;
118 }
119
120 let span = tracer
122 .span_builder(span_name)
123 .with_kind(SpanKind::Internal)
124 .with_attributes(attributes[..attr_count].iter().cloned())
125 .start_with_context(&tracer, &parent_cx);
126
127 let cx = OtelContext::current_with_span(span);
129
130 exchange.otel_context = cx.clone();
132
133 let tracing_span = tracing::info_span!(
135 target: "camel_tracer",
136 "step",
137 correlation_id = %exchange.correlation_id(),
138 route_id = %self.route_id,
139 step_id = %self.step_id,
140 step_index = self.step_index,
141 duration_ms = tracing::field::Empty,
142 status = tracing::field::Empty,
143 headers_count = tracing::field::Empty,
144 body_type = tracing::field::Empty,
145 has_error = tracing::field::Empty,
146 output_body_type = tracing::field::Empty,
147 header_0 = tracing::field::Empty,
148 header_1 = tracing::field::Empty,
149 header_2 = tracing::field::Empty,
150 error = tracing::field::Empty,
151 error_type = tracing::field::Empty,
152 );
153
154 if self.detail_level >= DetailLevel::Medium {
155 tracing_span.record("headers_count", exchange.input.headers.len() as u64);
156 tracing_span.record("body_type", body_type_name(&exchange.input.body));
157 tracing_span.record("has_error", exchange.has_error());
158 }
159
160 if self.detail_level >= DetailLevel::Full {
161 let headers: Vec<_> = exchange.input.headers.iter().take(3).collect();
162 if let Some((k, v)) = headers.first() {
163 tracing_span.record("header_0", format!("{k}={v:?}"));
164 }
165 if let Some((k, v)) = headers.get(1) {
166 tracing_span.record("header_1", format!("{k}={v:?}"));
167 }
168 if let Some((k, v)) = headers.get(2) {
169 tracing_span.record("header_2", format!("{k}={v:?}"));
170 }
171 }
172
173 let mut inner = self.inner.clone();
174 let detail_level = self.detail_level.clone();
175 let metrics = self.metrics.clone();
176 let route_id = self.route_id.clone();
177
178 Box::pin(
179 async move {
180 let _guard = SpanEndGuard(cx.clone());
186
187 let result = inner.ready().await?.call(exchange).await;
188
189 let duration = start.elapsed();
190 let duration_ms = duration.as_millis() as u64;
191 tracing::Span::current().record("duration_ms", duration_ms);
192
193 cx.span()
195 .set_attribute(KeyValue::new("duration_ms", duration_ms as i64));
196
197 if let Some(ref metrics) = metrics {
199 metrics.record_exchange_duration(&route_id, duration);
200 metrics.increment_exchanges(&route_id);
201
202 if let Err(e) = &result {
203 metrics.increment_errors(&route_id, e.classify());
204 }
205 }
206
207 match &result {
208 Ok(ex) => {
209 tracing::Span::current().record("status", "success");
210 cx.span().set_status(Status::Ok);
211
212 if detail_level >= DetailLevel::Medium {
213 tracing::Span::current()
214 .record("output_body_type", body_type_name(&ex.input.body));
215 cx.span().set_attribute(KeyValue::new(
216 "output_body_type",
217 body_type_name(&ex.input.body),
218 ));
219 }
220 }
221 Err(e) => {
222 let error_class = e.classify();
223 cx.span().set_status(Status::error(e.to_string()));
224 cx.span().add_event(
225 "error",
226 vec![
227 KeyValue::new("error.type", error_class.to_string()),
228 KeyValue::new("error.message", e.to_string()),
229 ],
230 );
231 tracing::Span::current().record("status", "error");
232 tracing::Span::current().record("error", e.to_string());
233 tracing::Span::current().record("error_type", error_class);
234 }
235 }
236
237 result
239 }
240 .instrument(tracing_span),
241 )
242 }
243}
244
245impl Clone for TracingProcessor {
246 fn clone(&self) -> Self {
247 Self {
248 inner: self.inner.clone(),
249 route_id: self.route_id.clone(),
250 step_id: self.step_id.clone(),
251 span_name: self.span_name.clone(),
252 step_index: self.step_index,
253 detail_level: self.detail_level.clone(),
254 metrics: self.metrics.clone(),
255 }
256 }
257}
258
259#[cfg(test)]
260mod tests {
261 use super::*;
273 use camel_api::{BoxProcessorExt, IdentityProcessor, Message, Value};
274 use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState};
275 use tower::ServiceExt;
276
277 #[tokio::test]
278 async fn test_tracing_processor_minimal() {
279 let inner = BoxProcessor::new(IdentityProcessor);
280 let mut tracer = TracingProcessor::new(
281 inner,
282 "test-route".to_string(),
283 0,
284 DetailLevel::Minimal,
285 None,
286 );
287
288 let exchange = Exchange::new(Message::default());
289 let result = tracer.ready().await.unwrap().call(exchange).await;
290
291 assert!(result.is_ok());
292 }
293
294 #[tokio::test]
295 async fn test_tracing_processor_medium_detail() {
296 let inner = BoxProcessor::new(IdentityProcessor);
297 let mut tracer = TracingProcessor::new(
298 inner,
299 "test-route".to_string(),
300 0,
301 DetailLevel::Medium,
302 None,
303 );
304
305 let exchange = Exchange::new(Message::default());
306 let result = tracer.ready().await.unwrap().call(exchange).await;
307
308 assert!(result.is_ok());
309 }
310
311 #[tokio::test]
312 async fn test_tracing_processor_full_detail() {
313 let inner = BoxProcessor::new(IdentityProcessor);
314 let mut tracer =
315 TracingProcessor::new(inner, "test-route".to_string(), 0, DetailLevel::Full, None);
316
317 let mut exchange = Exchange::new(Message::default());
318 exchange
319 .input
320 .headers
321 .insert("test".to_string(), Value::String("value".into()));
322
323 let result = tracer.ready().await.unwrap().call(exchange).await;
324
325 assert!(result.is_ok());
326 }
327
328 #[tokio::test]
329 async fn test_tracing_processor_clone() {
330 let inner = BoxProcessor::new(IdentityProcessor);
331 let tracer = TracingProcessor::new(
332 inner,
333 "test-route".to_string(),
334 1,
335 DetailLevel::Minimal,
336 None,
337 );
338
339 let mut cloned = tracer.clone();
340 let exchange = Exchange::new(Message::default());
341 let result = cloned.ready().await.unwrap().call(exchange).await;
342 assert!(result.is_ok());
343 }
344
345 #[tokio::test]
346 async fn test_tracing_processor_propagates_otel_context() {
347 let inner = BoxProcessor::new(IdentityProcessor);
348 let mut tracer = TracingProcessor::new(
349 inner,
350 "test-route".to_string(),
351 0,
352 DetailLevel::Minimal,
353 None,
354 );
355
356 let exchange = Exchange::new(Message::default());
358 assert!(
359 !exchange.otel_context.span().span_context().is_valid(),
360 "Initial context should have invalid span"
361 );
362
363 let result = tracer.ready().await.unwrap().call(exchange).await;
364
365 let output_exchange = result.unwrap();
367
368 let _span_context = output_exchange.otel_context.span().span_context();
373 }
374
375 #[tokio::test]
376 async fn test_tracing_processor_with_parent_context() {
377 let inner = BoxProcessor::new(IdentityProcessor);
378 let mut tracer = TracingProcessor::new(
379 inner,
380 "test-route".to_string(),
381 0,
382 DetailLevel::Minimal,
383 None,
384 );
385
386 let trace_id = TraceId::from_hex("12345678901234567890123456789012").unwrap();
388 let span_id = SpanId::from_hex("1234567890123456").unwrap();
389 let parent_span_context = SpanContext::new(
390 trace_id,
391 span_id,
392 TraceFlags::SAMPLED,
393 true, TraceState::default(),
395 );
396
397 let mut exchange = Exchange::new(Message::default());
399 exchange.otel_context = OtelContext::new().with_remote_span_context(parent_span_context);
400
401 let initial_span_context = exchange.otel_context.span().span_context().clone();
403
404 assert!(
406 exchange.otel_context.span().span_context().is_valid(),
407 "Parent context should be valid"
408 );
409 let _parent_trace_id = exchange.otel_context.span().span_context().trace_id();
410
411 let result = tracer.ready().await.unwrap().call(exchange).await;
412
413 let output_exchange = result.unwrap();
414
415 let output_span = output_exchange.otel_context.span();
418 let _output_trace_id = output_span.span_context().trace_id();
421
422 let output_span_context = output_span.span_context();
426 assert!(
429 !std::ptr::eq(&initial_span_context, output_span_context),
430 "exchange.otel_context should have been updated with a new child span context"
431 );
432 }
433
434 #[tokio::test]
435 async fn test_tracing_processor_records_error() {
436 let failing_processor = BoxProcessor::from_fn(|_ex: Exchange| async move {
438 Err(CamelError::ProcessorError("intentional test error".into()))
439 });
440
441 let mut tracer = TracingProcessor::new(
442 failing_processor,
443 "test-route".to_string(),
444 0,
445 DetailLevel::Minimal,
446 None,
447 );
448
449 let exchange = Exchange::new(Message::default());
450 let result = tracer.ready().await.unwrap().call(exchange).await;
451
452 assert!(result.is_err());
454 let err = result.unwrap_err();
455 assert!(err.to_string().contains("intentional test error"));
456
457 }
462
463 #[tokio::test]
464 async fn test_tracing_processor_span_name_format() {
465 let inner = BoxProcessor::new(IdentityProcessor);
466 let tracer =
467 TracingProcessor::new(inner, "my-route".to_string(), 5, DetailLevel::Minimal, None);
468
469 assert_eq!(tracer.span_name, "my-route:step-5");
470 }
471
472 #[tokio::test]
473 async fn test_tracing_processor_chained_propagation() {
474 let processor1 = BoxProcessor::new(IdentityProcessor);
476 let mut tracer1 = TracingProcessor::new(
477 processor1,
478 "route1".to_string(),
479 0,
480 DetailLevel::Minimal,
481 None,
482 );
483
484 let processor2 = BoxProcessor::new(IdentityProcessor);
485 let mut tracer2 = TracingProcessor::new(
486 processor2,
487 "route2".to_string(),
488 1,
489 DetailLevel::Minimal,
490 None,
491 );
492
493 let exchange = Exchange::new(Message::default());
494 let result1 = tracer1.ready().await.unwrap().call(exchange).await;
495 let exchange1 = result1.unwrap();
496
497 let result2 = tracer2.ready().await.unwrap().call(exchange1).await;
499 let exchange2 = result2.unwrap();
500
501 let _ = exchange2.otel_context;
504 }
505}