1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5use std::time::Instant;
6
7use opentelemetry::trace::{SpanKind, Status, TraceContextExt, Tracer};
8use opentelemetry::{Context as OtelContext, KeyValue, global};
9use tower::Service;
10use tower::ServiceExt;
11use tracing::Instrument;
12
13use crate::shared::observability::domain::DetailLevel;
14use camel_api::metrics::MetricsCollector;
15use camel_api::{Body, BoxProcessor, CamelError, Exchange};
16
17struct SpanEndGuard(OtelContext);
21
22impl Drop for SpanEndGuard {
23 fn drop(&mut self) {
24 self.0.span().end();
25 }
26}
27
28fn body_type_name(body: &Body) -> &'static str {
30 match body {
31 Body::Empty => "empty",
32 Body::Bytes(_) => "bytes",
33 Body::Text(_) => "text",
34 Body::Json(_) => "json",
35 Body::Xml(_) => "xml",
36 Body::Stream(_) => "stream",
37 }
38}
39
40pub struct TracingProcessor {
49 inner: BoxProcessor,
50 route_id: String,
51 step_id: String,
52 step_index: usize,
53 detail_level: DetailLevel,
54 metrics: Option<Arc<dyn MetricsCollector>>,
55}
56
57impl TracingProcessor {
58 pub fn new(
60 inner: BoxProcessor,
61 route_id: String,
62 step_index: usize,
63 detail_level: DetailLevel,
64 metrics: Option<Arc<dyn MetricsCollector>>,
65 ) -> Self {
66 Self {
67 inner,
68 route_id,
69 step_id: format!("step-{}", step_index),
70 step_index,
71 detail_level,
72 metrics,
73 }
74 }
75
76 fn span_name(&self) -> String {
78 format!("{}/{}", self.route_id, self.step_id)
79 }
80}
81
82impl Service<Exchange> for TracingProcessor {
83 type Response = Exchange;
84 type Error = CamelError;
85 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
86
87 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
88 self.inner.poll_ready(cx)
89 }
90
91 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
92 let start = Instant::now();
93 let span_name = self.span_name();
94
95 let tracer = global::tracer("camel-core");
97
98 let parent_cx = exchange.otel_context.clone();
100
101 let mut attributes = vec![
103 KeyValue::new("correlation_id", exchange.correlation_id().to_string()),
104 KeyValue::new("route_id", self.route_id.clone()),
105 KeyValue::new("step_id", self.step_id.clone()),
106 KeyValue::new("step_index", self.step_index as i64),
107 ];
108
109 if self.detail_level >= DetailLevel::Medium {
110 attributes.push(KeyValue::new(
111 "headers_count",
112 exchange.input.headers.len() as i64,
113 ));
114 attributes.push(KeyValue::new(
115 "body_type",
116 body_type_name(&exchange.input.body),
117 ));
118 attributes.push(KeyValue::new("has_error", exchange.has_error()));
119 }
120
121 let span = tracer
123 .span_builder(span_name.clone())
124 .with_kind(SpanKind::Internal)
125 .with_attributes(attributes)
126 .start_with_context(&tracer, &parent_cx);
127
128 let cx = OtelContext::current_with_span(span);
130
131 exchange.otel_context = cx.clone();
133
134 let tracing_span = tracing::info_span!(
136 target: "camel_tracer",
137 "step",
138 correlation_id = %exchange.correlation_id(),
139 route_id = %self.route_id,
140 step_id = %self.step_id,
141 step_index = self.step_index,
142 timestamp = %chrono::Utc::now().to_rfc3339(),
143 duration_ms = tracing::field::Empty,
144 status = tracing::field::Empty,
145 headers_count = tracing::field::Empty,
146 body_type = tracing::field::Empty,
147 has_error = tracing::field::Empty,
148 output_body_type = tracing::field::Empty,
149 header_0 = tracing::field::Empty,
150 header_1 = tracing::field::Empty,
151 header_2 = tracing::field::Empty,
152 error = tracing::field::Empty,
153 error_type = tracing::field::Empty,
154 );
155
156 if self.detail_level >= DetailLevel::Medium {
157 tracing_span.record("headers_count", exchange.input.headers.len() as u64);
158 tracing_span.record("body_type", body_type_name(&exchange.input.body));
159 tracing_span.record("has_error", exchange.has_error());
160 }
161
162 if self.detail_level >= DetailLevel::Full {
163 for (i, (k, v)) in exchange.input.headers.iter().take(3).enumerate() {
164 tracing_span.record(format!("header_{i}").as_str(), format!("{k}={v:?}"));
165 }
166 }
167
168 let mut inner = self.inner.clone();
169 let detail_level = self.detail_level.clone();
170 let metrics = self.metrics.clone();
171 let route_id = self.route_id.clone();
172
173 Box::pin(
174 async move {
175 let _guard = SpanEndGuard(cx.clone());
181
182 let result = inner.ready().await?.call(exchange).await;
183
184 let duration = start.elapsed();
185 let duration_ms = duration.as_millis() as u64;
186 tracing::Span::current().record("duration_ms", duration_ms);
187
188 cx.span()
190 .set_attribute(KeyValue::new("duration_ms", duration_ms as i64));
191
192 if let Some(ref metrics) = metrics {
194 metrics.record_exchange_duration(&route_id, duration);
195 metrics.increment_exchanges(&route_id);
196
197 if let Err(e) = &result {
198 metrics.increment_errors(&route_id, &e.to_string());
199 }
200 }
201
202 match &result {
203 Ok(ex) => {
204 tracing::Span::current().record("status", "success");
205 cx.span().set_status(Status::Ok);
206
207 if detail_level >= DetailLevel::Medium {
208 tracing::Span::current()
209 .record("output_body_type", body_type_name(&ex.input.body));
210 cx.span().set_attribute(KeyValue::new(
211 "output_body_type",
212 body_type_name(&ex.input.body),
213 ));
214 }
215 }
216 Err(e) => {
217 tracing::Span::current().record("status", "error");
218 tracing::Span::current().record("error", e.to_string());
219 tracing::Span::current()
220 .record("error_type", std::any::type_name::<CamelError>());
221
222 cx.span().set_status(Status::error(e.to_string()));
224 cx.span().set_attribute(KeyValue::new(
225 "error_type",
226 std::any::type_name::<CamelError>(),
227 ));
228 }
229 }
230
231 result
233 }
234 .instrument(tracing_span),
235 )
236 }
237}
238
239impl Clone for TracingProcessor {
240 fn clone(&self) -> Self {
241 Self {
242 inner: self.inner.clone(),
243 route_id: self.route_id.clone(),
244 step_id: self.step_id.clone(),
245 step_index: self.step_index,
246 detail_level: self.detail_level.clone(),
247 metrics: self.metrics.clone(),
248 }
249 }
250}
251
252#[cfg(test)]
253mod tests {
254 use super::*;
266 use camel_api::{BoxProcessorExt, IdentityProcessor, Message, Value};
267 use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState};
268 use tower::ServiceExt;
269
270 #[tokio::test]
271 async fn test_tracing_processor_minimal() {
272 let inner = BoxProcessor::new(IdentityProcessor);
273 let mut tracer = TracingProcessor::new(
274 inner,
275 "test-route".to_string(),
276 0,
277 DetailLevel::Minimal,
278 None,
279 );
280
281 let exchange = Exchange::new(Message::default());
282 let result = tracer.ready().await.unwrap().call(exchange).await;
283
284 assert!(result.is_ok());
285 }
286
287 #[tokio::test]
288 async fn test_tracing_processor_medium_detail() {
289 let inner = BoxProcessor::new(IdentityProcessor);
290 let mut tracer = TracingProcessor::new(
291 inner,
292 "test-route".to_string(),
293 0,
294 DetailLevel::Medium,
295 None,
296 );
297
298 let exchange = Exchange::new(Message::default());
299 let result = tracer.ready().await.unwrap().call(exchange).await;
300
301 assert!(result.is_ok());
302 }
303
304 #[tokio::test]
305 async fn test_tracing_processor_full_detail() {
306 let inner = BoxProcessor::new(IdentityProcessor);
307 let mut tracer =
308 TracingProcessor::new(inner, "test-route".to_string(), 0, DetailLevel::Full, None);
309
310 let mut exchange = Exchange::new(Message::default());
311 exchange
312 .input
313 .headers
314 .insert("test".to_string(), Value::String("value".into()));
315
316 let result = tracer.ready().await.unwrap().call(exchange).await;
317
318 assert!(result.is_ok());
319 }
320
321 #[tokio::test]
322 async fn test_tracing_processor_clone() {
323 let inner = BoxProcessor::new(IdentityProcessor);
324 let tracer = TracingProcessor::new(
325 inner,
326 "test-route".to_string(),
327 1,
328 DetailLevel::Minimal,
329 None,
330 );
331
332 let mut cloned = tracer.clone();
333 let exchange = Exchange::new(Message::default());
334 let result = cloned.ready().await.unwrap().call(exchange).await;
335 assert!(result.is_ok());
336 }
337
338 #[tokio::test]
339 async fn test_tracing_processor_propagates_otel_context() {
340 let inner = BoxProcessor::new(IdentityProcessor);
341 let mut tracer = TracingProcessor::new(
342 inner,
343 "test-route".to_string(),
344 0,
345 DetailLevel::Minimal,
346 None,
347 );
348
349 let exchange = Exchange::new(Message::default());
351 assert!(
352 !exchange.otel_context.span().span_context().is_valid(),
353 "Initial context should have invalid span"
354 );
355
356 let result = tracer.ready().await.unwrap().call(exchange).await;
357
358 let output_exchange = result.unwrap();
360
361 let _span_context = output_exchange.otel_context.span().span_context();
366 }
367
368 #[tokio::test]
369 async fn test_tracing_processor_with_parent_context() {
370 let inner = BoxProcessor::new(IdentityProcessor);
371 let mut tracer = TracingProcessor::new(
372 inner,
373 "test-route".to_string(),
374 0,
375 DetailLevel::Minimal,
376 None,
377 );
378
379 let trace_id = TraceId::from_hex("12345678901234567890123456789012").unwrap();
381 let span_id = SpanId::from_hex("1234567890123456").unwrap();
382 let parent_span_context = SpanContext::new(
383 trace_id,
384 span_id,
385 TraceFlags::SAMPLED,
386 true, TraceState::default(),
388 );
389
390 let mut exchange = Exchange::new(Message::default());
392 exchange.otel_context = OtelContext::new().with_remote_span_context(parent_span_context);
393
394 let initial_span_context = exchange.otel_context.span().span_context().clone();
396
397 assert!(
399 exchange.otel_context.span().span_context().is_valid(),
400 "Parent context should be valid"
401 );
402 let _parent_trace_id = exchange.otel_context.span().span_context().trace_id();
403
404 let result = tracer.ready().await.unwrap().call(exchange).await;
405
406 let output_exchange = result.unwrap();
407
408 let output_span = output_exchange.otel_context.span();
411 let _output_trace_id = output_span.span_context().trace_id();
414
415 let output_span_context = output_span.span_context();
419 assert!(
422 !std::ptr::eq(&initial_span_context, output_span_context),
423 "exchange.otel_context should have been updated with a new child span context"
424 );
425 }
426
427 #[tokio::test]
428 async fn test_tracing_processor_records_error() {
429 let failing_processor = BoxProcessor::from_fn(|_ex: Exchange| async move {
431 Err(CamelError::ProcessorError("intentional test error".into()))
432 });
433
434 let mut tracer = TracingProcessor::new(
435 failing_processor,
436 "test-route".to_string(),
437 0,
438 DetailLevel::Minimal,
439 None,
440 );
441
442 let exchange = Exchange::new(Message::default());
443 let result = tracer.ready().await.unwrap().call(exchange).await;
444
445 assert!(result.is_err());
447 let err = result.unwrap_err();
448 assert!(err.to_string().contains("intentional test error"));
449
450 }
455
456 #[tokio::test]
457 async fn test_tracing_processor_span_name_format() {
458 let inner = BoxProcessor::new(IdentityProcessor);
459 let tracer =
460 TracingProcessor::new(inner, "my-route".to_string(), 5, DetailLevel::Minimal, None);
461
462 assert_eq!(tracer.span_name(), "my-route/step-5");
464 }
465
466 #[tokio::test]
467 async fn test_tracing_processor_chained_propagation() {
468 let processor1 = BoxProcessor::new(IdentityProcessor);
470 let mut tracer1 = TracingProcessor::new(
471 processor1,
472 "route1".to_string(),
473 0,
474 DetailLevel::Minimal,
475 None,
476 );
477
478 let processor2 = BoxProcessor::new(IdentityProcessor);
479 let mut tracer2 = TracingProcessor::new(
480 processor2,
481 "route2".to_string(),
482 1,
483 DetailLevel::Minimal,
484 None,
485 );
486
487 let exchange = Exchange::new(Message::default());
488 let result1 = tracer1.ready().await.unwrap().call(exchange).await;
489 let exchange1 = result1.unwrap();
490
491 let result2 = tracer2.ready().await.unwrap().call(exchange1).await;
493 let exchange2 = result2.unwrap();
494
495 let _ = exchange2.otel_context;
498 }
499}