Skip to main content

camel_component_log/
lib.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6use tracing::{debug, error, info, trace, warn};
7
8use camel_api::{BoxProcessor, CamelError, Exchange};
9use camel_component::{Component, Consumer, Endpoint, ProducerContext};
10use camel_endpoint::parse_uri;
11
12// ---------------------------------------------------------------------------
13// LogLevel
14// ---------------------------------------------------------------------------
15
/// Severity at which the log component emits its events.
///
/// Variants are listed from least to most severe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLevel {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
}

impl LogLevel {
    /// Every spelling accepted by [`LogLevel::from_str`], paired with the
    /// level it selects. `warn` and `warning` are synonyms.
    const NAMES: [(&'static str, LogLevel); 6] = [
        ("trace", LogLevel::Trace),
        ("debug", LogLevel::Debug),
        ("info", LogLevel::Info),
        ("warn", LogLevel::Warn),
        ("warning", LogLevel::Warn),
        ("error", LogLevel::Error),
    ];

    /// Converts a level name into a [`LogLevel`].
    ///
    /// Matching ignores ASCII case, so `"DEBUG"` and `"debug"` are equivalent.
    /// Any unrecognised name (including the empty string) falls back to
    /// [`LogLevel::Info`] rather than producing an error.
    fn from_str(s: &str) -> Self {
        // Compare in place instead of building a lowercased copy of `s`.
        Self::NAMES
            .iter()
            .find(|(name, _)| s.eq_ignore_ascii_case(name))
            .map_or(LogLevel::Info, |&(_, level)| level)
    }
}
38
39// ---------------------------------------------------------------------------
40// LogConfig
41// ---------------------------------------------------------------------------
42
43/// Configuration parsed from a log URI.
44///
45/// Format: `log:category?level=info&showHeaders=true&showBody=true`
46#[derive(Debug, Clone)]
47pub struct LogConfig {
48    /// Log category (the path portion of the URI).
49    pub category: String,
50    /// Log level. Default: Info.
51    pub level: LogLevel,
52    /// Whether to include headers in the log output.
53    pub show_headers: bool,
54    /// Whether to include the body in the log output.
55    pub show_body: bool,
56}
57
58impl LogConfig {
59    /// Parse a log URI into a config.
60    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
61        let parts = parse_uri(uri)?;
62        if parts.scheme != "log" {
63            return Err(CamelError::InvalidUri(format!(
64                "expected scheme 'log', got '{}'",
65                parts.scheme
66            )));
67        }
68
69        let level = parts
70            .params
71            .get("level")
72            .map(|v| LogLevel::from_str(v))
73            .unwrap_or(LogLevel::Info);
74
75        let show_headers = parts
76            .params
77            .get("showHeaders")
78            .map(|v| v == "true")
79            .unwrap_or(false);
80
81        let show_body = parts
82            .params
83            .get("showBody")
84            .map(|v| v == "true")
85            .unwrap_or(true);
86
87        Ok(Self {
88            category: parts.path,
89            level,
90            show_headers,
91            show_body,
92        })
93    }
94}
95
96// ---------------------------------------------------------------------------
97// LogComponent
98// ---------------------------------------------------------------------------
99
/// The Log component logs exchange information using `tracing`.
///
/// This is a field-less unit struct, so [`LogComponent::new`] and
/// [`LogComponent::default`] produce the same value.
#[derive(Debug, Clone, Default)]
pub struct LogComponent;

impl LogComponent {
    /// Creates a new log component.
    pub fn new() -> Self {
        Self
    }
}
114
115impl Component for LogComponent {
116    fn scheme(&self) -> &str {
117        "log"
118    }
119
120    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
121        let config = LogConfig::from_uri(uri)?;
122        Ok(Box::new(LogEndpoint {
123            uri: uri.to_string(),
124            config,
125        }))
126    }
127}
128
129// ---------------------------------------------------------------------------
130// LogEndpoint
131// ---------------------------------------------------------------------------
132
133struct LogEndpoint {
134    uri: String,
135    config: LogConfig,
136}
137
138impl Endpoint for LogEndpoint {
139    fn uri(&self) -> &str {
140        &self.uri
141    }
142
143    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
144        Err(CamelError::EndpointCreationFailed(
145            "log endpoint does not support consumers".to_string(),
146        ))
147    }
148
149    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
150        Ok(BoxProcessor::new(LogProducer {
151            config: self.config.clone(),
152        }))
153    }
154}
155
156// ---------------------------------------------------------------------------
157// LogProducer
158// ---------------------------------------------------------------------------
159
160#[derive(Clone)]
161struct LogProducer {
162    config: LogConfig,
163}
164
165impl LogProducer {
166    fn format_exchange(&self, exchange: &Exchange) -> String {
167        let mut parts = Vec::new();
168
169        if self.config.show_body {
170            let body_str = match &exchange.input.body {
171                camel_api::Body::Empty => "[empty]".to_string(),
172                camel_api::Body::Text(s) => s.clone(),
173                camel_api::Body::Json(v) => v.to_string(),
174                camel_api::Body::Bytes(b) => format!("[{} bytes]", b.len()),
175            };
176            parts.push(format!("Body: {body_str}"));
177        }
178
179        if self.config.show_headers && !exchange.input.headers.is_empty() {
180            let headers: Vec<String> = exchange
181                .input
182                .headers
183                .iter()
184                .map(|(k, v)| format!("{k}={v}"))
185                .collect();
186            parts.push(format!("Headers: {{{}}}", headers.join(", ")));
187        }
188
189        if parts.is_empty() {
190            format!("[{}] Exchange received", self.config.category)
191        } else {
192            format!("[{}] {}", self.config.category, parts.join(" | "))
193        }
194    }
195}
196
197impl Service<Exchange> for LogProducer {
198    type Response = Exchange;
199    type Error = CamelError;
200    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
201
202    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
203        Poll::Ready(Ok(()))
204    }
205
206    fn call(&mut self, exchange: Exchange) -> Self::Future {
207        let msg = self.format_exchange(&exchange);
208        let level = self.config.level;
209
210        Box::pin(async move {
211            match level {
212                LogLevel::Trace => trace!("{msg}"),
213                LogLevel::Debug => debug!("{msg}"),
214                LogLevel::Info => info!("{msg}"),
215                LogLevel::Warn => warn!("{msg}"),
216                LogLevel::Error => error!("{msg}"),
217            }
218
219            Ok(exchange)
220        })
221    }
222}
223
224// ---------------------------------------------------------------------------
225// Tests
226// ---------------------------------------------------------------------------
227
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::Message;
    use std::sync::Arc;
    use tokio::sync::Mutex;
    use tower::ServiceExt;

    /// Route controller stub for tests: every lifecycle call succeeds and no
    /// route ever reports a status.
    struct NullRouteController;

    #[async_trait::async_trait]
    impl camel_api::RouteController for NullRouteController {
        async fn start_route(&mut self, _route_id: &str) -> Result<(), camel_api::CamelError> {
            Ok(())
        }
        async fn stop_route(&mut self, _route_id: &str) -> Result<(), camel_api::CamelError> {
            Ok(())
        }
        async fn restart_route(&mut self, _route_id: &str) -> Result<(), camel_api::CamelError> {
            Ok(())
        }
        async fn suspend_route(&mut self, _route_id: &str) -> Result<(), camel_api::CamelError> {
            Ok(())
        }
        async fn resume_route(&mut self, _route_id: &str) -> Result<(), camel_api::CamelError> {
            Ok(())
        }
        fn route_status(&self, _route_id: &str) -> Option<camel_api::RouteStatus> {
            None
        }
        async fn start_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
            Ok(())
        }
        async fn stop_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
            Ok(())
        }
    }

    /// Producer context backed by the no-op route controller.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
    }

    #[test]
    fn test_log_config_defaults() {
        let LogConfig {
            category,
            level,
            show_headers,
            show_body,
        } = LogConfig::from_uri("log:myCategory").unwrap();

        assert_eq!(category, "myCategory");
        assert_eq!(level, LogLevel::Info);
        assert!(!show_headers);
        assert!(show_body);
    }

    #[test]
    fn test_log_config_with_params() {
        let uri = "log:app?level=debug&showHeaders=true&showBody=false";
        let parsed = LogConfig::from_uri(uri).unwrap();

        assert_eq!(parsed.category, "app");
        assert_eq!(parsed.level, LogLevel::Debug);
        assert!(parsed.show_headers);
        assert!(!parsed.show_body);
    }

    #[test]
    fn test_log_config_wrong_scheme() {
        assert!(LogConfig::from_uri("timer:tick").is_err());
    }

    #[test]
    fn test_log_component_scheme() {
        assert_eq!(LogComponent::new().scheme(), "log");
    }

    #[test]
    fn test_log_endpoint_no_consumer() {
        let endpoint = LogComponent::new().create_endpoint("log:info").unwrap();
        assert!(endpoint.create_consumer().is_err());
    }

    #[test]
    fn test_log_endpoint_creates_producer() {
        let ctx = test_producer_ctx();
        let endpoint = LogComponent::new().create_endpoint("log:info").unwrap();
        assert!(endpoint.create_producer(&ctx).is_ok());
    }

    #[tokio::test]
    async fn test_log_producer_processes_exchange() {
        let ctx = test_producer_ctx();
        let endpoint = LogComponent::new()
            .create_endpoint("log:test?showHeaders=true")
            .unwrap();
        let producer = endpoint.create_producer(&ctx).unwrap();

        let mut exchange = Exchange::new(Message::new("hello world"));
        exchange
            .input
            .set_header("source", serde_json::Value::String("test".into()));

        // The log producer must hand the exchange back untouched.
        let returned = producer.oneshot(exchange).await.unwrap();
        assert_eq!(returned.input.body.as_text(), Some("hello world"));
    }
}