camel_component_log/
lib.rs1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::{debug, error, info, trace, warn};
8
9use camel_api::{BoxProcessor, CamelError, Exchange};
10use camel_component::{Component, Consumer, Endpoint, ProducerContext};
11use camel_endpoint::UriConfig;
12
/// Severity selected for a log endpoint.
///
/// The producer in this file maps each variant onto the `tracing` macro of
/// the same name. [`LogLevel::Info`] is the `Default` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LogLevel {
    /// Emitted with `tracing::trace!`.
    Trace,
    /// Emitted with `tracing::debug!`.
    Debug,
    /// Emitted with `tracing::info!`; the default level.
    #[default]
    Info,
    /// Emitted with `tracing::warn!`.
    Warn,
    /// Emitted with `tracing::error!`.
    Error,
}

impl FromStr for LogLevel {
    type Err = String;

    /// Parses a level name without regard to letter case.
    ///
    /// Accepted names are `trace`, `debug`, `info`, `warn` (or `warning`)
    /// and `error`.
    ///
    /// # Errors
    ///
    /// Returns `"Invalid log level: <input>"`, quoting the input exactly as
    /// it was given, when the name is not one of the accepted ones.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Lower-case once so the match below only needs the canonical spellings.
        let normalized = s.to_lowercase();
        let level = match normalized.as_str() {
            "trace" => Self::Trace,
            "debug" => Self::Debug,
            "info" => Self::Info,
            "warn" | "warning" => Self::Warn,
            "error" => Self::Error,
            _ => return Err(format!("Invalid log level: {}", s)),
        };
        Ok(level)
    }
}
42
/// Settings of a `log:` endpoint, obtained with `LogConfig::from_uri`.
///
/// The tests in this file show the accepted URI shape, e.g.
/// `log:app?level=debug&showHeaders=true&showBody=false`, and that a URI with
/// another scheme (`timer:tick`) is rejected.
// NOTE(review): `from_uri` is presumably generated by the `UriConfig` derive
// from the `uri_scheme` / `uri_param` attributes below — confirm in camel_endpoint.
#[derive(Debug, Clone, UriConfig)]
#[uri_scheme = "log"]
pub struct LogConfig {
    /// Category name; it is printed in square brackets at the start of every
    /// log line. For `log:myCategory` the tests expect `"myCategory"`.
    pub category: String,
    /// Level the line is logged at (`level` parameter, default `Info`).
    #[uri_param(default = "Info")]
    pub level: LogLevel,
    /// Whether the message headers are included in the log line
    /// (`showHeaders` parameter, default `false`).
    #[uri_param(name = "showHeaders", default = "false")]
    pub show_headers: bool,
    /// Whether the message body is included in the log line
    /// (`showBody` parameter, default `true`).
    #[uri_param(name = "showBody", default = "true")]
    pub show_body: bool,
}
65
/// Field-less component type whose `Component::scheme` is `"log"`.
pub struct LogComponent;

impl LogComponent {
    /// Builds the component; there is nothing to configure.
    pub fn new() -> Self {
        LogComponent
    }
}

impl Default for LogComponent {
    /// Yields the same unit value as [`LogComponent::new`].
    fn default() -> Self {
        LogComponent
    }
}
84
85impl Component for LogComponent {
86 fn scheme(&self) -> &str {
87 "log"
88 }
89
90 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
91 let config = LogConfig::from_uri(uri)?;
92 Ok(Box::new(LogEndpoint {
93 uri: uri.to_string(),
94 config,
95 }))
96 }
97}
98
/// Endpoint built by `LogComponent::create_endpoint`.
struct LogEndpoint {
    /// The URI string exactly as it was passed to `create_endpoint`.
    uri: String,
    /// Configuration parsed from that URI; cloned into every producer.
    config: LogConfig,
}
107
108impl Endpoint for LogEndpoint {
109 fn uri(&self) -> &str {
110 &self.uri
111 }
112
113 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
114 Err(CamelError::EndpointCreationFailed(
115 "log endpoint does not support consumers".to_string(),
116 ))
117 }
118
119 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
120 Ok(BoxProcessor::new(LogProducer {
121 config: self.config.clone(),
122 }))
123 }
124}
125
/// `tower::Service` that logs each exchange it receives and hands the
/// exchange back as its response.
#[derive(Clone)]
struct LogProducer {
    /// Supplies the category, the level and the body/header switches used
    /// when a log line is formatted and emitted.
    config: LogConfig,
}
134
135impl LogProducer {
136 fn format_exchange(&self, exchange: &Exchange) -> String {
137 let mut parts = Vec::new();
138
139 if self.config.show_body {
140 let body_str = match &exchange.input.body {
141 camel_api::Body::Empty => "[empty]".to_string(),
142 camel_api::Body::Text(s) => s.clone(),
143 camel_api::Body::Json(v) => v.to_string(),
144 camel_api::Body::Xml(s) => s.clone(),
145 camel_api::Body::Bytes(b) => format!("[{} bytes]", b.len()),
146 camel_api::Body::Stream(s) => format!("[Stream: origin={:?}]", s.metadata.origin),
147 };
148 parts.push(format!("Body: {body_str}"));
149 }
150
151 if self.config.show_headers && !exchange.input.headers.is_empty() {
152 let headers: Vec<String> = exchange
153 .input
154 .headers
155 .iter()
156 .map(|(k, v)| format!("{k}={v}"))
157 .collect();
158 parts.push(format!("Headers: {{{}}}", headers.join(", ")));
159 }
160
161 if parts.is_empty() {
162 format!("[{}] Exchange received", self.config.category)
163 } else {
164 format!("[{}] {}", self.config.category, parts.join(" | "))
165 }
166 }
167}
168
169impl Service<Exchange> for LogProducer {
170 type Response = Exchange;
171 type Error = CamelError;
172 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
173
174 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
175 Poll::Ready(Ok(()))
176 }
177
178 fn call(&mut self, exchange: Exchange) -> Self::Future {
179 let msg = self.format_exchange(&exchange);
180 let level = self.config.level;
181
182 Box::pin(async move {
183 match level {
184 LogLevel::Trace => trace!("{msg}"),
185 LogLevel::Debug => debug!("{msg}"),
186 LogLevel::Info => info!("{msg}"),
187 LogLevel::Warn => warn!("{msg}"),
188 LogLevel::Error => error!("{msg}"),
189 }
190
191 Ok(exchange)
192 })
193 }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::Message;
    use tower::ServiceExt;

    /// Producer context shared by the tests that build a producer.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }

    /// A URI without parameters yields the declared defaults.
    #[test]
    fn test_log_config_defaults() {
        let LogConfig {
            category,
            level,
            show_headers,
            show_body,
        } = LogConfig::from_uri("log:myCategory").unwrap();

        assert_eq!(category, "myCategory");
        assert_eq!(level, LogLevel::Info);
        assert!(!show_headers);
        assert!(show_body);
    }

    /// Every query parameter overrides its default.
    #[test]
    fn test_log_config_with_params() {
        let LogConfig {
            category,
            level,
            show_headers,
            show_body,
        } = LogConfig::from_uri("log:app?level=debug&showHeaders=true&showBody=false").unwrap();

        assert_eq!(category, "app");
        assert_eq!(level, LogLevel::Debug);
        assert!(show_headers);
        assert!(!show_body);
    }

    /// A URI with a different scheme is rejected.
    #[test]
    fn test_log_config_wrong_scheme() {
        assert!(LogConfig::from_uri("timer:tick").is_err());
    }

    /// The component reports the `log` scheme.
    #[test]
    fn test_log_component_scheme() {
        assert_eq!(LogComponent::new().scheme(), "log");
    }

    /// Asking a log endpoint for a consumer fails.
    #[test]
    fn test_log_endpoint_no_consumer() {
        let log_component = LogComponent::new();
        let log_endpoint = log_component.create_endpoint("log:info").unwrap();
        assert!(log_endpoint.create_consumer().is_err());
    }

    /// Asking a log endpoint for a producer succeeds.
    #[test]
    fn test_log_endpoint_creates_producer() {
        let ctx = test_producer_ctx();
        let log_component = LogComponent::new();
        let log_endpoint = log_component.create_endpoint("log:info").unwrap();
        assert!(log_endpoint.create_producer(&ctx).is_ok());
    }

    /// The producer returns the exchange with its body intact.
    #[tokio::test]
    async fn test_log_producer_processes_exchange() {
        let ctx = test_producer_ctx();
        let log_component = LogComponent::new();
        let log_endpoint = log_component
            .create_endpoint("log:test?showHeaders=true")
            .unwrap();
        let producer = log_endpoint.create_producer(&ctx).unwrap();

        let mut outgoing = Exchange::new(Message::new("hello world"));
        outgoing
            .input
            .set_header("source", serde_json::Value::String("test".into()));

        let returned = producer.oneshot(outgoing).await.unwrap();
        assert_eq!(returned.input.body.as_text(), Some("hello world"));
    }
}