1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::{debug, error, info, trace, warn};
8
9use camel_component_api::UriConfig;
10use camel_component_api::{BoxProcessor, CamelError, Exchange};
11use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
19pub enum LogLevel {
20 Trace,
21 Debug,
22 #[default]
23 Info,
24 Warn,
25 Error,
26}
27
28impl FromStr for LogLevel {
29 type Err = String;
30
31 fn from_str(s: &str) -> Result<Self, Self::Err> {
32 match s.to_lowercase().as_str() {
33 "trace" => Ok(LogLevel::Trace),
34 "debug" => Ok(LogLevel::Debug),
35 "info" => Ok(LogLevel::Info),
36 "warn" | "warning" => Ok(LogLevel::Warn),
37 "error" => Ok(LogLevel::Error),
38 _ => Err(format!("Invalid log level: {}", s)),
39 }
40 }
41}
42
43#[derive(Debug, Clone, UriConfig)]
51#[uri_scheme = "log"]
52#[uri_config(crate = "camel_component_api")]
53pub struct LogConfig {
54 pub category: String,
56 #[uri_param(default = "Info")]
58 pub level: LogLevel,
59 #[uri_param(name = "showHeaders", default = "false")]
61 pub show_headers: bool,
62 #[uri_param(name = "showBody", default = "true")]
64 pub show_body: bool,
65}
66
67pub struct LogComponent;
73
74impl LogComponent {
75 pub fn new() -> Self {
76 Self
77 }
78}
79
80impl Default for LogComponent {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl Component for LogComponent {
87 fn scheme(&self) -> &str {
88 "log"
89 }
90
91 fn create_endpoint(
92 &self,
93 uri: &str,
94 _ctx: &dyn camel_component_api::ComponentContext,
95 ) -> Result<Box<dyn Endpoint>, CamelError> {
96 let config = LogConfig::from_uri(uri)?;
97 Ok(Box::new(LogEndpoint {
98 uri: uri.to_string(),
99 config,
100 }))
101 }
102}
103
104struct LogEndpoint {
109 uri: String,
110 config: LogConfig,
111}
112
113impl Endpoint for LogEndpoint {
114 fn uri(&self) -> &str {
115 &self.uri
116 }
117
118 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
119 Err(CamelError::EndpointCreationFailed(
120 "log endpoint does not support consumers".to_string(),
121 ))
122 }
123
124 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
125 Ok(BoxProcessor::new(LogProducer {
126 config: self.config.clone(),
127 }))
128 }
129}
130
131#[derive(Clone)]
136struct LogProducer {
137 config: LogConfig,
138}
139
140impl LogProducer {
141 fn format_exchange(&self, exchange: &Exchange) -> String {
142 let mut parts = Vec::new();
143
144 if self.config.show_body {
145 let body_str = match &exchange.input.body {
146 camel_component_api::Body::Empty => "[empty]".to_string(),
147 camel_component_api::Body::Text(s) => s.clone(),
148 camel_component_api::Body::Json(v) => v.to_string(),
149 camel_component_api::Body::Xml(s) => s.clone(),
150 camel_component_api::Body::Bytes(b) => format!("[{} bytes]", b.len()),
151 camel_component_api::Body::Stream(s) => {
152 format!("[Stream: origin={:?}]", s.metadata.origin)
153 }
154 };
155 parts.push(format!("Body: {body_str}"));
156 }
157
158 if self.config.show_headers && !exchange.input.headers.is_empty() {
159 let headers: Vec<String> = exchange
160 .input
161 .headers
162 .iter()
163 .map(|(k, v)| format!("{k}={v}"))
164 .collect();
165 parts.push(format!("Headers: {{{}}}", headers.join(", ")));
166 }
167
168 if parts.is_empty() {
169 format!("[{}] Exchange received", self.config.category)
170 } else {
171 format!("[{}] {}", self.config.category, parts.join(" | "))
172 }
173 }
174}
175
176impl Service<Exchange> for LogProducer {
177 type Response = Exchange;
178 type Error = CamelError;
179 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
180
181 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
182 Poll::Ready(Ok(()))
183 }
184
185 fn call(&mut self, exchange: Exchange) -> Self::Future {
186 let msg = self.format_exchange(&exchange);
187 let level = self.config.level;
188
189 Box::pin(async move {
190 match level {
191 LogLevel::Trace => trace!("{msg}"),
192 LogLevel::Debug => debug!("{msg}"),
193 LogLevel::Info => info!("{msg}"),
194 LogLevel::Warn => warn!("{msg}"),
195 LogLevel::Error => error!("{msg}"),
196 }
197
198 Ok(exchange)
199 })
200 }
201}
202
203#[cfg(test)]
208mod tests {
209 use super::*;
210 use camel_component_api::Body;
211 use camel_component_api::Message;
212 use camel_component_api::NoOpComponentContext;
213 use tower::ServiceExt;
214
215 fn test_producer_ctx() -> ProducerContext {
216 ProducerContext::new()
217 }
218
219 #[test]
220 fn test_log_config_defaults() {
221 let config = LogConfig::from_uri("log:myCategory").unwrap();
222 assert_eq!(config.category, "myCategory");
223 assert_eq!(config.level, LogLevel::Info);
224 assert!(!config.show_headers);
225 assert!(config.show_body);
226 }
227
228 #[test]
229 fn test_log_config_with_params() {
230 let config =
231 LogConfig::from_uri("log:app?level=debug&showHeaders=true&showBody=false").unwrap();
232 assert_eq!(config.category, "app");
233 assert_eq!(config.level, LogLevel::Debug);
234 assert!(config.show_headers);
235 assert!(!config.show_body);
236 }
237
238 #[test]
239 fn test_log_config_wrong_scheme() {
240 let result = LogConfig::from_uri("timer:tick");
241 assert!(result.is_err());
242 }
243
244 #[test]
245 fn test_log_component_scheme() {
246 let component = LogComponent::new();
247 assert_eq!(component.scheme(), "log");
248 }
249
250 #[test]
251 fn test_log_component_default() {
252 let component = LogComponent;
253 assert_eq!(component.scheme(), "log");
254 }
255
256 #[test]
257 fn test_log_level_from_str_variants() {
258 assert_eq!("trace".parse::<LogLevel>().unwrap(), LogLevel::Trace);
259 assert_eq!("DEBUG".parse::<LogLevel>().unwrap(), LogLevel::Debug);
260 assert_eq!("Info".parse::<LogLevel>().unwrap(), LogLevel::Info);
261 assert_eq!("warning".parse::<LogLevel>().unwrap(), LogLevel::Warn);
262 assert_eq!("error".parse::<LogLevel>().unwrap(), LogLevel::Error);
263 }
264
265 #[test]
266 fn test_log_level_from_str_invalid() {
267 let err = "nope".parse::<LogLevel>().unwrap_err();
268 assert_eq!(err, "Invalid log level: nope");
269 }
270
271 #[test]
272 fn test_log_config_invalid_level_falls_back_to_default() {
273 let config = LogConfig::from_uri("log:test?level=invalid").unwrap();
274 assert_eq!(config.level, LogLevel::Info);
275 }
276
277 #[test]
278 fn test_log_endpoint_uri() {
279 let component = LogComponent::new();
280 let endpoint = component
281 .create_endpoint("log:uri-check", &NoOpComponentContext)
282 .unwrap();
283 assert_eq!(endpoint.uri(), "log:uri-check");
284 }
285
286 #[test]
287 fn test_log_endpoint_no_consumer() {
288 let component = LogComponent::new();
289 let endpoint = component
290 .create_endpoint("log:info", &NoOpComponentContext)
291 .unwrap();
292 assert!(endpoint.create_consumer().is_err());
293 }
294
295 #[test]
296 fn test_log_endpoint_creates_producer() {
297 let ctx = test_producer_ctx();
298 let component = LogComponent::new();
299 let endpoint = component
300 .create_endpoint("log:info", &NoOpComponentContext)
301 .unwrap();
302 assert!(endpoint.create_producer(&ctx).is_ok());
303 }
304
305 #[tokio::test]
306 async fn test_log_producer_processes_exchange() {
307 let ctx = test_producer_ctx();
308 let component = LogComponent::new();
309 let endpoint = component
310 .create_endpoint("log:test?showHeaders=true", &NoOpComponentContext)
311 .unwrap();
312 let producer = endpoint.create_producer(&ctx).unwrap();
313
314 let mut exchange = Exchange::new(Message::new("hello world"));
315 exchange
316 .input
317 .set_header("source", serde_json::Value::String("test".into()));
318
319 let result = producer.oneshot(exchange).await.unwrap();
320 assert_eq!(result.input.body.as_text(), Some("hello world"));
322 }
323
324 #[test]
325 fn test_format_exchange_without_body_or_headers() {
326 let producer = LogProducer {
327 config: LogConfig {
328 category: "cat".to_string(),
329 level: LogLevel::Info,
330 show_headers: false,
331 show_body: false,
332 },
333 };
334 let exchange = Exchange::new(Message::new("ignored"));
335 let formatted = producer.format_exchange(&exchange);
336 assert_eq!(formatted, "[cat] Exchange received");
337 }
338
339 #[test]
340 fn test_format_exchange_body_variants() {
341 let base = LogProducer {
342 config: LogConfig {
343 category: "cat".to_string(),
344 level: LogLevel::Info,
345 show_headers: false,
346 show_body: true,
347 },
348 };
349
350 let empty = Exchange::new(Message::default());
351 assert!(base.format_exchange(&empty).contains("Body: [empty]"));
352
353 let mut json_msg = Message::new("");
354 json_msg.body = Body::Json(serde_json::json!({"k":"v"}));
355 let json_ex = Exchange::new(json_msg);
356 assert!(
357 base.format_exchange(&json_ex)
358 .contains("Body: {\"k\":\"v\"}")
359 );
360
361 let mut xml_msg = Message::new("");
362 xml_msg.body = Body::Xml("<a/>".to_string());
363 let xml_ex = Exchange::new(xml_msg);
364 assert!(base.format_exchange(&xml_ex).contains("Body: <a/>"));
365
366 let mut bytes_msg = Message::new("");
367 bytes_msg.body = Body::Bytes(b"abc".to_vec().into());
368 let bytes_ex = Exchange::new(bytes_msg);
369 assert!(base.format_exchange(&bytes_ex).contains("Body: [3 bytes]"));
370 }
371}