1use camel_api::{CamelError, Exchange, IdentityProcessor};
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tower::Service;
6use tracing::{debug, error, info, trace, warn};
7
/// Severity at which a log processor emits its message.
///
/// The processors in this file map each variant onto the `tracing` macro of
/// the same name (`trace!`, `debug!`, `info!`, `warn!`, `error!`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LogLevel {
    /// Emitted through `tracing::trace!`.
    Trace,
    /// Emitted through `tracing::debug!`.
    Debug,
    /// Emitted through `tracing::info!`.
    Info,
    /// Emitted through `tracing::warn!`.
    Warn,
    /// Emitted through `tracing::error!`.
    Error,
}
16
17#[derive(Clone)]
18pub struct LogProcessor {
19 inner: IdentityProcessor,
20 level: LogLevel,
21 message: String,
22}
23
24impl LogProcessor {
25 pub fn new(level: LogLevel, message: String) -> Self {
26 Self {
27 inner: IdentityProcessor,
28 level,
29 message,
30 }
31 }
32}
33
34impl Service<Exchange> for LogProcessor {
35 type Response = Exchange;
36 type Error = CamelError;
37 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
38
39 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
40 self.inner.poll_ready(cx)
41 }
42
43 fn call(&mut self, exchange: Exchange) -> Self::Future {
44 let msg = self.message.clone();
45 match self.level {
46 LogLevel::Trace => trace!("{}", msg),
47 LogLevel::Debug => debug!("{}", msg),
48 LogLevel::Info => info!("{}", msg),
49 LogLevel::Warn => warn!("{}", msg),
50 LogLevel::Error => error!("{}", msg),
51 }
52 self.inner.call(exchange)
53 }
54}
55
56#[derive(Clone)]
59pub struct DynamicLog<F> {
60 inner: IdentityProcessor,
61 level: LogLevel,
62 expr: F,
63}
64
65impl<F> DynamicLog<F>
66where
67 F: Fn(&Exchange) -> String + Clone + Send + Sync + 'static,
68{
69 pub fn new(level: LogLevel, expr: F) -> Self {
70 Self {
71 inner: IdentityProcessor,
72 level,
73 expr,
74 }
75 }
76}
77
78impl<F> Service<Exchange> for DynamicLog<F>
79where
80 F: Fn(&Exchange) -> String + Clone + Send + Sync + 'static,
81{
82 type Response = Exchange;
83 type Error = CamelError;
84 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
85
86 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87 self.inner.poll_ready(cx)
88 }
89
90 fn call(&mut self, exchange: Exchange) -> Self::Future {
91 let msg = (self.expr)(&exchange);
92 match self.level {
93 LogLevel::Trace => trace!("{}", msg),
94 LogLevel::Debug => debug!("{}", msg),
95 LogLevel::Info => info!("{}", msg),
96 LogLevel::Warn => warn!("{}", msg),
97 LogLevel::Error => error!("{}", msg),
98 }
99 self.inner.call(exchange)
100 }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::body::Body;
    use camel_api::{Message, Value};
    use tower::ServiceExt;

    // Every test drives the service through `oneshot`, which waits for
    // `poll_ready` before invoking `call`, as the `tower::Service` contract
    // requires.

    /// A static-message processor completes successfully.
    #[tokio::test]
    async fn test_log_processor_passes_exchange_through() {
        let processor = LogProcessor::new(LogLevel::Info, "test message".into());
        let result = processor.oneshot(Exchange::default()).await;
        assert!(result.is_ok());
    }

    /// The exchange body comes out of the processor untouched.
    #[tokio::test]
    async fn test_log_processor_preserves_exchange_body() {
        let processor = LogProcessor::new(LogLevel::Debug, "debug message".into());
        let mut exchange = Exchange::default();
        exchange.input.body = Body::Text("test body".into());
        let result = processor.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("test body"));
    }

    /// An expression reading the body leaves the body untouched.
    #[tokio::test]
    async fn test_dynamic_log_evaluates_body() {
        let svc = DynamicLog::new(LogLevel::Info, |ex: &Exchange| {
            format!("body={}", ex.input.body.as_text().unwrap_or(""))
        });
        let exchange = Exchange::new(Message::new("hello"));
        let result = svc.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("hello"));
    }

    /// An expression reading a header leaves the header untouched.
    #[tokio::test]
    async fn test_dynamic_log_evaluates_header() {
        let svc = DynamicLog::new(LogLevel::Info, |ex: &Exchange| {
            let counter = ex
                .input
                .header("CamelTimerCounter")
                .and_then(|v| v.as_i64())
                .unwrap_or(0);
            format!("{} World", counter)
        });
        let mut msg = Message::new("");
        msg.set_header("CamelTimerCounter", Value::Number(42.into()));
        let exchange = Exchange::new(msg);
        let result = svc.oneshot(exchange).await.unwrap();
        assert_eq!(
            result.input.header("CamelTimerCounter"),
            Some(&Value::Number(42.into()))
        );
    }
}