Skip to main content

camel_component_log/
lib.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::{debug, error, info, trace, warn};
8
9use camel_component_api::UriConfig;
10use camel_component_api::{BoxProcessor, CamelError, Exchange};
11use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
12
13// ---------------------------------------------------------------------------
14// LogLevel
15// ---------------------------------------------------------------------------
16
17/// Log level for the log component.
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
19pub enum LogLevel {
20    Trace,
21    Debug,
22    #[default]
23    Info,
24    Warn,
25    Error,
26}
27
28impl FromStr for LogLevel {
29    type Err = String;
30
31    fn from_str(s: &str) -> Result<Self, Self::Err> {
32        match s.to_lowercase().as_str() {
33            "trace" => Ok(LogLevel::Trace),
34            "debug" => Ok(LogLevel::Debug),
35            "info" => Ok(LogLevel::Info),
36            "warn" | "warning" => Ok(LogLevel::Warn),
37            "error" => Ok(LogLevel::Error),
38            _ => Err(format!("Invalid log level: {}", s)),
39        }
40    }
41}
42
43// ---------------------------------------------------------------------------
44// LogConfig
45// ---------------------------------------------------------------------------
46
47/// Configuration parsed from a log URI.
48///
49/// Format: `log:category?level=info&showHeaders=true&showBody=true`
50#[derive(Debug, Clone, UriConfig)]
51#[uri_scheme = "log"]
52#[uri_config(crate = "camel_component_api")]
53pub struct LogConfig {
54    /// Log category (the path portion of the URI).
55    pub category: String,
56    /// Log level. Default: Info.
57    #[uri_param(default = "Info")]
58    pub level: LogLevel,
59    /// Whether to include headers in the log output.
60    #[uri_param(name = "showHeaders", default = "false")]
61    pub show_headers: bool,
62    /// Whether to include the body in the log output.
63    #[uri_param(name = "showBody", default = "true")]
64    pub show_body: bool,
65}
66
67// ---------------------------------------------------------------------------
68// LogComponent
69// ---------------------------------------------------------------------------
70
71/// The Log component logs exchange information using `tracing`.
72pub struct LogComponent;
73
74impl LogComponent {
75    pub fn new() -> Self {
76        Self
77    }
78}
79
80impl Default for LogComponent {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl Component for LogComponent {
87    fn scheme(&self) -> &str {
88        "log"
89    }
90
91    fn create_endpoint(
92        &self,
93        uri: &str,
94        _ctx: &dyn camel_component_api::ComponentContext,
95    ) -> Result<Box<dyn Endpoint>, CamelError> {
96        let config = LogConfig::from_uri(uri)?;
97        Ok(Box::new(LogEndpoint {
98            uri: uri.to_string(),
99            config,
100        }))
101    }
102}
103
104// ---------------------------------------------------------------------------
105// LogEndpoint
106// ---------------------------------------------------------------------------
107
108struct LogEndpoint {
109    uri: String,
110    config: LogConfig,
111}
112
113impl Endpoint for LogEndpoint {
114    fn uri(&self) -> &str {
115        &self.uri
116    }
117
118    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
119        Err(CamelError::EndpointCreationFailed(
120            "log endpoint does not support consumers".to_string(),
121        ))
122    }
123
124    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
125        Ok(BoxProcessor::new(LogProducer {
126            config: self.config.clone(),
127        }))
128    }
129}
130
131// ---------------------------------------------------------------------------
132// LogProducer
133// ---------------------------------------------------------------------------
134
135#[derive(Clone)]
136struct LogProducer {
137    config: LogConfig,
138}
139
140impl LogProducer {
141    fn format_exchange(&self, exchange: &Exchange) -> String {
142        let mut parts = Vec::new();
143
144        if self.config.show_body {
145            let body_str = match &exchange.input.body {
146                camel_component_api::Body::Empty => "[empty]".to_string(),
147                camel_component_api::Body::Text(s) => s.clone(),
148                camel_component_api::Body::Json(v) => v.to_string(),
149                camel_component_api::Body::Xml(s) => s.clone(),
150                camel_component_api::Body::Bytes(b) => format!("[{} bytes]", b.len()),
151                camel_component_api::Body::Stream(s) => {
152                    format!("[Stream: origin={:?}]", s.metadata.origin)
153                }
154            };
155            parts.push(format!("Body: {body_str}"));
156        }
157
158        if self.config.show_headers && !exchange.input.headers.is_empty() {
159            let headers: Vec<String> = exchange
160                .input
161                .headers
162                .iter()
163                .map(|(k, v)| format!("{k}={v}"))
164                .collect();
165            parts.push(format!("Headers: {{{}}}", headers.join(", ")));
166        }
167
168        if parts.is_empty() {
169            format!("[{}] Exchange received", self.config.category)
170        } else {
171            format!("[{}] {}", self.config.category, parts.join(" | "))
172        }
173    }
174}
175
176impl Service<Exchange> for LogProducer {
177    type Response = Exchange;
178    type Error = CamelError;
179    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
180
181    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
182        Poll::Ready(Ok(()))
183    }
184
185    fn call(&mut self, exchange: Exchange) -> Self::Future {
186        let msg = self.format_exchange(&exchange);
187        let level = self.config.level;
188
189        Box::pin(async move {
190            match level {
191                LogLevel::Trace => trace!("{msg}"),
192                LogLevel::Debug => debug!("{msg}"),
193                LogLevel::Info => info!("{msg}"),
194                LogLevel::Warn => warn!("{msg}"),
195                LogLevel::Error => error!("{msg}"),
196            }
197
198            Ok(exchange)
199        })
200    }
201}
202
203// ---------------------------------------------------------------------------
204// Tests
205// ---------------------------------------------------------------------------
206
207#[cfg(test)]
208mod tests {
209    use super::*;
210    use camel_component_api::Message;
211    use camel_component_api::NoOpComponentContext;
212    use tower::ServiceExt;
213
214    fn test_producer_ctx() -> ProducerContext {
215        ProducerContext::new()
216    }
217
218    #[test]
219    fn test_log_config_defaults() {
220        let config = LogConfig::from_uri("log:myCategory").unwrap();
221        assert_eq!(config.category, "myCategory");
222        assert_eq!(config.level, LogLevel::Info);
223        assert!(!config.show_headers);
224        assert!(config.show_body);
225    }
226
227    #[test]
228    fn test_log_config_with_params() {
229        let config =
230            LogConfig::from_uri("log:app?level=debug&showHeaders=true&showBody=false").unwrap();
231        assert_eq!(config.category, "app");
232        assert_eq!(config.level, LogLevel::Debug);
233        assert!(config.show_headers);
234        assert!(!config.show_body);
235    }
236
237    #[test]
238    fn test_log_config_wrong_scheme() {
239        let result = LogConfig::from_uri("timer:tick");
240        assert!(result.is_err());
241    }
242
243    #[test]
244    fn test_log_component_scheme() {
245        let component = LogComponent::new();
246        assert_eq!(component.scheme(), "log");
247    }
248
249    #[test]
250    fn test_log_endpoint_no_consumer() {
251        let component = LogComponent::new();
252        let endpoint = component
253            .create_endpoint("log:info", &NoOpComponentContext)
254            .unwrap();
255        assert!(endpoint.create_consumer().is_err());
256    }
257
258    #[test]
259    fn test_log_endpoint_creates_producer() {
260        let ctx = test_producer_ctx();
261        let component = LogComponent::new();
262        let endpoint = component
263            .create_endpoint("log:info", &NoOpComponentContext)
264            .unwrap();
265        assert!(endpoint.create_producer(&ctx).is_ok());
266    }
267
268    #[tokio::test]
269    async fn test_log_producer_processes_exchange() {
270        let ctx = test_producer_ctx();
271        let component = LogComponent::new();
272        let endpoint = component
273            .create_endpoint("log:test?showHeaders=true", &NoOpComponentContext)
274            .unwrap();
275        let producer = endpoint.create_producer(&ctx).unwrap();
276
277        let mut exchange = Exchange::new(Message::new("hello world"));
278        exchange
279            .input
280            .set_header("source", serde_json::Value::String("test".into()));
281
282        let result = producer.oneshot(exchange).await.unwrap();
283        // Log producer passes exchange through unchanged
284        assert_eq!(result.input.body.as_text(), Some("hello world"));
285    }
286}