Skip to main content

camel_component_log/
lib.rs

1//! Log component for rust-camel — routes exchange body and headers to the
2//! `tracing` subsystem at a configurable log level, primarily for debugging.
3//!
4//! Main types: `LogComponent`, `LogEndpoint`, `LogProducer`, `LogLevel`.
5
6use std::future::Future;
7use std::pin::Pin;
8use std::str::FromStr;
9use std::task::{Context, Poll};
10
11use tower::Service;
12use tracing::{debug, error, info, trace, warn};
13
14use camel_component_api::UriConfig;
15use camel_component_api::{BoxProcessor, CamelError, Exchange};
16use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
17
18// ---------------------------------------------------------------------------
19// LogLevel
20// ---------------------------------------------------------------------------
21
22/// Log level for the log component.
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
24pub enum LogLevel {
25    Trace,
26    Debug,
27    #[default]
28    Info,
29    Warn,
30    Error,
31}
32
33impl FromStr for LogLevel {
34    type Err = String;
35
36    fn from_str(s: &str) -> Result<Self, Self::Err> {
37        match s.to_lowercase().as_str() {
38            "trace" => Ok(LogLevel::Trace),
39            "debug" => Ok(LogLevel::Debug),
40            "info" => Ok(LogLevel::Info),
41            "warn" | "warning" => Ok(LogLevel::Warn),
42            "error" => Ok(LogLevel::Error),
43            _ => Err(format!("Invalid log level: {}", s)),
44        }
45    }
46}
47
48// ---------------------------------------------------------------------------
49// LogConfig
50// ---------------------------------------------------------------------------
51
52/// Configuration parsed from a log URI.
53///
54/// Format: `log:category?level=info&showHeaders=true&showBody=true`
55#[derive(Debug, Clone, UriConfig)]
56#[uri_scheme = "log"]
57#[uri_config(crate = "camel_component_api")]
58pub struct LogConfig {
59    /// Log category (the path portion of the URI).
60    pub category: String,
61    /// Log level. Default: Info.
62    #[uri_param(default = "Info")]
63    pub level: LogLevel,
64    /// Whether to include headers in the log output.
65    #[uri_param(name = "showHeaders", default = "false")]
66    pub show_headers: bool,
67    /// Whether to include the body in the log output.
68    #[uri_param(name = "showBody", default = "true")]
69    pub show_body: bool,
70}
71
72// ---------------------------------------------------------------------------
73// LogComponent
74// ---------------------------------------------------------------------------
75
76/// The Log component logs exchange information using `tracing`.
77pub struct LogComponent;
78
79impl LogComponent {
80    pub fn new() -> Self {
81        Self
82    }
83}
84
85impl Default for LogComponent {
86    fn default() -> Self {
87        Self::new()
88    }
89}
90
91impl Component for LogComponent {
92    fn scheme(&self) -> &str {
93        "log"
94    }
95
96    fn create_endpoint(
97        &self,
98        uri: &str,
99        _ctx: &dyn camel_component_api::ComponentContext,
100    ) -> Result<Box<dyn Endpoint>, CamelError> {
101        let config = LogConfig::from_uri(uri)?;
102        Ok(Box::new(LogEndpoint {
103            uri: uri.to_string(),
104            config,
105        }))
106    }
107}
108
109// ---------------------------------------------------------------------------
110// LogEndpoint
111// ---------------------------------------------------------------------------
112
113struct LogEndpoint {
114    uri: String,
115    config: LogConfig,
116}
117
118impl Endpoint for LogEndpoint {
119    fn uri(&self) -> &str {
120        &self.uri
121    }
122
123    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
124        Err(CamelError::EndpointCreationFailed(
125            "log endpoint does not support consumers".to_string(),
126        ))
127    }
128
129    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
130        Ok(BoxProcessor::new(LogProducer {
131            config: self.config.clone(),
132        }))
133    }
134}
135
136// ---------------------------------------------------------------------------
137// LogProducer
138// ---------------------------------------------------------------------------
139
140#[derive(Clone)]
141struct LogProducer {
142    config: LogConfig,
143}
144
145impl LogProducer {
146    fn format_exchange(&self, exchange: &Exchange) -> String {
147        let mut parts = Vec::new();
148
149        if self.config.show_body {
150            let body_str = match &exchange.input.body {
151                camel_component_api::Body::Empty => "[empty]".to_string(),
152                camel_component_api::Body::Text(s) => s.clone(),
153                camel_component_api::Body::Json(v) => v.to_string(),
154                camel_component_api::Body::Xml(s) => s.clone(),
155                camel_component_api::Body::Bytes(b) => format!("[{} bytes]", b.len()),
156                camel_component_api::Body::Stream(s) => {
157                    format!("[Stream: origin={:?}]", s.metadata.origin)
158                }
159            };
160            parts.push(format!("Body: {body_str}"));
161        }
162
163        if self.config.show_headers && !exchange.input.headers.is_empty() {
164            let headers: Vec<String> = exchange
165                .input
166                .headers
167                .iter()
168                .map(|(k, v)| format!("{k}={v}"))
169                .collect();
170            parts.push(format!("Headers: {{{}}}", headers.join(", ")));
171        }
172
173        if parts.is_empty() {
174            format!("[{}] Exchange received", self.config.category)
175        } else {
176            format!("[{}] {}", self.config.category, parts.join(" | "))
177        }
178    }
179}
180
181impl Service<Exchange> for LogProducer {
182    type Response = Exchange;
183    type Error = CamelError;
184    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
185
186    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
187        Poll::Ready(Ok(()))
188    }
189
190    fn call(&mut self, exchange: Exchange) -> Self::Future {
191        let msg = self.format_exchange(&exchange);
192        let level = self.config.level;
193
194        Box::pin(async move {
195            match level {
196                LogLevel::Trace => trace!("{msg}"),
197                LogLevel::Debug => debug!("{msg}"),
198                LogLevel::Info => info!("{msg}"),
199                LogLevel::Warn => warn!("{msg}"),
200                LogLevel::Error => error!("{msg}"),
201            }
202
203            Ok(exchange)
204        })
205    }
206}
207
208// ---------------------------------------------------------------------------
209// Tests
210// ---------------------------------------------------------------------------
211
212#[cfg(test)]
213mod tests {
214    use super::*;
215    use camel_component_api::Body;
216    use camel_component_api::Message;
217    use camel_component_api::NoOpComponentContext;
218    use tower::ServiceExt;
219
220    fn test_producer_ctx() -> ProducerContext {
221        ProducerContext::new()
222    }
223
224    #[test]
225    fn test_log_config_defaults() {
226        let config = LogConfig::from_uri("log:myCategory").unwrap();
227        assert_eq!(config.category, "myCategory");
228        assert_eq!(config.level, LogLevel::Info);
229        assert!(!config.show_headers);
230        assert!(config.show_body);
231    }
232
233    #[test]
234    fn test_log_config_with_params() {
235        let config =
236            LogConfig::from_uri("log:app?level=debug&showHeaders=true&showBody=false").unwrap();
237        assert_eq!(config.category, "app");
238        assert_eq!(config.level, LogLevel::Debug);
239        assert!(config.show_headers);
240        assert!(!config.show_body);
241    }
242
243    #[test]
244    fn test_log_config_wrong_scheme() {
245        let result = LogConfig::from_uri("timer:tick");
246        assert!(result.is_err());
247    }
248
249    #[test]
250    fn test_log_component_scheme() {
251        let component = LogComponent::new();
252        assert_eq!(component.scheme(), "log");
253    }
254
255    #[test]
256    fn test_log_component_default() {
257        let component = LogComponent;
258        assert_eq!(component.scheme(), "log");
259    }
260
261    #[test]
262    fn test_log_level_from_str_variants() {
263        assert_eq!("trace".parse::<LogLevel>().unwrap(), LogLevel::Trace);
264        assert_eq!("DEBUG".parse::<LogLevel>().unwrap(), LogLevel::Debug);
265        assert_eq!("Info".parse::<LogLevel>().unwrap(), LogLevel::Info);
266        assert_eq!("warning".parse::<LogLevel>().unwrap(), LogLevel::Warn);
267        assert_eq!("error".parse::<LogLevel>().unwrap(), LogLevel::Error);
268    }
269
270    #[test]
271    fn test_log_level_from_str_invalid() {
272        let err = "nope".parse::<LogLevel>().unwrap_err();
273        assert_eq!(err, "Invalid log level: nope");
274    }
275
276    #[test]
277    fn test_log_config_invalid_level_falls_back_to_default() {
278        let config = LogConfig::from_uri("log:test?level=invalid").unwrap();
279        assert_eq!(config.level, LogLevel::Info);
280    }
281
282    #[test]
283    fn test_log_endpoint_uri() {
284        let component = LogComponent::new();
285        let endpoint = component
286            .create_endpoint("log:uri-check", &NoOpComponentContext)
287            .unwrap();
288        assert_eq!(endpoint.uri(), "log:uri-check");
289    }
290
291    #[test]
292    fn test_log_endpoint_no_consumer() {
293        let component = LogComponent::new();
294        let endpoint = component
295            .create_endpoint("log:info", &NoOpComponentContext)
296            .unwrap();
297        assert!(endpoint.create_consumer().is_err());
298    }
299
300    #[test]
301    fn test_log_endpoint_creates_producer() {
302        let ctx = test_producer_ctx();
303        let component = LogComponent::new();
304        let endpoint = component
305            .create_endpoint("log:info", &NoOpComponentContext)
306            .unwrap();
307        assert!(endpoint.create_producer(&ctx).is_ok());
308    }
309
310    #[tokio::test]
311    async fn test_log_producer_processes_exchange() {
312        let ctx = test_producer_ctx();
313        let component = LogComponent::new();
314        let endpoint = component
315            .create_endpoint("log:test?showHeaders=true", &NoOpComponentContext)
316            .unwrap();
317        let producer = endpoint.create_producer(&ctx).unwrap();
318
319        let mut exchange = Exchange::new(Message::new("hello world"));
320        exchange
321            .input
322            .set_header("source", serde_json::Value::String("test".into()));
323
324        let result = producer.oneshot(exchange).await.unwrap();
325        // Log producer passes exchange through unchanged
326        assert_eq!(result.input.body.as_text(), Some("hello world"));
327    }
328
329    #[test]
330    fn test_format_exchange_without_body_or_headers() {
331        let producer = LogProducer {
332            config: LogConfig {
333                category: "cat".to_string(),
334                level: LogLevel::Info,
335                show_headers: false,
336                show_body: false,
337            },
338        };
339        let exchange = Exchange::new(Message::new("ignored"));
340        let formatted = producer.format_exchange(&exchange);
341        assert_eq!(formatted, "[cat] Exchange received");
342    }
343
344    #[test]
345    fn test_format_exchange_body_variants() {
346        let base = LogProducer {
347            config: LogConfig {
348                category: "cat".to_string(),
349                level: LogLevel::Info,
350                show_headers: false,
351                show_body: true,
352            },
353        };
354
355        let empty = Exchange::new(Message::default());
356        assert!(base.format_exchange(&empty).contains("Body: [empty]"));
357
358        let mut json_msg = Message::new("");
359        json_msg.body = Body::Json(serde_json::json!({"k":"v"}));
360        let json_ex = Exchange::new(json_msg);
361        assert!(
362            base.format_exchange(&json_ex)
363                .contains("Body: {\"k\":\"v\"}")
364        );
365
366        let mut xml_msg = Message::new("");
367        xml_msg.body = Body::Xml("<a/>".to_string());
368        let xml_ex = Exchange::new(xml_msg);
369        assert!(base.format_exchange(&xml_ex).contains("Body: <a/>"));
370
371        let mut bytes_msg = Message::new("");
372        bytes_msg.body = Body::Bytes(b"abc".to_vec().into());
373        let bytes_ex = Exchange::new(bytes_msg);
374        assert!(base.format_exchange(&bytes_ex).contains("Body: [3 bytes]"));
375    }
376}