Skip to main content

camel_component_log/
lib.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6use tracing::{debug, error, info, trace, warn};
7
8use camel_api::{BoxProcessor, CamelError, Exchange};
9use camel_component::{Component, Consumer, Endpoint, ProducerContext};
10use camel_endpoint::parse_uri;
11
12// ---------------------------------------------------------------------------
13// LogLevel
14// ---------------------------------------------------------------------------
15
16/// Log level for the log component.
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum LogLevel {
19    Trace,
20    Debug,
21    Info,
22    Warn,
23    Error,
24}
25
26impl LogLevel {
27    fn from_str(s: &str) -> Self {
28        match s.to_lowercase().as_str() {
29            "trace" => LogLevel::Trace,
30            "debug" => LogLevel::Debug,
31            "info" => LogLevel::Info,
32            "warn" | "warning" => LogLevel::Warn,
33            "error" => LogLevel::Error,
34            _ => LogLevel::Info,
35        }
36    }
37}
38
39// ---------------------------------------------------------------------------
40// LogConfig
41// ---------------------------------------------------------------------------
42
43/// Configuration parsed from a log URI.
44///
45/// Format: `log:category?level=info&showHeaders=true&showBody=true`
46#[derive(Debug, Clone)]
47pub struct LogConfig {
48    /// Log category (the path portion of the URI).
49    pub category: String,
50    /// Log level. Default: Info.
51    pub level: LogLevel,
52    /// Whether to include headers in the log output.
53    pub show_headers: bool,
54    /// Whether to include the body in the log output.
55    pub show_body: bool,
56}
57
58impl LogConfig {
59    /// Parse a log URI into a config.
60    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
61        let parts = parse_uri(uri)?;
62        if parts.scheme != "log" {
63            return Err(CamelError::InvalidUri(format!(
64                "expected scheme 'log', got '{}'",
65                parts.scheme
66            )));
67        }
68
69        let level = parts
70            .params
71            .get("level")
72            .map(|v| LogLevel::from_str(v))
73            .unwrap_or(LogLevel::Info);
74
75        let show_headers = parts
76            .params
77            .get("showHeaders")
78            .map(|v| v == "true")
79            .unwrap_or(false);
80
81        let show_body = parts
82            .params
83            .get("showBody")
84            .map(|v| v == "true")
85            .unwrap_or(true);
86
87        Ok(Self {
88            category: parts.path,
89            level,
90            show_headers,
91            show_body,
92        })
93    }
94}
95
96// ---------------------------------------------------------------------------
97// LogComponent
98// ---------------------------------------------------------------------------
99
100/// The Log component logs exchange information using `tracing`.
101pub struct LogComponent;
102
103impl LogComponent {
104    pub fn new() -> Self {
105        Self
106    }
107}
108
109impl Default for LogComponent {
110    fn default() -> Self {
111        Self::new()
112    }
113}
114
115impl Component for LogComponent {
116    fn scheme(&self) -> &str {
117        "log"
118    }
119
120    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
121        let config = LogConfig::from_uri(uri)?;
122        Ok(Box::new(LogEndpoint {
123            uri: uri.to_string(),
124            config,
125        }))
126    }
127}
128
129// ---------------------------------------------------------------------------
130// LogEndpoint
131// ---------------------------------------------------------------------------
132
133struct LogEndpoint {
134    uri: String,
135    config: LogConfig,
136}
137
138impl Endpoint for LogEndpoint {
139    fn uri(&self) -> &str {
140        &self.uri
141    }
142
143    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
144        Err(CamelError::EndpointCreationFailed(
145            "log endpoint does not support consumers".to_string(),
146        ))
147    }
148
149    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
150        Ok(BoxProcessor::new(LogProducer {
151            config: self.config.clone(),
152        }))
153    }
154}
155
156// ---------------------------------------------------------------------------
157// LogProducer
158// ---------------------------------------------------------------------------
159
160#[derive(Clone)]
161struct LogProducer {
162    config: LogConfig,
163}
164
165impl LogProducer {
166    fn format_exchange(&self, exchange: &Exchange) -> String {
167        let mut parts = Vec::new();
168
169        if self.config.show_body {
170            let body_str = match &exchange.input.body {
171                camel_api::Body::Empty => "[empty]".to_string(),
172                camel_api::Body::Text(s) => s.clone(),
173                camel_api::Body::Json(v) => v.to_string(),
174                camel_api::Body::Bytes(b) => format!("[{} bytes]", b.len()),
175                camel_api::Body::Stream(s) => format!("[Stream: origin={:?}]", s.metadata.origin),
176            };
177            parts.push(format!("Body: {body_str}"));
178        }
179
180        if self.config.show_headers && !exchange.input.headers.is_empty() {
181            let headers: Vec<String> = exchange
182                .input
183                .headers
184                .iter()
185                .map(|(k, v)| format!("{k}={v}"))
186                .collect();
187            parts.push(format!("Headers: {{{}}}", headers.join(", ")));
188        }
189
190        if parts.is_empty() {
191            format!("[{}] Exchange received", self.config.category)
192        } else {
193            format!("[{}] {}", self.config.category, parts.join(" | "))
194        }
195    }
196}
197
198impl Service<Exchange> for LogProducer {
199    type Response = Exchange;
200    type Error = CamelError;
201    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
202
203    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
204        Poll::Ready(Ok(()))
205    }
206
207    fn call(&mut self, exchange: Exchange) -> Self::Future {
208        let msg = self.format_exchange(&exchange);
209        let level = self.config.level;
210
211        Box::pin(async move {
212            match level {
213                LogLevel::Trace => trace!("{msg}"),
214                LogLevel::Debug => debug!("{msg}"),
215                LogLevel::Info => info!("{msg}"),
216                LogLevel::Warn => warn!("{msg}"),
217                LogLevel::Error => error!("{msg}"),
218            }
219
220            Ok(exchange)
221        })
222    }
223}
224
225// ---------------------------------------------------------------------------
226// Tests
227// ---------------------------------------------------------------------------
228
229#[cfg(test)]
230mod tests {
231    use super::*;
232    use camel_api::Message;
233    use std::sync::Arc;
234    use tokio::sync::Mutex;
235    use tower::ServiceExt;
236
237    // NullRouteController for testing
238    struct NullRouteController;
239    #[async_trait::async_trait]
240    impl camel_api::RouteController for NullRouteController {
241        async fn start_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
242            Ok(())
243        }
244        async fn stop_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
245            Ok(())
246        }
247        async fn restart_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
248            Ok(())
249        }
250        async fn suspend_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
251            Ok(())
252        }
253        async fn resume_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
254            Ok(())
255        }
256        fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
257            None
258        }
259        async fn start_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
260            Ok(())
261        }
262        async fn stop_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
263            Ok(())
264        }
265    }
266
267    fn test_producer_ctx() -> ProducerContext {
268        ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
269    }
270
271    #[test]
272    fn test_log_config_defaults() {
273        let config = LogConfig::from_uri("log:myCategory").unwrap();
274        assert_eq!(config.category, "myCategory");
275        assert_eq!(config.level, LogLevel::Info);
276        assert!(!config.show_headers);
277        assert!(config.show_body);
278    }
279
280    #[test]
281    fn test_log_config_with_params() {
282        let config =
283            LogConfig::from_uri("log:app?level=debug&showHeaders=true&showBody=false").unwrap();
284        assert_eq!(config.category, "app");
285        assert_eq!(config.level, LogLevel::Debug);
286        assert!(config.show_headers);
287        assert!(!config.show_body);
288    }
289
290    #[test]
291    fn test_log_config_wrong_scheme() {
292        let result = LogConfig::from_uri("timer:tick");
293        assert!(result.is_err());
294    }
295
296    #[test]
297    fn test_log_component_scheme() {
298        let component = LogComponent::new();
299        assert_eq!(component.scheme(), "log");
300    }
301
302    #[test]
303    fn test_log_endpoint_no_consumer() {
304        let component = LogComponent::new();
305        let endpoint = component.create_endpoint("log:info").unwrap();
306        assert!(endpoint.create_consumer().is_err());
307    }
308
309    #[test]
310    fn test_log_endpoint_creates_producer() {
311        let ctx = test_producer_ctx();
312        let component = LogComponent::new();
313        let endpoint = component.create_endpoint("log:info").unwrap();
314        assert!(endpoint.create_producer(&ctx).is_ok());
315    }
316
317    #[tokio::test]
318    async fn test_log_producer_processes_exchange() {
319        let ctx = test_producer_ctx();
320        let component = LogComponent::new();
321        let endpoint = component
322            .create_endpoint("log:test?showHeaders=true")
323            .unwrap();
324        let producer = endpoint.create_producer(&ctx).unwrap();
325
326        let mut exchange = Exchange::new(Message::new("hello world"));
327        exchange
328            .input
329            .set_header("source", serde_json::Value::String("test".into()));
330
331        let result = producer.oneshot(exchange).await.unwrap();
332        // Log producer passes exchange through unchanged
333        assert_eq!(result.input.body.as_text(), Some("hello world"));
334    }
335}