Skip to main content

camel_processor/
log.rs

1use camel_api::{CamelError, Exchange, IdentityProcessor};
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tower::Service;
6use tracing::{debug, error, info, trace, warn};
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9pub enum LogLevel {
10    Trace,
11    Debug,
12    Info,
13    Warn,
14    Error,
15}
16
17#[derive(Clone)]
18pub struct LogProcessor {
19    inner: IdentityProcessor,
20    level: LogLevel,
21    message: String,
22}
23
24impl LogProcessor {
25    pub fn new(level: LogLevel, message: String) -> Self {
26        Self {
27            inner: IdentityProcessor,
28            level,
29            message,
30        }
31    }
32}
33
34impl Service<Exchange> for LogProcessor {
35    type Response = Exchange;
36    type Error = CamelError;
37    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
38
39    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
40        self.inner.poll_ready(cx)
41    }
42
43    fn call(&mut self, exchange: Exchange) -> Self::Future {
44        let msg = self.message.clone();
45        match self.level {
46            LogLevel::Trace => trace!("{}", msg),
47            LogLevel::Debug => debug!("{}", msg),
48            LogLevel::Info => info!("{}", msg),
49            LogLevel::Warn => warn!("{}", msg),
50            LogLevel::Error => error!("{}", msg),
51        }
52        self.inner.call(exchange)
53    }
54}
55
56#[cfg(test)]
57mod tests {
58    use super::*;
59    use camel_api::body::Body;
60
61    #[tokio::test]
62    async fn test_log_processor_passes_exchange_through() {
63        let mut processor = LogProcessor::new(LogLevel::Info, "test message".into());
64        let exchange = Exchange::default();
65        let result = processor.call(exchange).await;
66        assert!(result.is_ok());
67    }
68
69    #[tokio::test]
70    async fn test_log_processor_preserves_exchange_body() {
71        let mut processor = LogProcessor::new(LogLevel::Debug, "debug message".into());
72        let mut exchange = Exchange::default();
73        exchange.input.body = Body::Text("test body".into());
74        let result = processor.call(exchange).await.unwrap();
75        assert_eq!(result.input.body.as_text(), Some("test body"));
76    }
77}