camel_component_log/
lib.rs1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::{debug, error, info, trace, warn};
8
9use camel_component_api::UriConfig;
10use camel_component_api::{BoxProcessor, CamelError, Exchange};
11use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
12
/// Severity at which a log endpoint emits its messages.
///
/// [`LogLevel::Info`] is the value produced by [`Default`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum LogLevel {
    /// Trace-level output.
    Trace,
    /// Debug-level output.
    Debug,
    /// Info-level output (the default).
    #[default]
    Info,
    /// Warn-level output.
    Warn,
    /// Error-level output.
    Error,
}
27
28impl FromStr for LogLevel {
29 type Err = String;
30
31 fn from_str(s: &str) -> Result<Self, Self::Err> {
32 match s.to_lowercase().as_str() {
33 "trace" => Ok(LogLevel::Trace),
34 "debug" => Ok(LogLevel::Debug),
35 "info" => Ok(LogLevel::Info),
36 "warn" | "warning" => Ok(LogLevel::Warn),
37 "error" => Ok(LogLevel::Error),
38 _ => Err(format!("Invalid log level: {}", s)),
39 }
40 }
41}
42
43#[derive(Debug, Clone, UriConfig)]
51#[uri_scheme = "log"]
52#[uri_config(crate = "camel_component_api")]
53pub struct LogConfig {
54 pub category: String,
56 #[uri_param(default = "Info")]
58 pub level: LogLevel,
59 #[uri_param(name = "showHeaders", default = "false")]
61 pub show_headers: bool,
62 #[uri_param(name = "showBody", default = "true")]
64 pub show_body: bool,
65}
66
/// Component registered under the `log` scheme; it carries no state of its own.
pub struct LogComponent;
73
74impl LogComponent {
75 pub fn new() -> Self {
76 Self
77 }
78}
79
80impl Default for LogComponent {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl Component for LogComponent {
87 fn scheme(&self) -> &str {
88 "log"
89 }
90
91 fn create_endpoint(
92 &self,
93 uri: &str,
94 _ctx: &dyn camel_component_api::ComponentContext,
95 ) -> Result<Box<dyn Endpoint>, CamelError> {
96 let config = LogConfig::from_uri(uri)?;
97 Ok(Box::new(LogEndpoint {
98 uri: uri.to_string(),
99 config,
100 }))
101 }
102}
103
104struct LogEndpoint {
109 uri: String,
110 config: LogConfig,
111}
112
113impl Endpoint for LogEndpoint {
114 fn uri(&self) -> &str {
115 &self.uri
116 }
117
118 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
119 Err(CamelError::EndpointCreationFailed(
120 "log endpoint does not support consumers".to_string(),
121 ))
122 }
123
124 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
125 Ok(BoxProcessor::new(LogProducer {
126 config: self.config.clone(),
127 }))
128 }
129}
130
131#[derive(Clone)]
136struct LogProducer {
137 config: LogConfig,
138}
139
140impl LogProducer {
141 fn format_exchange(&self, exchange: &Exchange) -> String {
142 let mut parts = Vec::new();
143
144 if self.config.show_body {
145 let body_str = match &exchange.input.body {
146 camel_component_api::Body::Empty => "[empty]".to_string(),
147 camel_component_api::Body::Text(s) => s.clone(),
148 camel_component_api::Body::Json(v) => v.to_string(),
149 camel_component_api::Body::Xml(s) => s.clone(),
150 camel_component_api::Body::Bytes(b) => format!("[{} bytes]", b.len()),
151 camel_component_api::Body::Stream(s) => {
152 format!("[Stream: origin={:?}]", s.metadata.origin)
153 }
154 };
155 parts.push(format!("Body: {body_str}"));
156 }
157
158 if self.config.show_headers && !exchange.input.headers.is_empty() {
159 let headers: Vec<String> = exchange
160 .input
161 .headers
162 .iter()
163 .map(|(k, v)| format!("{k}={v}"))
164 .collect();
165 parts.push(format!("Headers: {{{}}}", headers.join(", ")));
166 }
167
168 if parts.is_empty() {
169 format!("[{}] Exchange received", self.config.category)
170 } else {
171 format!("[{}] {}", self.config.category, parts.join(" | "))
172 }
173 }
174}
175
176impl Service<Exchange> for LogProducer {
177 type Response = Exchange;
178 type Error = CamelError;
179 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
180
181 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
182 Poll::Ready(Ok(()))
183 }
184
185 fn call(&mut self, exchange: Exchange) -> Self::Future {
186 let msg = self.format_exchange(&exchange);
187 let level = self.config.level;
188
189 Box::pin(async move {
190 match level {
191 LogLevel::Trace => trace!("{msg}"),
192 LogLevel::Debug => debug!("{msg}"),
193 LogLevel::Info => info!("{msg}"),
194 LogLevel::Warn => warn!("{msg}"),
195 LogLevel::Error => error!("{msg}"),
196 }
197
198 Ok(exchange)
199 })
200 }
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::Message;
    use camel_component_api::NoOpComponentContext;
    use tower::ServiceExt;

    /// Fresh producer context for the tests that need to build a producer.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }

    /// A bare `log:<category>` URI falls back to the documented defaults.
    #[test]
    fn test_log_config_defaults() {
        let LogConfig {
            category,
            level,
            show_headers,
            show_body,
        } = LogConfig::from_uri("log:myCategory").unwrap();

        assert_eq!(category, "myCategory");
        assert_eq!(level, LogLevel::Info);
        assert!(!show_headers);
        assert!(show_body);
    }

    /// Every query parameter overrides its default.
    #[test]
    fn test_log_config_with_params() {
        let uri = "log:app?level=debug&showHeaders=true&showBody=false";
        let LogConfig {
            category,
            level,
            show_headers,
            show_body,
        } = LogConfig::from_uri(uri).unwrap();

        assert_eq!(category, "app");
        assert_eq!(level, LogLevel::Debug);
        assert!(show_headers);
        assert!(!show_body);
    }

    /// URIs with a different scheme are rejected.
    #[test]
    fn test_log_config_wrong_scheme() {
        assert!(LogConfig::from_uri("timer:tick").is_err());
    }

    /// The component reports the `log` scheme.
    #[test]
    fn test_log_component_scheme() {
        assert_eq!(LogComponent::new().scheme(), "log");
    }

    /// Asking a log endpoint for a consumer is an error.
    #[test]
    fn test_log_endpoint_no_consumer() {
        let endpoint = LogComponent::new()
            .create_endpoint("log:info", &NoOpComponentContext)
            .unwrap();

        assert!(endpoint.create_consumer().is_err());
    }

    /// A log endpoint can create a producer.
    #[test]
    fn test_log_endpoint_creates_producer() {
        let producer_ctx = test_producer_ctx();
        let endpoint = LogComponent::new()
            .create_endpoint("log:info", &NoOpComponentContext)
            .unwrap();

        assert!(endpoint.create_producer(&producer_ctx).is_ok());
    }

    /// The producer returns the exchange it was given, body intact.
    #[tokio::test]
    async fn test_log_producer_processes_exchange() {
        let producer_ctx = test_producer_ctx();
        let endpoint = LogComponent::new()
            .create_endpoint("log:test?showHeaders=true", &NoOpComponentContext)
            .unwrap();
        let producer = endpoint.create_producer(&producer_ctx).unwrap();

        let mut exchange = Exchange::new(Message::new("hello world"));
        let header_value = serde_json::Value::String("test".into());
        exchange.input.set_header("source", header_value);

        let returned = producer.oneshot(exchange).await.unwrap();
        assert_eq!(returned.input.body.as_text(), Some("hello world"));
    }
}