1use camel_api::{CamelError, Exchange, IdentityProcessor};
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tower::Service;
6use tracing::{debug, error, info, trace, warn};
7
/// Severity at which a log processor emits its message.
///
/// Each variant selects the `tracing` macro of the same name
/// (`trace!`, `debug!`, `info!`, `warn!`, `error!`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLevel {
    /// Emit through `trace!`.
    Trace,
    /// Emit through `debug!`.
    Debug,
    /// Emit through `info!`.
    Info,
    /// Emit through `warn!`.
    Warn,
    /// Emit through `error!`.
    Error,
}
16
17#[derive(Clone)]
18pub struct LogProcessor {
19 inner: IdentityProcessor,
20 level: LogLevel,
21 message: String,
22}
23
24impl LogProcessor {
29 pub fn new(level: LogLevel, message: String) -> Self {
30 Self {
31 inner: IdentityProcessor,
32 level,
33 message,
34 }
35 }
36}
37
38impl Service<Exchange> for LogProcessor {
39 type Response = Exchange;
40 type Error = CamelError;
41 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
42
43 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
44 self.inner.poll_ready(cx)
45 }
46
47 fn call(&mut self, exchange: Exchange) -> Self::Future {
48 let msg = self.message.clone();
49 let exchange_id = exchange.correlation_id.clone();
50 let body_preview = exchange
51 .input
52 .body
53 .as_text()
54 .unwrap_or("")
55 .chars()
56 .take(64)
57 .collect::<String>();
58 debug!(exchange_id = %exchange_id, body_preview = %body_preview, "LogProcessor processing exchange");
59 match self.level {
60 LogLevel::Trace => trace!(exchange_id = %exchange_id, "{}", msg),
61 LogLevel::Debug => debug!(exchange_id = %exchange_id, "{}", msg),
62 LogLevel::Info => info!(exchange_id = %exchange_id, "{}", msg),
63 LogLevel::Warn => warn!(exchange_id = %exchange_id, "{}", msg),
64 LogLevel::Error => error!(exchange_id = %exchange_id, "{}", msg),
65 }
66 self.inner.call(exchange)
67 }
68}
69
70#[derive(Clone)]
73pub struct DynamicLog<F> {
74 inner: IdentityProcessor,
75 level: LogLevel,
76 expr: F,
77}
78
79impl<F> DynamicLog<F>
80where
81 F: Fn(&Exchange) -> String + Clone + Send + Sync + 'static,
82{
83 pub fn new(level: LogLevel, expr: F) -> Self {
84 Self {
85 inner: IdentityProcessor,
86 level,
87 expr,
88 }
89 }
90}
91
92impl<F> Service<Exchange> for DynamicLog<F>
93where
94 F: Fn(&Exchange) -> String + Clone + Send + Sync + 'static,
95{
96 type Response = Exchange;
97 type Error = CamelError;
98 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
99
100 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
101 self.inner.poll_ready(cx)
102 }
103
104 fn call(&mut self, exchange: Exchange) -> Self::Future {
105 let exchange_id = exchange.correlation_id.clone();
106 let msg = (self.expr)(&exchange);
107 match self.level {
108 LogLevel::Trace => trace!(exchange_id = %exchange_id, "{}", msg),
109 LogLevel::Debug => debug!(exchange_id = %exchange_id, "{}", msg),
110 LogLevel::Info => info!(exchange_id = %exchange_id, "{}", msg),
111 LogLevel::Warn => warn!(exchange_id = %exchange_id, "{}", msg),
112 LogLevel::Error => error!(exchange_id = %exchange_id, "{}", msg),
113 }
114 self.inner.call(exchange)
115 }
116}
117
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::body::Body;
    use camel_api::{Message, Value};
    use tower::ServiceExt;

    /// A fixed-message processor returns `Ok` for a default exchange.
    #[tokio::test]
    async fn test_log_processor_passes_exchange_through() {
        let mut log = LogProcessor::new(LogLevel::Info, "test message".into());
        let outcome = log.call(Exchange::default()).await;
        assert!(outcome.is_ok());
    }

    /// The text body set on the input is still present after processing.
    #[tokio::test]
    async fn test_log_processor_preserves_exchange_body() {
        let mut log = LogProcessor::new(LogLevel::Debug, "debug message".into());

        let mut incoming = Exchange::default();
        incoming.input.body = Body::Text("test body".into());

        let outgoing = log.call(incoming).await.unwrap();
        assert_eq!(outgoing.input.body.as_text(), Some("test body"));
    }

    /// A body-based message expression leaves the exchange body untouched.
    #[tokio::test]
    async fn test_dynamic_log_evaluates_body() {
        let body_expr =
            |ex: &Exchange| format!("body={}", ex.input.body.as_text().unwrap_or(""));
        let service = DynamicLog::new(LogLevel::Info, body_expr);

        let outgoing = service
            .oneshot(Exchange::new(Message::new("hello")))
            .await
            .unwrap();
        assert_eq!(outgoing.input.body.as_text(), Some("hello"));
    }

    /// A header-based message expression leaves the header untouched.
    #[tokio::test]
    async fn test_dynamic_log_evaluates_header() {
        const COUNTER_HEADER: &str = "CamelTimerCounter";

        let service = DynamicLog::new(LogLevel::Info, |ex: &Exchange| {
            let counter = ex
                .input
                .header(COUNTER_HEADER)
                .and_then(|v| v.as_i64())
                .unwrap_or(0);
            format!("{} World", counter)
        });

        let mut message = Message::new("");
        message.set_header(COUNTER_HEADER, Value::Number(42.into()));

        let outgoing = service.oneshot(Exchange::new(message)).await.unwrap();
        assert_eq!(
            outgoing.input.header(COUNTER_HEADER),
            Some(&Value::Number(42.into()))
        );
    }
}