// camel_processor/log.rs

1use camel_api::{CamelError, Exchange, IdentityProcessor};
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tower::Service;
6use tracing::{debug, error, info, trace, warn};
7
/// Severity at which a log processor emits its message.
///
/// Each variant selects the `tracing` macro of the same name
/// (`trace!`, `debug!`, `info!`, `warn!`, `error!`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLevel {
    /// Emit via `tracing::trace!`.
    Trace,
    /// Emit via `tracing::debug!`.
    Debug,
    /// Emit via `tracing::info!`.
    Info,
    /// Emit via `tracing::warn!`.
    Warn,
    /// Emit via `tracing::error!`.
    Error,
}
16
17#[derive(Clone)]
18pub struct LogProcessor {
19    inner: IdentityProcessor,
20    level: LogLevel,
21    message: String,
22}
23
24impl LogProcessor {
25    pub fn new(level: LogLevel, message: String) -> Self {
26        Self {
27            inner: IdentityProcessor,
28            level,
29            message,
30        }
31    }
32}
33
34impl Service<Exchange> for LogProcessor {
35    type Response = Exchange;
36    type Error = CamelError;
37    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
38
39    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
40        self.inner.poll_ready(cx)
41    }
42
43    fn call(&mut self, exchange: Exchange) -> Self::Future {
44        let msg = self.message.clone();
45        match self.level {
46            LogLevel::Trace => trace!("{}", msg),
47            LogLevel::Debug => debug!("{}", msg),
48            LogLevel::Info => info!("{}", msg),
49            LogLevel::Warn => warn!("{}", msg),
50            LogLevel::Error => error!("{}", msg),
51        }
52        self.inner.call(exchange)
53    }
54}
55
56/// A log processor that evaluates a message expression against the Exchange at call-time.
57/// Analogous to [`DynamicSetHeader`](crate::dynamic_set_header::DynamicSetHeader).
58#[derive(Clone)]
59pub struct DynamicLog<F> {
60    inner: IdentityProcessor,
61    level: LogLevel,
62    expr: F,
63}
64
65impl<F> DynamicLog<F>
66where
67    F: Fn(&Exchange) -> String + Clone + Send + Sync + 'static,
68{
69    pub fn new(level: LogLevel, expr: F) -> Self {
70        Self {
71            inner: IdentityProcessor,
72            level,
73            expr,
74        }
75    }
76}
77
78impl<F> Service<Exchange> for DynamicLog<F>
79where
80    F: Fn(&Exchange) -> String + Clone + Send + Sync + 'static,
81{
82    type Response = Exchange;
83    type Error = CamelError;
84    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
85
86    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87        self.inner.poll_ready(cx)
88    }
89
90    fn call(&mut self, exchange: Exchange) -> Self::Future {
91        let msg = (self.expr)(&exchange);
92        match self.level {
93            LogLevel::Trace => trace!("{}", msg),
94            LogLevel::Debug => debug!("{}", msg),
95            LogLevel::Info => info!("{}", msg),
96            LogLevel::Warn => warn!("{}", msg),
97            LogLevel::Error => error!("{}", msg),
98        }
99        self.inner.call(exchange)
100    }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::body::Body;
    use camel_api::{Message, Value};
    use tower::ServiceExt;

    // Every test drives the service through `ServiceExt::oneshot`, which waits for
    // `poll_ready` before invoking `call`, as the `tower::Service` contract requires.

    /// A static log step must not turn a default exchange into an error.
    #[tokio::test]
    async fn test_log_processor_passes_exchange_through() {
        let processor = LogProcessor::new(LogLevel::Info, "test message".into());
        let result = processor.oneshot(Exchange::default()).await;
        assert!(result.is_ok());
    }

    /// The body set before the log step is still present afterwards.
    #[tokio::test]
    async fn test_log_processor_preserves_exchange_body() {
        let processor = LogProcessor::new(LogLevel::Debug, "debug message".into());
        let mut exchange = Exchange::default();
        exchange.input.body = Body::Text("test body".into());
        let result = processor.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("test body"));
    }

    /// Exercises every `LogLevel` arm of the dispatch in `call`.
    #[tokio::test]
    async fn test_log_processor_passes_through_at_every_level() {
        let levels = [
            LogLevel::Trace,
            LogLevel::Debug,
            LogLevel::Info,
            LogLevel::Warn,
            LogLevel::Error,
        ];
        for level in levels {
            let processor = LogProcessor::new(level, "level check".into());
            let result = processor.oneshot(Exchange::default()).await;
            assert!(result.is_ok());
        }
    }

    /// An expression reading the body leaves the body intact.
    #[tokio::test]
    async fn test_dynamic_log_evaluates_body() {
        let svc = DynamicLog::new(LogLevel::Info, |ex: &Exchange| {
            format!("body={}", ex.input.body.as_text().unwrap_or(""))
        });
        let exchange = Exchange::new(Message::new("hello"));
        let result = svc.oneshot(exchange).await.unwrap();
        // Exchange passes through unchanged
        assert_eq!(result.input.body.as_text(), Some("hello"));
    }

    /// An expression reading a header leaves the header intact.
    #[tokio::test]
    async fn test_dynamic_log_evaluates_header() {
        let svc = DynamicLog::new(LogLevel::Info, |ex: &Exchange| {
            let counter = ex
                .input
                .header("CamelTimerCounter")
                .and_then(|v| v.as_i64())
                .unwrap_or(0);
            format!("{} World", counter)
        });
        let mut msg = Message::new("");
        msg.set_header("CamelTimerCounter", Value::Number(42.into()));
        let exchange = Exchange::new(msg);
        let result = svc.oneshot(exchange).await.unwrap();
        // Exchange passes through unchanged
        assert_eq!(
            result.input.header("CamelTimerCounter"),
            Some(&Value::Number(42.into()))
        );
    }
}