Skip to main content

camel_component_log/
lib.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::{debug, error, info, trace, warn};
8
9use camel_component_api::UriConfig;
10use camel_component_api::{BoxProcessor, CamelError, Exchange};
11use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
12
13// ---------------------------------------------------------------------------
14// LogLevel
15// ---------------------------------------------------------------------------
16
/// Severity at which the log component emits its message.
///
/// Levels are parsed case-insensitively through [`FromStr`]; the default
/// level is [`LogLevel::Info`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum LogLevel {
    /// Spelled `trace` in a URI.
    Trace,
    /// Spelled `debug` in a URI.
    Debug,
    /// Spelled `info` in a URI; this is the default.
    #[default]
    Info,
    /// Spelled `warn` or `warning` in a URI.
    Warn,
    /// Spelled `error` in a URI.
    Error,
}

impl FromStr for LogLevel {
    type Err = String;

    /// Parses a level name, ignoring letter case.
    ///
    /// # Errors
    ///
    /// Returns `Invalid log level: <input>` (with the input exactly as given)
    /// when the name matches none of the accepted spellings.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Every accepted lowercase spelling paired with the level it selects.
        const SPELLINGS: [(&str, LogLevel); 6] = [
            ("trace", LogLevel::Trace),
            ("debug", LogLevel::Debug),
            ("info", LogLevel::Info),
            ("warn", LogLevel::Warn),
            ("warning", LogLevel::Warn),
            ("error", LogLevel::Error),
        ];

        let lowered = s.to_lowercase();
        SPELLINGS
            .iter()
            .find(|(spelling, _)| *spelling == lowered.as_str())
            .map(|&(_, level)| level)
            .ok_or_else(|| format!("Invalid log level: {}", s))
    }
}
42
43// ---------------------------------------------------------------------------
44// LogConfig
45// ---------------------------------------------------------------------------
46
/// Configuration parsed from a log URI.
///
/// Format: `log:category?level=info&showHeaders=true&showBody=true`
///
/// The parsing code (`LogConfig::from_uri`) is generated by the `UriConfig`
/// derive; the attributes below declare the expected scheme (`log`), the
/// query-parameter names and their default values. The crate's tests expect
/// a URI with a different scheme to be rejected and an unparsable `level`
/// value to fall back to the default rather than fail.
#[derive(Debug, Clone, UriConfig)]
#[uri_scheme = "log"]
#[uri_config(crate = "camel_component_api")]
pub struct LogConfig {
    /// Log category (the path portion of the URI).
    ///
    /// Used as the `[category]` prefix of every formatted log line.
    pub category: String,
    /// Log level, taken from the `level` query parameter. Default: Info.
    #[uri_param(default = "Info")]
    pub level: LogLevel,
    /// Whether to include headers in the log output.
    ///
    /// Taken from the `showHeaders` query parameter. Default: `false`.
    #[uri_param(name = "showHeaders", default = "false")]
    pub show_headers: bool,
    /// Whether to include the body in the log output.
    ///
    /// Taken from the `showBody` query parameter. Default: `true`.
    #[uri_param(name = "showBody", default = "true")]
    pub show_body: bool,
}
66
67// ---------------------------------------------------------------------------
68// LogComponent
69// ---------------------------------------------------------------------------
70
/// The Log component logs exchange information using `tracing`.
///
/// The component is a field-less unit struct, so the standard traits are
/// derived instead of hand-written: `Default` yields the same value as
/// [`LogComponent::new`], and `Debug`/`Clone`/`Copy` let it be printed and
/// passed around freely.
#[derive(Debug, Clone, Copy, Default)]
pub struct LogComponent;

impl LogComponent {
    /// Creates the log component.
    ///
    /// Equivalent to `LogComponent::default()`; there is no state to set up.
    pub const fn new() -> Self {
        Self
    }
}
85
86impl Component for LogComponent {
87    fn scheme(&self) -> &str {
88        "log"
89    }
90
91    fn create_endpoint(
92        &self,
93        uri: &str,
94        _ctx: &dyn camel_component_api::ComponentContext,
95    ) -> Result<Box<dyn Endpoint>, CamelError> {
96        let config = LogConfig::from_uri(uri)?;
97        Ok(Box::new(LogEndpoint {
98            uri: uri.to_string(),
99            config,
100        }))
101    }
102}
103
104// ---------------------------------------------------------------------------
105// LogEndpoint
106// ---------------------------------------------------------------------------
107
108struct LogEndpoint {
109    uri: String,
110    config: LogConfig,
111}
112
113impl Endpoint for LogEndpoint {
114    fn uri(&self) -> &str {
115        &self.uri
116    }
117
118    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
119        Err(CamelError::EndpointCreationFailed(
120            "log endpoint does not support consumers".to_string(),
121        ))
122    }
123
124    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
125        Ok(BoxProcessor::new(LogProducer {
126            config: self.config.clone(),
127        }))
128    }
129}
130
131// ---------------------------------------------------------------------------
132// LogProducer
133// ---------------------------------------------------------------------------
134
135#[derive(Clone)]
136struct LogProducer {
137    config: LogConfig,
138}
139
140impl LogProducer {
141    fn format_exchange(&self, exchange: &Exchange) -> String {
142        let mut parts = Vec::new();
143
144        if self.config.show_body {
145            let body_str = match &exchange.input.body {
146                camel_component_api::Body::Empty => "[empty]".to_string(),
147                camel_component_api::Body::Text(s) => s.clone(),
148                camel_component_api::Body::Json(v) => v.to_string(),
149                camel_component_api::Body::Xml(s) => s.clone(),
150                camel_component_api::Body::Bytes(b) => format!("[{} bytes]", b.len()),
151                camel_component_api::Body::Stream(s) => {
152                    format!("[Stream: origin={:?}]", s.metadata.origin)
153                }
154            };
155            parts.push(format!("Body: {body_str}"));
156        }
157
158        if self.config.show_headers && !exchange.input.headers.is_empty() {
159            let headers: Vec<String> = exchange
160                .input
161                .headers
162                .iter()
163                .map(|(k, v)| format!("{k}={v}"))
164                .collect();
165            parts.push(format!("Headers: {{{}}}", headers.join(", ")));
166        }
167
168        if parts.is_empty() {
169            format!("[{}] Exchange received", self.config.category)
170        } else {
171            format!("[{}] {}", self.config.category, parts.join(" | "))
172        }
173    }
174}
175
176impl Service<Exchange> for LogProducer {
177    type Response = Exchange;
178    type Error = CamelError;
179    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
180
181    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
182        Poll::Ready(Ok(()))
183    }
184
185    fn call(&mut self, exchange: Exchange) -> Self::Future {
186        let msg = self.format_exchange(&exchange);
187        let level = self.config.level;
188
189        Box::pin(async move {
190            match level {
191                LogLevel::Trace => trace!("{msg}"),
192                LogLevel::Debug => debug!("{msg}"),
193                LogLevel::Info => info!("{msg}"),
194                LogLevel::Warn => warn!("{msg}"),
195                LogLevel::Error => error!("{msg}"),
196            }
197
198            Ok(exchange)
199        })
200    }
201}
202
203// ---------------------------------------------------------------------------
204// Tests
205// ---------------------------------------------------------------------------
206
#[cfg(test)]
mod tests {
    //! Unit tests for URI parsing, level parsing, endpoint/producer creation
    //! and log-line formatting.

    use super::*;
    use camel_component_api::Body;
    use camel_component_api::Message;
    use camel_component_api::NoOpComponentContext;
    use tower::ServiceExt;

    /// Fresh producer context for tests that need to create a producer.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }

    #[test]
    fn test_log_config_defaults() {
        let config = LogConfig::from_uri("log:myCategory").unwrap();
        assert_eq!(config.category, "myCategory");
        assert_eq!(config.level, LogLevel::Info);
        assert!(!config.show_headers);
        assert!(config.show_body);
    }

    #[test]
    fn test_log_config_with_params() {
        let config =
            LogConfig::from_uri("log:app?level=debug&showHeaders=true&showBody=false").unwrap();
        assert_eq!(config.category, "app");
        assert_eq!(config.level, LogLevel::Debug);
        assert!(config.show_headers);
        assert!(!config.show_body);
    }

    #[test]
    fn test_log_config_wrong_scheme() {
        let result = LogConfig::from_uri("timer:tick");
        assert!(result.is_err());
    }

    #[test]
    fn test_log_component_scheme() {
        let component = LogComponent::new();
        assert_eq!(component.scheme(), "log");
    }

    #[test]
    fn test_log_component_default() {
        // Go through `Default::default()` so the trait impl is what gets exercised,
        // not the unit-struct literal.
        let component = LogComponent::default();
        assert_eq!(component.scheme(), "log");
    }

    #[test]
    fn test_log_level_from_str_variants() {
        assert_eq!("trace".parse::<LogLevel>().unwrap(), LogLevel::Trace);
        assert_eq!("DEBUG".parse::<LogLevel>().unwrap(), LogLevel::Debug);
        assert_eq!("Info".parse::<LogLevel>().unwrap(), LogLevel::Info);
        assert_eq!("warn".parse::<LogLevel>().unwrap(), LogLevel::Warn);
        assert_eq!("warning".parse::<LogLevel>().unwrap(), LogLevel::Warn);
        assert_eq!("error".parse::<LogLevel>().unwrap(), LogLevel::Error);
    }

    #[test]
    fn test_log_level_from_str_invalid() {
        let err = "nope".parse::<LogLevel>().unwrap_err();
        assert_eq!(err, "Invalid log level: nope");
    }

    #[test]
    fn test_log_config_invalid_level_falls_back_to_default() {
        let config = LogConfig::from_uri("log:test?level=invalid").unwrap();
        assert_eq!(config.level, LogLevel::Info);
    }

    #[test]
    fn test_log_endpoint_uri() {
        let component = LogComponent::new();
        let endpoint = component
            .create_endpoint("log:uri-check", &NoOpComponentContext)
            .unwrap();
        assert_eq!(endpoint.uri(), "log:uri-check");
    }

    #[test]
    fn test_log_endpoint_no_consumer() {
        let component = LogComponent::new();
        let endpoint = component
            .create_endpoint("log:info", &NoOpComponentContext)
            .unwrap();
        assert!(endpoint.create_consumer().is_err());
    }

    #[test]
    fn test_log_endpoint_creates_producer() {
        let ctx = test_producer_ctx();
        let component = LogComponent::new();
        let endpoint = component
            .create_endpoint("log:info", &NoOpComponentContext)
            .unwrap();
        assert!(endpoint.create_producer(&ctx).is_ok());
    }

    #[tokio::test]
    async fn test_log_producer_processes_exchange() {
        let ctx = test_producer_ctx();
        let component = LogComponent::new();
        let endpoint = component
            .create_endpoint("log:test?showHeaders=true", &NoOpComponentContext)
            .unwrap();
        let producer = endpoint.create_producer(&ctx).unwrap();

        let mut exchange = Exchange::new(Message::new("hello world"));
        exchange
            .input
            .set_header("source", serde_json::Value::String("test".into()));

        let result = producer.oneshot(exchange).await.unwrap();
        // Log producer passes exchange through unchanged
        assert_eq!(result.input.body.as_text(), Some("hello world"));
    }

    #[test]
    fn test_format_exchange_without_body_or_headers() {
        let producer = LogProducer {
            config: LogConfig {
                category: "cat".to_string(),
                level: LogLevel::Info,
                show_headers: false,
                show_body: false,
            },
        };
        let exchange = Exchange::new(Message::new("ignored"));
        let formatted = producer.format_exchange(&exchange);
        assert_eq!(formatted, "[cat] Exchange received");
    }

    #[test]
    fn test_format_exchange_body_variants() {
        let base = LogProducer {
            config: LogConfig {
                category: "cat".to_string(),
                level: LogLevel::Info,
                show_headers: false,
                show_body: true,
            },
        };

        let empty = Exchange::new(Message::default());
        assert!(base.format_exchange(&empty).contains("Body: [empty]"));

        let mut json_msg = Message::new("");
        json_msg.body = Body::Json(serde_json::json!({"k":"v"}));
        let json_ex = Exchange::new(json_msg);
        assert!(
            base.format_exchange(&json_ex)
                .contains("Body: {\"k\":\"v\"}")
        );

        let mut xml_msg = Message::new("");
        xml_msg.body = Body::Xml("<a/>".to_string());
        let xml_ex = Exchange::new(xml_msg);
        assert!(base.format_exchange(&xml_ex).contains("Body: <a/>"));

        let mut bytes_msg = Message::new("");
        bytes_msg.body = Body::Bytes(b"abc".to_vec().into());
        let bytes_ex = Exchange::new(bytes_msg);
        assert!(base.format_exchange(&bytes_ex).contains("Body: [3 bytes]"));
    }
}