Skip to main content

camel_processor/
log.rs

1use camel_api::{CamelError, Exchange, IdentityProcessor};
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tower::Service;
6use tracing::{debug, error, info, trace, warn};
7
/// Severity at which a log processor emits its message.
///
/// Each variant corresponds to the `tracing` macro of the same name
/// (`trace!`, `debug!`, `info!`, `warn!`, `error!`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LogLevel {
    /// Emit through `trace!`.
    Trace,
    /// Emit through `debug!`.
    Debug,
    /// Emit through `info!`.
    Info,
    /// Emit through `warn!`.
    Warn,
    /// Emit through `error!`.
    Error,
}
16
17#[derive(Clone)]
18pub struct LogProcessor {
19    inner: IdentityProcessor,
20    level: LogLevel,
21    message: String,
22}
23
24// TODO(PROC-004): Add metrics instrumentation — processed count, error count, latency histograms
25// are not yet instrumented on processors. Consider wiring MetricsCollector into LogProcessor and
26// incrementing a counter on each call, recording elapsed time, and tracking errors.
27
28impl LogProcessor {
29    pub fn new(level: LogLevel, message: String) -> Self {
30        Self {
31            inner: IdentityProcessor,
32            level,
33            message,
34        }
35    }
36}
37
38impl Service<Exchange> for LogProcessor {
39    type Response = Exchange;
40    type Error = CamelError;
41    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
42
43    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
44        self.inner.poll_ready(cx)
45    }
46
47    fn call(&mut self, exchange: Exchange) -> Self::Future {
48        let msg = self.message.clone();
49        let exchange_id = exchange.correlation_id.clone();
50        let body_preview = exchange
51            .input
52            .body
53            .as_text()
54            .unwrap_or("")
55            .chars()
56            .take(64)
57            .collect::<String>();
58        debug!(exchange_id = %exchange_id, body_preview = %body_preview, "LogProcessor processing exchange");
59        match self.level {
60            LogLevel::Trace => trace!(exchange_id = %exchange_id, "{}", msg),
61            LogLevel::Debug => debug!(exchange_id = %exchange_id, "{}", msg),
62            LogLevel::Info => info!(exchange_id = %exchange_id, "{}", msg),
63            LogLevel::Warn => warn!(exchange_id = %exchange_id, "{}", msg),
64            LogLevel::Error => error!(exchange_id = %exchange_id, "{}", msg),
65        }
66        self.inner.call(exchange)
67    }
68}
69
70/// A log processor that evaluates a message expression against the Exchange at call-time.
71/// Analogous to [`DynamicSetHeader`](crate::dynamic_set_header::DynamicSetHeader).
72#[derive(Clone)]
73pub struct DynamicLog<F> {
74    inner: IdentityProcessor,
75    level: LogLevel,
76    expr: F,
77}
78
79impl<F> DynamicLog<F>
80where
81    F: Fn(&Exchange) -> String + Clone + Send + Sync + 'static,
82{
83    pub fn new(level: LogLevel, expr: F) -> Self {
84        Self {
85            inner: IdentityProcessor,
86            level,
87            expr,
88        }
89    }
90}
91
92impl<F> Service<Exchange> for DynamicLog<F>
93where
94    F: Fn(&Exchange) -> String + Clone + Send + Sync + 'static,
95{
96    type Response = Exchange;
97    type Error = CamelError;
98    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
99
100    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
101        self.inner.poll_ready(cx)
102    }
103
104    fn call(&mut self, exchange: Exchange) -> Self::Future {
105        let exchange_id = exchange.correlation_id.clone();
106        let msg = (self.expr)(&exchange);
107        match self.level {
108            LogLevel::Trace => trace!(exchange_id = %exchange_id, "{}", msg),
109            LogLevel::Debug => debug!(exchange_id = %exchange_id, "{}", msg),
110            LogLevel::Info => info!(exchange_id = %exchange_id, "{}", msg),
111            LogLevel::Warn => warn!(exchange_id = %exchange_id, "{}", msg),
112            LogLevel::Error => error!(exchange_id = %exchange_id, "{}", msg),
113        }
114        self.inner.call(exchange)
115    }
116}
117
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::body::Body;
    use camel_api::{Message, Value};
    use tower::ServiceExt;

    // Every test drives its service through `oneshot`, which waits for `poll_ready`
    // before invoking `call`, as the `tower::Service` contract requires.

    /// A default exchange passes through `LogProcessor` without error.
    #[tokio::test]
    async fn test_log_processor_passes_exchange_through() {
        let processor = LogProcessor::new(LogLevel::Info, "test message".into());
        let result = processor.oneshot(Exchange::default()).await;
        assert!(result.is_ok());
    }

    /// The body set on the exchange is still present after `LogProcessor` runs.
    #[tokio::test]
    async fn test_log_processor_preserves_exchange_body() {
        let processor = LogProcessor::new(LogLevel::Debug, "debug message".into());
        let mut exchange = Exchange::default();
        exchange.input.body = Body::Text("test body".into());
        let result = processor.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("test body"));
    }

    /// An expression that reads the body does not alter the exchange.
    #[tokio::test]
    async fn test_dynamic_log_evaluates_body() {
        let svc = DynamicLog::new(LogLevel::Info, |ex: &Exchange| {
            format!("body={}", ex.input.body.as_text().unwrap_or(""))
        });
        let exchange = Exchange::new(Message::new("hello"));
        let result = svc.oneshot(exchange).await.unwrap();
        // Exchange passes through unchanged
        assert_eq!(result.input.body.as_text(), Some("hello"));
    }

    /// An expression that reads a header does not alter the exchange.
    #[tokio::test]
    async fn test_dynamic_log_evaluates_header() {
        let svc = DynamicLog::new(LogLevel::Info, |ex: &Exchange| {
            let counter = ex
                .input
                .header("CamelTimerCounter")
                .and_then(|v| v.as_i64())
                .unwrap_or(0);
            format!("{} World", counter)
        });
        let mut msg = Message::new("");
        msg.set_header("CamelTimerCounter", Value::Number(42.into()));
        let exchange = Exchange::new(msg);
        let result = svc.oneshot(exchange).await.unwrap();
        // Exchange passes through unchanged
        assert_eq!(
            result.input.header("CamelTimerCounter"),
            Some(&Value::Number(42.into()))
        );
    }
}
173}