1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6use tracing::{debug, error, info, trace, warn};
7
8use camel_api::{BoxProcessor, CamelError, Exchange};
9use camel_component::{Component, Consumer, Endpoint, ProducerContext};
10use camel_endpoint::parse_uri;
11
/// Severity at which the log producer emits its output.
///
/// Each variant corresponds to the `tracing` macro of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLevel {
    /// Emitted through `trace!`.
    Trace,
    /// Emitted through `debug!`.
    Debug,
    /// Emitted through `info!`; also the fallback for unrecognised names.
    Info,
    /// Emitted through `warn!`.
    Warn,
    /// Emitted through `error!`.
    Error,
}

impl LogLevel {
    /// Parses a level name, ignoring ASCII case.
    ///
    /// Recognised names are `trace`, `debug`, `info`, `warn` (alias
    /// `warning`) and `error`. Any other input, including the empty string
    /// or a name with surrounding whitespace, yields [`LogLevel::Info`]
    /// instead of an error.
    fn from_str(s: &str) -> Self {
        // Comparing in place avoids allocating a lowercased copy of `s` on
        // every call. None of the accepted names contains a letter with a
        // non-ASCII uppercase form, so ASCII-only folding accepts exactly
        // the same inputs as a full Unicode lowercase would.
        const NAMES: [(&str, LogLevel); 6] = [
            ("trace", LogLevel::Trace),
            ("debug", LogLevel::Debug),
            ("info", LogLevel::Info),
            ("warn", LogLevel::Warn),
            ("warning", LogLevel::Warn),
            ("error", LogLevel::Error),
        ];

        NAMES
            .iter()
            .find(|(name, _)| s.eq_ignore_ascii_case(name))
            .map_or(LogLevel::Info, |&(_, level)| level)
    }
}
38
39#[derive(Debug, Clone)]
47pub struct LogConfig {
48 pub category: String,
50 pub level: LogLevel,
52 pub show_headers: bool,
54 pub show_body: bool,
56}
57
58impl LogConfig {
59 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
61 let parts = parse_uri(uri)?;
62 if parts.scheme != "log" {
63 return Err(CamelError::InvalidUri(format!(
64 "expected scheme 'log', got '{}'",
65 parts.scheme
66 )));
67 }
68
69 let level = parts
70 .params
71 .get("level")
72 .map(|v| LogLevel::from_str(v))
73 .unwrap_or(LogLevel::Info);
74
75 let show_headers = parts
76 .params
77 .get("showHeaders")
78 .map(|v| v == "true")
79 .unwrap_or(false);
80
81 let show_body = parts
82 .params
83 .get("showBody")
84 .map(|v| v == "true")
85 .unwrap_or(true);
86
87 Ok(Self {
88 category: parts.path,
89 level,
90 show_headers,
91 show_body,
92 })
93 }
94}
95
/// Component that handles the `log` URI scheme.
///
/// The component is a stateless unit struct, so `Debug`, `Clone` and
/// `Default` are derived rather than written by hand.
#[derive(Debug, Clone, Default)]
pub struct LogComponent;

impl LogComponent {
    /// Creates the component. Equivalent to [`LogComponent::default`].
    pub fn new() -> Self {
        Self
    }
}
114
115impl Component for LogComponent {
116 fn scheme(&self) -> &str {
117 "log"
118 }
119
120 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
121 let config = LogConfig::from_uri(uri)?;
122 Ok(Box::new(LogEndpoint {
123 uri: uri.to_string(),
124 config,
125 }))
126 }
127}
128
129struct LogEndpoint {
134 uri: String,
135 config: LogConfig,
136}
137
138impl Endpoint for LogEndpoint {
139 fn uri(&self) -> &str {
140 &self.uri
141 }
142
143 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
144 Err(CamelError::EndpointCreationFailed(
145 "log endpoint does not support consumers".to_string(),
146 ))
147 }
148
149 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
150 Ok(BoxProcessor::new(LogProducer {
151 config: self.config.clone(),
152 }))
153 }
154}
155
156#[derive(Clone)]
161struct LogProducer {
162 config: LogConfig,
163}
164
165impl LogProducer {
166 fn format_exchange(&self, exchange: &Exchange) -> String {
167 let mut parts = Vec::new();
168
169 if self.config.show_body {
170 let body_str = match &exchange.input.body {
171 camel_api::Body::Empty => "[empty]".to_string(),
172 camel_api::Body::Text(s) => s.clone(),
173 camel_api::Body::Json(v) => v.to_string(),
174 camel_api::Body::Bytes(b) => format!("[{} bytes]", b.len()),
175 camel_api::Body::Stream(s) => format!("[Stream: origin={:?}]", s.metadata.origin),
176 };
177 parts.push(format!("Body: {body_str}"));
178 }
179
180 if self.config.show_headers && !exchange.input.headers.is_empty() {
181 let headers: Vec<String> = exchange
182 .input
183 .headers
184 .iter()
185 .map(|(k, v)| format!("{k}={v}"))
186 .collect();
187 parts.push(format!("Headers: {{{}}}", headers.join(", ")));
188 }
189
190 if parts.is_empty() {
191 format!("[{}] Exchange received", self.config.category)
192 } else {
193 format!("[{}] {}", self.config.category, parts.join(" | "))
194 }
195 }
196}
197
198impl Service<Exchange> for LogProducer {
199 type Response = Exchange;
200 type Error = CamelError;
201 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
202
203 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
204 Poll::Ready(Ok(()))
205 }
206
207 fn call(&mut self, exchange: Exchange) -> Self::Future {
208 let msg = self.format_exchange(&exchange);
209 let level = self.config.level;
210
211 Box::pin(async move {
212 match level {
213 LogLevel::Trace => trace!("{msg}"),
214 LogLevel::Debug => debug!("{msg}"),
215 LogLevel::Info => info!("{msg}"),
216 LogLevel::Warn => warn!("{msg}"),
217 LogLevel::Error => error!("{msg}"),
218 }
219
220 Ok(exchange)
221 })
222 }
223}
224
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{Message, RouteController, RouteStatus};
    use std::sync::Arc;
    use tokio::sync::Mutex;
    use tower::ServiceExt;

    /// Route controller stub: every operation succeeds without doing
    /// anything, and no route status is ever reported.
    struct NullRouteController;

    #[async_trait::async_trait]
    impl RouteController for NullRouteController {
        async fn start_route(&mut self, _: &str) -> Result<(), CamelError> {
            Ok(())
        }

        async fn stop_route(&mut self, _: &str) -> Result<(), CamelError> {
            Ok(())
        }

        async fn restart_route(&mut self, _: &str) -> Result<(), CamelError> {
            Ok(())
        }

        async fn suspend_route(&mut self, _: &str) -> Result<(), CamelError> {
            Ok(())
        }

        async fn resume_route(&mut self, _: &str) -> Result<(), CamelError> {
            Ok(())
        }

        fn route_status(&self, _: &str) -> Option<RouteStatus> {
            None
        }

        async fn start_all_routes(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
            Ok(())
        }
    }

    /// Producer context backed by the stub controller.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
    }

    /// Builds an endpoint for `uri` through a fresh component, panicking if
    /// the URI is rejected.
    fn endpoint_for(uri: &str) -> Box<dyn Endpoint> {
        LogComponent::new().create_endpoint(uri).unwrap()
    }

    #[test]
    fn test_log_config_defaults() {
        let LogConfig {
            category,
            level,
            show_headers,
            show_body,
        } = LogConfig::from_uri("log:myCategory").unwrap();

        assert_eq!(category, "myCategory");
        assert_eq!(level, LogLevel::Info);
        assert!(!show_headers);
        assert!(show_body);
    }

    #[test]
    fn test_log_config_with_params() {
        let LogConfig {
            category,
            level,
            show_headers,
            show_body,
        } = LogConfig::from_uri("log:app?level=debug&showHeaders=true&showBody=false").unwrap();

        assert_eq!(category, "app");
        assert_eq!(level, LogLevel::Debug);
        assert!(show_headers);
        assert!(!show_body);
    }

    #[test]
    fn test_log_config_wrong_scheme() {
        assert!(LogConfig::from_uri("timer:tick").is_err());
    }

    #[test]
    fn test_log_component_scheme() {
        assert_eq!(LogComponent::new().scheme(), "log");
    }

    #[test]
    fn test_log_endpoint_no_consumer() {
        let endpoint = endpoint_for("log:info");
        assert!(endpoint.create_consumer().is_err());
    }

    #[test]
    fn test_log_endpoint_creates_producer() {
        let ctx = test_producer_ctx();
        let endpoint = endpoint_for("log:info");
        assert!(endpoint.create_producer(&ctx).is_ok());
    }

    #[tokio::test]
    async fn test_log_producer_processes_exchange() {
        let ctx = test_producer_ctx();
        let endpoint = endpoint_for("log:test?showHeaders=true");
        let producer = endpoint.create_producer(&ctx).unwrap();

        let mut exchange = Exchange::new(Message::new("hello world"));
        exchange
            .input
            .set_header("source", serde_json::Value::String("test".into()));

        // The producer must hand the exchange back with its body untouched.
        let returned = producer.oneshot(exchange).await.unwrap();
        assert_eq!(returned.input.body.as_text(), Some("hello world"));
    }
}