1use std::future::Future;
7use std::pin::Pin;
8use std::str::FromStr;
9use std::task::{Context, Poll};
10
11use tower::Service;
12use tracing::{debug, error, info, trace, warn};
13
14use camel_component_api::UriConfig;
15use camel_component_api::{BoxProcessor, CamelError, Exchange};
16use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
17
/// Severity at which a `log:` endpoint emits its messages.
///
/// Values are parsed from text through [`FromStr`]; [`LogLevel::Info`] is the
/// default variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LogLevel {
    /// Parsed from `"trace"`.
    Trace,
    /// Parsed from `"debug"`.
    Debug,
    /// Parsed from `"info"`; also the value produced by `LogLevel::default()`.
    #[default]
    Info,
    /// Parsed from `"warn"` or `"warning"`.
    Warn,
    /// Parsed from `"error"`.
    Error,
}

impl FromStr for LogLevel {
    /// Human-readable message that echoes the rejected input.
    type Err = String;

    /// Parses a level name without regard to letter case.
    ///
    /// Accepted names are `trace`, `debug`, `info`, `warn`, `warning` and
    /// `error`. Surrounding whitespace is not stripped, so `" info"` is
    /// rejected.
    ///
    /// # Errors
    ///
    /// Returns `"Invalid log level: <input>"` (with the input exactly as
    /// given) when the name is not recognised.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Every accepted spelling paired with the level it selects. All names
        // are plain ASCII, so an ASCII case-insensitive comparison is enough.
        const KNOWN_NAMES: [(&str, LogLevel); 6] = [
            ("trace", LogLevel::Trace),
            ("debug", LogLevel::Debug),
            ("info", LogLevel::Info),
            ("warn", LogLevel::Warn),
            ("warning", LogLevel::Warn),
            ("error", LogLevel::Error),
        ];

        KNOWN_NAMES
            .iter()
            .find(|(name, _)| s.eq_ignore_ascii_case(name))
            .map(|&(_, level)| level)
            .ok_or_else(|| format!("Invalid log level: {}", s))
    }
}
47
/// Settings of a `log:` endpoint, produced from its URI by the `UriConfig` derive.
///
/// The tests in this file show the expected mapping: `log:myCategory` yields
/// category `myCategory` with all defaults, and
/// `log:app?level=debug&showHeaders=true&showBody=false` overrides every option.
/// A URI with a different scheme (e.g. `timer:tick`) is rejected by `from_uri`.
#[derive(Debug, Clone, UriConfig)]
#[uri_scheme = "log"]
#[uri_config(crate = "camel_component_api")]
pub struct LogConfig {
    /// Category label; it is printed in square brackets at the start of every
    /// formatted log line. In the tests it is the part of the URI after `log:`.
    pub category: String,
    /// Level the message is logged at. Defaults to `Info`; the tests also show
    /// that an unparsable `level` value ends up as `Info` rather than an error.
    #[uri_param(default = "Info")]
    pub level: LogLevel,
    /// Whether message headers are included in the log line (URI parameter
    /// `showHeaders`, default `false`).
    #[uri_param(name = "showHeaders", default = "false")]
    pub show_headers: bool,
    /// Whether the message body is included in the log line (URI parameter
    /// `showBody`, default `true`).
    #[uri_param(name = "showBody", default = "true")]
    pub show_body: bool,
}
71
/// Component that serves the `log` URI scheme.
///
/// The type is a field-less unit struct, so `LogComponent`,
/// [`LogComponent::new`] and `LogComponent::default()` all yield the same
/// value. `Debug`, `Clone` and `Default` are derived so the component can be
/// printed in diagnostics and duplicated like any other plain value.
#[derive(Debug, Clone, Default)]
pub struct LogComponent;

impl LogComponent {
    /// Creates the component; equivalent to `LogComponent::default()`.
    pub fn new() -> Self {
        Self
    }
}
90
91impl Component for LogComponent {
92 fn scheme(&self) -> &str {
93 "log"
94 }
95
96 fn create_endpoint(
97 &self,
98 uri: &str,
99 _ctx: &dyn camel_component_api::ComponentContext,
100 ) -> Result<Box<dyn Endpoint>, CamelError> {
101 let config = LogConfig::from_uri(uri)?;
102 Ok(Box::new(LogEndpoint {
103 uri: uri.to_string(),
104 config,
105 }))
106 }
107}
108
109struct LogEndpoint {
114 uri: String,
115 config: LogConfig,
116}
117
118impl Endpoint for LogEndpoint {
119 fn uri(&self) -> &str {
120 &self.uri
121 }
122
123 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
124 Err(CamelError::EndpointCreationFailed(
125 "log endpoint does not support consumers".to_string(),
126 ))
127 }
128
129 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
130 Ok(BoxProcessor::new(LogProducer {
131 config: self.config.clone(),
132 }))
133 }
134}
135
136#[derive(Clone)]
141struct LogProducer {
142 config: LogConfig,
143}
144
145impl LogProducer {
146 fn format_exchange(&self, exchange: &Exchange) -> String {
147 let mut parts = Vec::new();
148
149 if self.config.show_body {
150 let body_str = match &exchange.input.body {
151 camel_component_api::Body::Empty => "[empty]".to_string(),
152 camel_component_api::Body::Text(s) => s.clone(),
153 camel_component_api::Body::Json(v) => v.to_string(),
154 camel_component_api::Body::Xml(s) => s.clone(),
155 camel_component_api::Body::Bytes(b) => format!("[{} bytes]", b.len()),
156 camel_component_api::Body::Stream(s) => {
157 format!("[Stream: origin={:?}]", s.metadata.origin)
158 }
159 };
160 parts.push(format!("Body: {body_str}"));
161 }
162
163 if self.config.show_headers && !exchange.input.headers.is_empty() {
164 let headers: Vec<String> = exchange
165 .input
166 .headers
167 .iter()
168 .map(|(k, v)| format!("{k}={v}"))
169 .collect();
170 parts.push(format!("Headers: {{{}}}", headers.join(", ")));
171 }
172
173 if parts.is_empty() {
174 format!("[{}] Exchange received", self.config.category)
175 } else {
176 format!("[{}] {}", self.config.category, parts.join(" | "))
177 }
178 }
179}
180
181impl Service<Exchange> for LogProducer {
182 type Response = Exchange;
183 type Error = CamelError;
184 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
185
186 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
187 Poll::Ready(Ok(()))
188 }
189
190 fn call(&mut self, exchange: Exchange) -> Self::Future {
191 let msg = self.format_exchange(&exchange);
192 let level = self.config.level;
193
194 Box::pin(async move {
195 match level {
196 LogLevel::Trace => trace!("{msg}"),
197 LogLevel::Debug => debug!("{msg}"),
198 LogLevel::Info => info!("{msg}"),
199 LogLevel::Warn => warn!("{msg}"),
200 LogLevel::Error => error!("{msg}"),
201 }
202
203 Ok(exchange)
204 })
205 }
206}
207
// Unit tests for the log component: URI parsing, level parsing, endpoint
// behaviour and log-line formatting.
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::Body;
    use camel_component_api::Message;
    use camel_component_api::NoOpComponentContext;
    use tower::ServiceExt;

    /// Producer context handed to `create_producer` in the tests below.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }

    /// A bare `log:<category>` URI yields the documented defaults.
    #[test]
    fn test_log_config_defaults() {
        let config = LogConfig::from_uri("log:myCategory").unwrap();
        assert_eq!(config.category, "myCategory");
        assert_eq!(config.level, LogLevel::Info);
        assert!(!config.show_headers);
        assert!(config.show_body);
    }

    /// Every option can be overridden through query parameters.
    #[test]
    fn test_log_config_with_params() {
        let config =
            LogConfig::from_uri("log:app?level=debug&showHeaders=true&showBody=false").unwrap();
        assert_eq!(config.category, "app");
        assert_eq!(config.level, LogLevel::Debug);
        assert!(config.show_headers);
        assert!(!config.show_body);
    }

    /// A URI with a scheme other than `log` is rejected.
    #[test]
    fn test_log_config_wrong_scheme() {
        let result = LogConfig::from_uri("timer:tick");
        assert!(result.is_err());
    }

    /// The component reports the `log` scheme.
    #[test]
    fn test_log_component_scheme() {
        let component = LogComponent::new();
        assert_eq!(component.scheme(), "log");
    }

    /// The unit-struct literal behaves like a component built with `new()`.
    #[test]
    fn test_log_component_default() {
        let component = LogComponent;
        assert_eq!(component.scheme(), "log");
    }

    /// Level names are matched regardless of case, and `warning` aliases `warn`.
    #[test]
    fn test_log_level_from_str_variants() {
        assert_eq!("trace".parse::<LogLevel>().unwrap(), LogLevel::Trace);
        assert_eq!("DEBUG".parse::<LogLevel>().unwrap(), LogLevel::Debug);
        assert_eq!("Info".parse::<LogLevel>().unwrap(), LogLevel::Info);
        assert_eq!("warning".parse::<LogLevel>().unwrap(), LogLevel::Warn);
        assert_eq!("error".parse::<LogLevel>().unwrap(), LogLevel::Error);
    }

    /// Unknown level names produce an error message that echoes the input.
    #[test]
    fn test_log_level_from_str_invalid() {
        let err = "nope".parse::<LogLevel>().unwrap_err();
        assert_eq!(err, "Invalid log level: nope");
    }

    /// An unparsable `level` parameter does not fail URI parsing; the level
    /// ends up as the default `Info`.
    #[test]
    fn test_log_config_invalid_level_falls_back_to_default() {
        let config = LogConfig::from_uri("log:test?level=invalid").unwrap();
        assert_eq!(config.level, LogLevel::Info);
    }

    /// The endpoint returns the exact URI it was created from.
    #[test]
    fn test_log_endpoint_uri() {
        let component = LogComponent::new();
        let endpoint = component
            .create_endpoint("log:uri-check", &NoOpComponentContext)
            .unwrap();
        assert_eq!(endpoint.uri(), "log:uri-check");
    }

    /// Asking the endpoint for a consumer is an error.
    #[test]
    fn test_log_endpoint_no_consumer() {
        let component = LogComponent::new();
        let endpoint = component
            .create_endpoint("log:info", &NoOpComponentContext)
            .unwrap();
        assert!(endpoint.create_consumer().is_err());
    }

    /// Asking the endpoint for a producer succeeds.
    #[test]
    fn test_log_endpoint_creates_producer() {
        let ctx = test_producer_ctx();
        let component = LogComponent::new();
        let endpoint = component
            .create_endpoint("log:info", &NoOpComponentContext)
            .unwrap();
        assert!(endpoint.create_producer(&ctx).is_ok());
    }

    /// Running an exchange through the producer returns it with its body intact.
    #[tokio::test]
    async fn test_log_producer_processes_exchange() {
        let ctx = test_producer_ctx();
        let component = LogComponent::new();
        let endpoint = component
            .create_endpoint("log:test?showHeaders=true", &NoOpComponentContext)
            .unwrap();
        let producer = endpoint.create_producer(&ctx).unwrap();

        let mut exchange = Exchange::new(Message::new("hello world"));
        exchange
            .input
            .set_header("source", serde_json::Value::String("test".into()));

        let result = producer.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("hello world"));
    }

    /// With body and headers both hidden, the fallback line is produced.
    #[test]
    fn test_format_exchange_without_body_or_headers() {
        let producer = LogProducer {
            config: LogConfig {
                category: "cat".to_string(),
                level: LogLevel::Info,
                show_headers: false,
                show_body: false,
            },
        };
        let exchange = Exchange::new(Message::new("ignored"));
        let formatted = producer.format_exchange(&exchange);
        assert_eq!(formatted, "[cat] Exchange received");
    }

    /// Each checked body variant is rendered in its own form.
    #[test]
    fn test_format_exchange_body_variants() {
        let base = LogProducer {
            config: LogConfig {
                category: "cat".to_string(),
                level: LogLevel::Info,
                show_headers: false,
                show_body: true,
            },
        };

        // Empty body is shown as a placeholder.
        let empty = Exchange::new(Message::default());
        assert!(base.format_exchange(&empty).contains("Body: [empty]"));

        // JSON body is rendered as compact JSON text.
        let mut json_msg = Message::new("");
        json_msg.body = Body::Json(serde_json::json!({"k":"v"}));
        let json_ex = Exchange::new(json_msg);
        assert!(
            base.format_exchange(&json_ex)
                .contains("Body: {\"k\":\"v\"}")
        );

        // XML body is printed verbatim.
        let mut xml_msg = Message::new("");
        xml_msg.body = Body::Xml("<a/>".to_string());
        let xml_ex = Exchange::new(xml_msg);
        assert!(base.format_exchange(&xml_ex).contains("Body: <a/>"));

        // Byte bodies are summarised by their length only.
        let mut bytes_msg = Message::new("");
        bytes_msg.body = Body::Bytes(b"abc".to_vec().into());
        let bytes_ex = Exchange::new(bytes_msg);
        assert!(base.format_exchange(&bytes_ex).contains("Body: [3 bytes]"));
    }
}