Skip to main content

camel_component_log/
lib.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::{debug, error, info, trace, warn};
8
9use camel_api::{BoxProcessor, CamelError, Exchange};
10use camel_component::{Component, Consumer, Endpoint, ProducerContext};
11use camel_endpoint::UriConfig;
12
13// ---------------------------------------------------------------------------
14// LogLevel
15// ---------------------------------------------------------------------------
16
/// Severity at which the log component emits its output.
///
/// The `Default` value is [`LogLevel::Info`]. Values can be parsed from text
/// through the [`FromStr`] implementation below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LogLevel {
    /// Parsed from `"trace"`.
    Trace,
    /// Parsed from `"debug"`.
    Debug,
    /// Parsed from `"info"`; also the default level.
    #[default]
    Info,
    /// Parsed from `"warn"` or `"warning"`.
    Warn,
    /// Parsed from `"error"`.
    Error,
}

/// Parses a level name, ignoring letter case.
///
/// # Errors
///
/// Any name outside the accepted spellings yields
/// `Err("Invalid log level: <input>")`, where `<input>` is the text exactly
/// as it was passed in.
impl FromStr for LogLevel {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Every accepted (lower-case) spelling paired with the level it selects.
        const SPELLINGS: [(&str, LogLevel); 6] = [
            ("trace", LogLevel::Trace),
            ("debug", LogLevel::Debug),
            ("info", LogLevel::Info),
            ("warn", LogLevel::Warn),
            ("warning", LogLevel::Warn),
            ("error", LogLevel::Error),
        ];

        // Compare against a lowered copy; the error keeps the caller's casing.
        let lowered = s.to_lowercase();
        SPELLINGS
            .iter()
            .find(|(name, _)| lowered == *name)
            .map(|&(_, level)| level)
            .ok_or_else(|| format!("Invalid log level: {}", s))
    }
}
42
43// ---------------------------------------------------------------------------
44// LogConfig
45// ---------------------------------------------------------------------------
46
/// Configuration parsed from a log URI.
///
/// Format: `log:category?level=info&showHeaders=true&showBody=true`
///
/// The parsing code (`LogConfig::from_uri`) is not written by hand in this
/// file; it is used by the component and the tests, and is presumably
/// generated by the `UriConfig` derive from the `uri_scheme` / `uri_param`
/// attributes below.
///
/// Behaviour pinned by this file's tests:
/// - `log:myCategory` yields category `myCategory`, level `Info`,
///   `show_headers == false`, `show_body == true`.
/// - `timer:tick` (a different scheme) is rejected with an error.
// NOTE(review): the exact meaning of `uri_scheme`, `uri_param(name = ..)` and
// `uri_param(default = ..)` is inferred from their names and from the tests —
// confirm against the `UriConfig` derive documentation.
#[derive(Debug, Clone, UriConfig)]
#[uri_scheme = "log"]
pub struct LogConfig {
    /// Log category (the path portion of the URI). Used as the `[category]`
    /// prefix of every formatted log line.
    pub category: String,
    /// Log level. Default: Info.
    // Presumably the value is converted via `LogLevel`'s `FromStr` impl
    // (the tests parse `level=debug` successfully) — TODO confirm.
    #[uri_param(default = "Info")]
    pub level: LogLevel,
    /// Whether to include headers in the log output. URI parameter name:
    /// `showHeaders`. Default: false.
    #[uri_param(name = "showHeaders", default = "false")]
    pub show_headers: bool,
    /// Whether to include the body in the log output. URI parameter name:
    /// `showBody`. Default: true.
    #[uri_param(name = "showBody", default = "true")]
    pub show_body: bool,
}
65
66// ---------------------------------------------------------------------------
67// LogComponent
68// ---------------------------------------------------------------------------
69
/// The Log component logs exchange information using `tracing`.
///
/// This is a unit type with no state, so every instance is interchangeable.
/// `Debug` is derived so the public type can appear in diagnostics, and
/// `Default` is derived rather than hand-written; [`LogComponent::new`] and
/// `LogComponent::default()` produce the same value.
#[derive(Debug, Default)]
pub struct LogComponent;

impl LogComponent {
    /// Creates a log component.
    pub fn new() -> Self {
        Self
    }
}
84
85impl Component for LogComponent {
86    fn scheme(&self) -> &str {
87        "log"
88    }
89
90    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
91        let config = LogConfig::from_uri(uri)?;
92        Ok(Box::new(LogEndpoint {
93            uri: uri.to_string(),
94            config,
95        }))
96    }
97}
98
99// ---------------------------------------------------------------------------
100// LogEndpoint
101// ---------------------------------------------------------------------------
102
103struct LogEndpoint {
104    uri: String,
105    config: LogConfig,
106}
107
108impl Endpoint for LogEndpoint {
109    fn uri(&self) -> &str {
110        &self.uri
111    }
112
113    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
114        Err(CamelError::EndpointCreationFailed(
115            "log endpoint does not support consumers".to_string(),
116        ))
117    }
118
119    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
120        Ok(BoxProcessor::new(LogProducer {
121            config: self.config.clone(),
122        }))
123    }
124}
125
126// ---------------------------------------------------------------------------
127// LogProducer
128// ---------------------------------------------------------------------------
129
130#[derive(Clone)]
131struct LogProducer {
132    config: LogConfig,
133}
134
135impl LogProducer {
136    fn format_exchange(&self, exchange: &Exchange) -> String {
137        let mut parts = Vec::new();
138
139        if self.config.show_body {
140            let body_str = match &exchange.input.body {
141                camel_api::Body::Empty => "[empty]".to_string(),
142                camel_api::Body::Text(s) => s.clone(),
143                camel_api::Body::Json(v) => v.to_string(),
144                camel_api::Body::Xml(s) => s.clone(),
145                camel_api::Body::Bytes(b) => format!("[{} bytes]", b.len()),
146                camel_api::Body::Stream(s) => format!("[Stream: origin={:?}]", s.metadata.origin),
147            };
148            parts.push(format!("Body: {body_str}"));
149        }
150
151        if self.config.show_headers && !exchange.input.headers.is_empty() {
152            let headers: Vec<String> = exchange
153                .input
154                .headers
155                .iter()
156                .map(|(k, v)| format!("{k}={v}"))
157                .collect();
158            parts.push(format!("Headers: {{{}}}", headers.join(", ")));
159        }
160
161        if parts.is_empty() {
162            format!("[{}] Exchange received", self.config.category)
163        } else {
164            format!("[{}] {}", self.config.category, parts.join(" | "))
165        }
166    }
167}
168
169impl Service<Exchange> for LogProducer {
170    type Response = Exchange;
171    type Error = CamelError;
172    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
173
174    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
175        Poll::Ready(Ok(()))
176    }
177
178    fn call(&mut self, exchange: Exchange) -> Self::Future {
179        let msg = self.format_exchange(&exchange);
180        let level = self.config.level;
181
182        Box::pin(async move {
183            match level {
184                LogLevel::Trace => trace!("{msg}"),
185                LogLevel::Debug => debug!("{msg}"),
186                LogLevel::Info => info!("{msg}"),
187                LogLevel::Warn => warn!("{msg}"),
188                LogLevel::Error => error!("{msg}"),
189            }
190
191            Ok(exchange)
192        })
193    }
194}
195
196// ---------------------------------------------------------------------------
197// Tests
198// ---------------------------------------------------------------------------
199
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::Message;
    use tower::ServiceExt;

    /// Producer context handed to `create_producer` in the producer tests.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }

    /// Creates an endpoint for `uri` through a fresh component; panics if the
    /// component rejects the URI.
    fn endpoint_for(uri: &str) -> Box<dyn Endpoint> {
        LogComponent::new().create_endpoint(uri).unwrap()
    }

    // A bare `log:<category>` URI falls back to the documented defaults.
    #[test]
    fn test_log_config_defaults() {
        let LogConfig {
            category,
            level,
            show_headers,
            show_body,
        } = LogConfig::from_uri("log:myCategory").unwrap();

        assert_eq!(category, "myCategory");
        assert_eq!(level, LogLevel::Info);
        assert!(!show_headers);
        assert!(show_body);
    }

    // Every query parameter overrides its default.
    #[test]
    fn test_log_config_with_params() {
        let LogConfig {
            category,
            level,
            show_headers,
            show_body,
        } = LogConfig::from_uri("log:app?level=debug&showHeaders=true&showBody=false").unwrap();

        assert_eq!(category, "app");
        assert_eq!(level, LogLevel::Debug);
        assert!(show_headers);
        assert!(!show_body);
    }

    // A URI with a different scheme must be rejected.
    #[test]
    fn test_log_config_wrong_scheme() {
        assert!(LogConfig::from_uri("timer:tick").is_err());
    }

    #[test]
    fn test_log_component_scheme() {
        assert_eq!(LogComponent::new().scheme(), "log");
    }

    // The log endpoint is producer-only.
    #[test]
    fn test_log_endpoint_no_consumer() {
        let endpoint = endpoint_for("log:info");
        assert!(endpoint.create_consumer().is_err());
    }

    #[test]
    fn test_log_endpoint_creates_producer() {
        let ctx = test_producer_ctx();
        let endpoint = endpoint_for("log:info");
        assert!(endpoint.create_producer(&ctx).is_ok());
    }

    #[tokio::test]
    async fn test_log_producer_processes_exchange() {
        let ctx = test_producer_ctx();
        let producer = endpoint_for("log:test?showHeaders=true")
            .create_producer(&ctx)
            .unwrap();

        let mut exchange = Exchange::new(Message::new("hello world"));
        exchange
            .input
            .set_header("source", serde_json::Value::from("test"));

        let forwarded = producer.oneshot(exchange).await.unwrap();
        // Log producer passes exchange through unchanged
        assert_eq!(forwarded.input.body.as_text(), Some("hello world"));
    }
}