1use camel_api::{CamelError, Exchange, IdentityProcessor};
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tower::Service;
6use tracing::{debug, info, trace, warn};
7
/// Severity selector for the logging processors defined in this file.
///
/// The chosen variant decides which `tracing` event macro a processor
/// invokes when an exchange passes through it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LogLevel {
    /// Trace severity.
    Trace,
    /// Debug severity.
    Debug,
    /// Info severity.
    Info,
    /// Warn severity.
    Warn,
    /// Error severity.
    Error,
}
16
17#[derive(Clone)]
18pub struct LogProcessor {
19 inner: IdentityProcessor,
20 level: LogLevel,
21 message: String,
22}
23
24impl LogProcessor {
29 pub fn new(level: LogLevel, message: String) -> Self {
30 Self {
31 inner: IdentityProcessor,
32 level,
33 message,
34 }
35 }
36}
37
38impl Service<Exchange> for LogProcessor {
39 type Response = Exchange;
40 type Error = CamelError;
41 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
42
43 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
44 self.inner.poll_ready(cx)
45 }
46
47 fn call(&mut self, exchange: Exchange) -> Self::Future {
48 let msg = self.message.clone();
49 let exchange_id = exchange.correlation_id.clone();
50 let body_preview = exchange
51 .input
52 .body
53 .as_text()
54 .unwrap_or("")
55 .chars()
56 .take(64)
57 .collect::<String>();
58 debug!(exchange_id = %exchange_id, body_preview = %body_preview, "LogProcessor processing exchange");
59 match self.level {
60 LogLevel::Trace => trace!(exchange_id = %exchange_id, "{}", msg),
61 LogLevel::Debug => debug!(exchange_id = %exchange_id, "{}", msg),
62 LogLevel::Info => info!(exchange_id = %exchange_id, "{}", msg),
63 LogLevel::Warn => warn!(exchange_id = %exchange_id, "{}", msg),
64 LogLevel::Error => warn!(exchange_id = %exchange_id, "{}", msg),
66 }
67 self.inner.call(exchange)
68 }
69}
70
71#[derive(Clone)]
74pub struct DynamicLog<F> {
75 inner: IdentityProcessor,
76 level: LogLevel,
77 expr: F,
78}
79
80impl<F> DynamicLog<F>
81where
82 F: Fn(&Exchange) -> String + Clone + Send + Sync + 'static,
83{
84 pub fn new(level: LogLevel, expr: F) -> Self {
85 Self {
86 inner: IdentityProcessor,
87 level,
88 expr,
89 }
90 }
91}
92
93impl<F> Service<Exchange> for DynamicLog<F>
94where
95 F: Fn(&Exchange) -> String + Clone + Send + Sync + 'static,
96{
97 type Response = Exchange;
98 type Error = CamelError;
99 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
100
101 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
102 self.inner.poll_ready(cx)
103 }
104
105 fn call(&mut self, exchange: Exchange) -> Self::Future {
106 let exchange_id = exchange.correlation_id.clone();
107 let msg = (self.expr)(&exchange);
108 match self.level {
109 LogLevel::Trace => trace!(exchange_id = %exchange_id, "{}", msg),
110 LogLevel::Debug => debug!(exchange_id = %exchange_id, "{}", msg),
111 LogLevel::Info => info!(exchange_id = %exchange_id, "{}", msg),
112 LogLevel::Warn => warn!(exchange_id = %exchange_id, "{}", msg),
113 LogLevel::Error => warn!(exchange_id = %exchange_id, "{}", msg),
115 }
116 self.inner.call(exchange)
117 }
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::body::Body;
    use camel_api::{Message, Value};
    use tower::ServiceExt;

    /// A default exchange goes through `LogProcessor` without error.
    #[tokio::test]
    async fn test_log_processor_passes_exchange_through() {
        let mut log_svc = LogProcessor::new(LogLevel::Info, "test message".into());
        let outcome = log_svc.call(Exchange::default()).await;
        assert!(outcome.is_ok());
    }

    /// The body set on the input message is unchanged after logging.
    #[tokio::test]
    async fn test_log_processor_preserves_exchange_body() {
        let mut log_svc = LogProcessor::new(LogLevel::Debug, "debug message".into());

        let mut input = Exchange::default();
        input.input.body = Body::Text("test body".into());

        let output = log_svc.call(input).await.unwrap();
        assert_eq!(output.input.body.as_text(), Some("test body"));
    }

    /// `DynamicLog` can read the body in its expression and still returns
    /// the exchange with the same body.
    #[tokio::test]
    async fn test_dynamic_log_evaluates_body() {
        let body_expr =
            |ex: &Exchange| format!("body={}", ex.input.body.as_text().unwrap_or(""));
        let dyn_log = DynamicLog::new(LogLevel::Info, body_expr);

        let input = Exchange::new(Message::new("hello"));
        let output = dyn_log.oneshot(input).await.unwrap();

        assert_eq!(output.input.body.as_text(), Some("hello"));
    }

    /// `DynamicLog` can read a header in its expression and the header is
    /// still present on the returned exchange.
    #[tokio::test]
    async fn test_dynamic_log_evaluates_header() {
        let header_expr = |ex: &Exchange| {
            let counter = ex
                .input
                .header("CamelTimerCounter")
                .and_then(|v| v.as_i64())
                .unwrap_or(0);
            format!("{} World", counter)
        };
        let dyn_log = DynamicLog::new(LogLevel::Info, header_expr);

        let mut message = Message::new("");
        message.set_header("CamelTimerCounter", Value::Number(42.into()));

        let output = dyn_log.oneshot(Exchange::new(message)).await.unwrap();
        let expected = Value::Number(42.into());
        assert_eq!(output.input.header("CamelTimerCounter"), Some(&expected));
    }
}