1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Instant;
5
6use tower::Service;
7use tower::ServiceExt;
8use tracing::Instrument;
9
10use crate::config::DetailLevel;
11use camel_api::{Body, BoxProcessor, CamelError, Exchange};
12
13fn body_type_name(body: &Body) -> &'static str {
15 match body {
16 Body::Empty => "empty",
17 Body::Bytes(_) => "bytes",
18 Body::Text(_) => "text",
19 Body::Json(_) => "json",
20 Body::Stream(_) => "stream",
21 }
22}
23
24pub struct TracingProcessor {
26 inner: BoxProcessor,
27 route_id: String,
28 step_id: String,
29 step_index: usize,
30 detail_level: DetailLevel,
31}
32
33impl TracingProcessor {
34 pub fn new(
36 inner: BoxProcessor,
37 route_id: String,
38 step_index: usize,
39 detail_level: DetailLevel,
40 ) -> Self {
41 Self {
42 inner,
43 route_id,
44 step_id: format!("step-{}", step_index),
45 step_index,
46 detail_level,
47 }
48 }
49}
50
51impl Service<Exchange> for TracingProcessor {
52 type Response = Exchange;
53 type Error = CamelError;
54 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
55
56 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
57 self.inner.poll_ready(cx)
58 }
59
60 fn call(&mut self, exchange: Exchange) -> Self::Future {
61 let start = Instant::now();
62 let span = tracing::info_span!(
63 target: "camel_tracer",
64 "step",
65 correlation_id = %exchange.correlation_id(),
66 route_id = %self.route_id,
67 step_id = %self.step_id,
68 step_index = self.step_index,
69 timestamp = %chrono::Utc::now().to_rfc3339(),
70 duration_ms = tracing::field::Empty,
71 status = tracing::field::Empty,
72 headers_count = tracing::field::Empty,
73 body_type = tracing::field::Empty,
74 has_error = tracing::field::Empty,
75 output_body_type = tracing::field::Empty,
76 header_0 = tracing::field::Empty,
77 header_1 = tracing::field::Empty,
78 header_2 = tracing::field::Empty,
79 error = tracing::field::Empty,
80 error_type = tracing::field::Empty,
81 );
82
83 if self.detail_level >= DetailLevel::Medium {
84 span.record("headers_count", exchange.input.headers.len() as u64);
85 span.record("body_type", body_type_name(&exchange.input.body));
86 span.record("has_error", exchange.has_error());
87 }
88
89 if self.detail_level >= DetailLevel::Full {
90 for (i, (k, v)) in exchange.input.headers.iter().take(3).enumerate() {
91 span.record(format!("header_{i}").as_str(), format!("{k}={v:?}"));
92 }
93 }
94
95 let mut inner = self.inner.clone();
96 let detail_level = self.detail_level.clone();
97
98 Box::pin(
99 async move {
100 let result = inner.ready().await?.call(exchange).await;
101
102 let duration_ms = start.elapsed().as_millis() as u64;
103 tracing::Span::current().record("duration_ms", duration_ms);
104
105 match &result {
106 Ok(ex) => {
107 tracing::Span::current().record("status", "success");
108 if detail_level >= DetailLevel::Medium {
109 tracing::Span::current()
110 .record("output_body_type", body_type_name(&ex.input.body));
111 }
112 }
113 Err(e) => {
114 tracing::Span::current().record("status", "error");
115 tracing::Span::current().record("error", e.to_string());
116 tracing::Span::current()
117 .record("error_type", std::any::type_name::<CamelError>());
118 }
119 }
120
121 result
122 }
123 .instrument(span),
124 )
125 }
126}
127
128impl Clone for TracingProcessor {
129 fn clone(&self) -> Self {
130 Self {
131 inner: self.inner.clone(),
132 route_id: self.route_id.clone(),
133 step_id: self.step_id.clone(),
134 step_index: self.step_index,
135 detail_level: self.detail_level.clone(),
136 }
137 }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{IdentityProcessor, Message, Value};
    use tower::ServiceExt;

    /// Builds a tracer for route "test-route" around an `IdentityProcessor`.
    fn identity_tracer(step_index: usize, detail_level: DetailLevel) -> TracingProcessor {
        TracingProcessor::new(
            BoxProcessor::new(IdentityProcessor),
            "test-route".to_string(),
            step_index,
            detail_level,
        )
    }

    /// Waits for readiness, sends one exchange through `tracer`, and returns the outcome.
    async fn process(
        tracer: &mut TracingProcessor,
        exchange: Exchange,
    ) -> Result<Exchange, CamelError> {
        tracer.ready().await.unwrap().call(exchange).await
    }

    #[tokio::test]
    async fn test_tracing_processor_minimal() {
        let mut tracer = identity_tracer(0, DetailLevel::Minimal);

        let outcome = process(&mut tracer, Exchange::new(Message::default())).await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_tracing_processor_medium_detail() {
        let mut tracer = identity_tracer(0, DetailLevel::Medium);

        let outcome = process(&mut tracer, Exchange::new(Message::default())).await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_tracing_processor_full_detail() {
        let mut tracer = identity_tracer(0, DetailLevel::Full);

        // One header, so the header-capturing branch has something to record.
        let mut exchange = Exchange::new(Message::default());
        exchange
            .input
            .headers
            .insert("test".to_string(), Value::String("value".into()));

        let outcome = process(&mut tracer, exchange).await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_tracing_processor_clone() {
        let original = identity_tracer(1, DetailLevel::Minimal);

        // The clone must be usable on its own.
        let mut cloned = original.clone();
        let outcome = process(&mut cloned, Exchange::new(Message::default())).await;

        assert!(outcome.is_ok());
    }
}