1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6use tracing::{debug, error, info, trace, warn};
7
8use camel_api::{BoxProcessor, CamelError, Exchange};
9use camel_component::{Component, Consumer, Endpoint, ProducerContext};
10use camel_endpoint::parse_uri;
11
/// Severity at which a log endpoint emits its messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLevel {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
}

impl LogLevel {
    /// Resolves a level name to a [`LogLevel`], ignoring letter case.
    ///
    /// `"warning"` is accepted as an alias for `"warn"`. Any name that is not
    /// recognised falls back to [`LogLevel::Info`] rather than failing.
    fn from_str(s: &str) -> Self {
        // Accepted spellings, paired with the level each one selects.
        const NAMES: [(&str, LogLevel); 6] = [
            ("trace", LogLevel::Trace),
            ("debug", LogLevel::Debug),
            ("info", LogLevel::Info),
            ("warn", LogLevel::Warn),
            ("warning", LogLevel::Warn),
            ("error", LogLevel::Error),
        ];

        NAMES
            .iter()
            .find(|(name, _)| s.eq_ignore_ascii_case(name))
            .map_or(LogLevel::Info, |&(_, level)| level)
    }
}
38
39#[derive(Debug, Clone)]
47pub struct LogConfig {
48 pub category: String,
50 pub level: LogLevel,
52 pub show_headers: bool,
54 pub show_body: bool,
56}
57
58impl LogConfig {
59 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
61 let parts = parse_uri(uri)?;
62 if parts.scheme != "log" {
63 return Err(CamelError::InvalidUri(format!(
64 "expected scheme 'log', got '{}'",
65 parts.scheme
66 )));
67 }
68
69 let level = parts
70 .params
71 .get("level")
72 .map(|v| LogLevel::from_str(v))
73 .unwrap_or(LogLevel::Info);
74
75 let show_headers = parts
76 .params
77 .get("showHeaders")
78 .map(|v| v == "true")
79 .unwrap_or(false);
80
81 let show_body = parts
82 .params
83 .get("showBody")
84 .map(|v| v == "true")
85 .unwrap_or(true);
86
87 Ok(Self {
88 category: parts.path,
89 level,
90 show_headers,
91 show_body,
92 })
93 }
94}
95
/// Stateless component type for the `log` scheme.
///
/// `Default` is derived, so `LogComponent::default()` and
/// [`LogComponent::new`] produce the same unit value.
#[derive(Default)]
pub struct LogComponent;

impl LogComponent {
    /// Creates a new log component.
    pub fn new() -> Self {
        LogComponent
    }
}
114
115impl Component for LogComponent {
116 fn scheme(&self) -> &str {
117 "log"
118 }
119
120 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
121 let config = LogConfig::from_uri(uri)?;
122 Ok(Box::new(LogEndpoint {
123 uri: uri.to_string(),
124 config,
125 }))
126 }
127}
128
129struct LogEndpoint {
134 uri: String,
135 config: LogConfig,
136}
137
138impl Endpoint for LogEndpoint {
139 fn uri(&self) -> &str {
140 &self.uri
141 }
142
143 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
144 Err(CamelError::EndpointCreationFailed(
145 "log endpoint does not support consumers".to_string(),
146 ))
147 }
148
149 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
150 Ok(BoxProcessor::new(LogProducer {
151 config: self.config.clone(),
152 }))
153 }
154}
155
156#[derive(Clone)]
161struct LogProducer {
162 config: LogConfig,
163}
164
165impl LogProducer {
166 fn format_exchange(&self, exchange: &Exchange) -> String {
167 let mut parts = Vec::new();
168
169 if self.config.show_body {
170 let body_str = match &exchange.input.body {
171 camel_api::Body::Empty => "[empty]".to_string(),
172 camel_api::Body::Text(s) => s.clone(),
173 camel_api::Body::Json(v) => v.to_string(),
174 camel_api::Body::Bytes(b) => format!("[{} bytes]", b.len()),
175 };
176 parts.push(format!("Body: {body_str}"));
177 }
178
179 if self.config.show_headers && !exchange.input.headers.is_empty() {
180 let headers: Vec<String> = exchange
181 .input
182 .headers
183 .iter()
184 .map(|(k, v)| format!("{k}={v}"))
185 .collect();
186 parts.push(format!("Headers: {{{}}}", headers.join(", ")));
187 }
188
189 if parts.is_empty() {
190 format!("[{}] Exchange received", self.config.category)
191 } else {
192 format!("[{}] {}", self.config.category, parts.join(" | "))
193 }
194 }
195}
196
197impl Service<Exchange> for LogProducer {
198 type Response = Exchange;
199 type Error = CamelError;
200 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
201
202 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
203 Poll::Ready(Ok(()))
204 }
205
206 fn call(&mut self, exchange: Exchange) -> Self::Future {
207 let msg = self.format_exchange(&exchange);
208 let level = self.config.level;
209
210 Box::pin(async move {
211 match level {
212 LogLevel::Trace => trace!("{msg}"),
213 LogLevel::Debug => debug!("{msg}"),
214 LogLevel::Info => info!("{msg}"),
215 LogLevel::Warn => warn!("{msg}"),
216 LogLevel::Error => error!("{msg}"),
217 }
218
219 Ok(exchange)
220 })
221 }
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::Message;
    use std::sync::Arc;
    use tokio::sync::Mutex;
    use tower::ServiceExt;

    /// Route controller stub: every operation succeeds and no route has a status.
    struct NullRouteController;

    #[async_trait::async_trait]
    impl camel_api::RouteController for NullRouteController {
        async fn start_route(&mut self, _route_id: &str) -> Result<(), CamelError> {
            Ok(())
        }

        async fn stop_route(&mut self, _route_id: &str) -> Result<(), CamelError> {
            Ok(())
        }

        async fn restart_route(&mut self, _route_id: &str) -> Result<(), CamelError> {
            Ok(())
        }

        async fn suspend_route(&mut self, _route_id: &str) -> Result<(), CamelError> {
            Ok(())
        }

        async fn resume_route(&mut self, _route_id: &str) -> Result<(), CamelError> {
            Ok(())
        }

        fn route_status(&self, _route_id: &str) -> Option<camel_api::RouteStatus> {
            None
        }

        async fn start_all_routes(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
            Ok(())
        }
    }

    /// Builds a producer context backed by the no-op route controller.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
    }

    #[test]
    fn test_log_config_defaults() {
        let LogConfig {
            category,
            level,
            show_headers,
            show_body,
        } = LogConfig::from_uri("log:myCategory").unwrap();

        assert_eq!(category, "myCategory");
        assert_eq!(level, LogLevel::Info);
        assert!(!show_headers);
        assert!(show_body);
    }

    #[test]
    fn test_log_config_with_params() {
        let LogConfig {
            category,
            level,
            show_headers,
            show_body,
        } = LogConfig::from_uri("log:app?level=debug&showHeaders=true&showBody=false").unwrap();

        assert_eq!(category, "app");
        assert_eq!(level, LogLevel::Debug);
        assert!(show_headers);
        assert!(!show_body);
    }

    #[test]
    fn test_log_config_wrong_scheme() {
        assert!(LogConfig::from_uri("timer:tick").is_err());
    }

    #[test]
    fn test_log_component_scheme() {
        let scheme_owner = LogComponent::new();
        assert_eq!(scheme_owner.scheme(), "log");
    }

    #[test]
    fn test_log_endpoint_no_consumer() {
        let log_endpoint = LogComponent::new().create_endpoint("log:info").unwrap();
        assert!(log_endpoint.create_consumer().is_err());
    }

    #[test]
    fn test_log_endpoint_creates_producer() {
        let producer_ctx = test_producer_ctx();
        let log_endpoint = LogComponent::new().create_endpoint("log:info").unwrap();
        assert!(log_endpoint.create_producer(&producer_ctx).is_ok());
    }

    #[tokio::test]
    async fn test_log_producer_processes_exchange() {
        let producer_ctx = test_producer_ctx();
        let log_endpoint = LogComponent::new()
            .create_endpoint("log:test?showHeaders=true")
            .unwrap();
        let log_producer = log_endpoint.create_producer(&producer_ctx).unwrap();

        let mut outgoing = Exchange::new(Message::new("hello world"));
        outgoing
            .input
            .set_header("source", serde_json::Value::String("test".into()));

        // The producer only logs, so the body must come back untouched.
        let returned = log_producer.oneshot(outgoing).await.unwrap();
        assert_eq!(returned.input.body.as_text(), Some("hello world"));
    }
}