Skip to main content

camel_core/
tracer.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Instant;
5
6use tower::Service;
7use tower::ServiceExt;
8use tracing::Instrument;
9
10use crate::config::DetailLevel;
11use camel_api::{Body, BoxProcessor, CamelError, Exchange};
12
13/// Returns a human-readable name for the body type variant.
14fn body_type_name(body: &Body) -> &'static str {
15    match body {
16        Body::Empty => "empty",
17        Body::Bytes(_) => "bytes",
18        Body::Text(_) => "text",
19        Body::Json(_) => "json",
20        Body::Stream(_) => "stream",
21    }
22}
23
/// A processor wrapper that emits tracing spans for each step.
///
/// Every call made through this wrapper is executed inside a `step` span
/// (target `camel_tracer`) that carries the identifiers stored here.
pub struct TracingProcessor {
    /// The wrapped processor that actually handles the exchange.
    inner: BoxProcessor,
    /// Identifier of the owning route, recorded as the `route_id` span field.
    route_id: String,
    /// Step label recorded as the `step_id` span field; built as
    /// `step-<step_index>` by [`TracingProcessor::new`].
    step_id: String,
    /// Index of this step, recorded as the `step_index` span field.
    step_index: usize,
    /// Controls which optional span fields are recorded (`Medium` and above
    /// adds header count / body types, `Full` adds the first headers).
    detail_level: DetailLevel,
}
32
33impl TracingProcessor {
34    /// Wrap a processor with tracing.
35    pub fn new(
36        inner: BoxProcessor,
37        route_id: String,
38        step_index: usize,
39        detail_level: DetailLevel,
40    ) -> Self {
41        Self {
42            inner,
43            route_id,
44            step_id: format!("step-{}", step_index),
45            step_index,
46            detail_level,
47        }
48    }
49}
50
51impl Service<Exchange> for TracingProcessor {
52    type Response = Exchange;
53    type Error = CamelError;
54    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
55
56    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
57        self.inner.poll_ready(cx)
58    }
59
60    fn call(&mut self, exchange: Exchange) -> Self::Future {
61        let start = Instant::now();
62        let span = tracing::info_span!(
63            target: "camel_tracer",
64            "step",
65            correlation_id = %exchange.correlation_id(),
66            route_id = %self.route_id,
67            step_id = %self.step_id,
68            step_index = self.step_index,
69            timestamp = %chrono::Utc::now().to_rfc3339(),
70            duration_ms = tracing::field::Empty,
71            status = tracing::field::Empty,
72            headers_count = tracing::field::Empty,
73            body_type = tracing::field::Empty,
74            has_error = tracing::field::Empty,
75            output_body_type = tracing::field::Empty,
76            header_0 = tracing::field::Empty,
77            header_1 = tracing::field::Empty,
78            header_2 = tracing::field::Empty,
79            error = tracing::field::Empty,
80            error_type = tracing::field::Empty,
81        );
82
83        if self.detail_level >= DetailLevel::Medium {
84            span.record("headers_count", exchange.input.headers.len() as u64);
85            span.record("body_type", body_type_name(&exchange.input.body));
86            span.record("has_error", exchange.has_error());
87        }
88
89        if self.detail_level >= DetailLevel::Full {
90            for (i, (k, v)) in exchange.input.headers.iter().take(3).enumerate() {
91                span.record(format!("header_{i}").as_str(), format!("{k}={v:?}"));
92            }
93        }
94
95        let mut inner = self.inner.clone();
96        let detail_level = self.detail_level.clone();
97
98        Box::pin(
99            async move {
100                let result = inner.ready().await?.call(exchange).await;
101
102                let duration_ms = start.elapsed().as_millis() as u64;
103                tracing::Span::current().record("duration_ms", duration_ms);
104
105                match &result {
106                    Ok(ex) => {
107                        tracing::Span::current().record("status", "success");
108                        if detail_level >= DetailLevel::Medium {
109                            tracing::Span::current()
110                                .record("output_body_type", body_type_name(&ex.input.body));
111                        }
112                    }
113                    Err(e) => {
114                        tracing::Span::current().record("status", "error");
115                        tracing::Span::current().record("error", e.to_string());
116                        tracing::Span::current()
117                            .record("error_type", std::any::type_name::<CamelError>());
118                    }
119                }
120
121                result
122            }
123            .instrument(span),
124        )
125    }
126}
127
128impl Clone for TracingProcessor {
129    fn clone(&self) -> Self {
130        Self {
131            inner: self.inner.clone(),
132            route_id: self.route_id.clone(),
133            step_id: self.step_id.clone(),
134            step_index: self.step_index,
135            detail_level: self.detail_level.clone(),
136        }
137    }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{IdentityProcessor, Message, Value};
    use tower::ServiceExt;

    /// Wraps an `IdentityProcessor` in a tracer for route `test-route`.
    fn identity_tracer(step_index: usize, detail_level: DetailLevel) -> TracingProcessor {
        TracingProcessor::new(
            BoxProcessor::new(IdentityProcessor),
            "test-route".to_string(),
            step_index,
            detail_level,
        )
    }

    /// Waits for the tracer to become ready, then sends `exchange` through it.
    async fn drive(
        mut tracer: TracingProcessor,
        exchange: Exchange,
    ) -> Result<Exchange, CamelError> {
        tracer.ready().await.unwrap().call(exchange).await
    }

    #[tokio::test]
    async fn test_tracing_processor_minimal() {
        let tracer = identity_tracer(0, DetailLevel::Minimal);

        let outcome = drive(tracer, Exchange::new(Message::default())).await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_tracing_processor_medium_detail() {
        let tracer = identity_tracer(0, DetailLevel::Medium);

        let outcome = drive(tracer, Exchange::new(Message::default())).await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_tracing_processor_full_detail() {
        let tracer = identity_tracer(0, DetailLevel::Full);

        // One header so the `Full` header-recording path has something to see.
        let mut exchange = Exchange::new(Message::default());
        exchange
            .input
            .headers
            .insert("test".to_string(), Value::String("value".into()));

        let outcome = drive(tracer, exchange).await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_tracing_processor_clone() {
        let original = identity_tracer(1, DetailLevel::Minimal);

        // Only the clone is exercised; the original stays untouched.
        let outcome = drive(original.clone(), Exchange::new(Message::default())).await;

        assert!(outcome.is_ok());
    }
}