Skip to main content

camel_processor/
log.rs

1use camel_api::{CamelError, Exchange, IdentityProcessor};
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tower::Service;
6use tracing::{debug, info, trace, warn};
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9pub enum LogLevel {
10    Trace,
11    Debug,
12    Info,
13    Warn,
14    Error,
15}
16
17#[derive(Clone)]
18pub struct LogProcessor {
19    inner: IdentityProcessor,
20    level: LogLevel,
21    message: String,
22}
23
24// TODO(PROC-004): Add metrics instrumentation — processed count, error count, latency histograms
25// are not yet instrumented on processors. Consider wiring MetricsCollector into LogProcessor and
26// incrementing a counter on each call, recording elapsed time, and tracking errors.
27
28impl LogProcessor {
29    pub fn new(level: LogLevel, message: String) -> Self {
30        Self {
31            inner: IdentityProcessor,
32            level,
33            message,
34        }
35    }
36}
37
38impl Service<Exchange> for LogProcessor {
39    type Response = Exchange;
40    type Error = CamelError;
41    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
42
43    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
44        self.inner.poll_ready(cx)
45    }
46
47    fn call(&mut self, exchange: Exchange) -> Self::Future {
48        let msg = self.message.clone();
49        let exchange_id = exchange.correlation_id.clone();
50        let body_preview = exchange
51            .input
52            .body
53            .as_text()
54            .unwrap_or("")
55            .chars()
56            .take(64)
57            .collect::<String>();
58        debug!(exchange_id = %exchange_id, body_preview = %body_preview, "LogProcessor processing exchange");
59        match self.level {
60            LogLevel::Trace => trace!(exchange_id = %exchange_id, "{}", msg),
61            LogLevel::Debug => debug!(exchange_id = %exchange_id, "{}", msg),
62            LogLevel::Info => info!(exchange_id = %exchange_id, "{}", msg),
63            LogLevel::Warn => warn!(exchange_id = %exchange_id, "{}", msg),
64            // log-policy: handler-owned
65            LogLevel::Error => warn!(exchange_id = %exchange_id, "{}", msg),
66        }
67        self.inner.call(exchange)
68    }
69}
70
71/// A log processor that evaluates a message expression against the Exchange at call-time.
72/// Analogous to [`DynamicSetHeader`](crate::dynamic_set_header::DynamicSetHeader).
73#[derive(Clone)]
74pub struct DynamicLog<F> {
75    inner: IdentityProcessor,
76    level: LogLevel,
77    expr: F,
78}
79
80impl<F> DynamicLog<F>
81where
82    F: Fn(&Exchange) -> String + Clone + Send + Sync + 'static,
83{
84    pub fn new(level: LogLevel, expr: F) -> Self {
85        Self {
86            inner: IdentityProcessor,
87            level,
88            expr,
89        }
90    }
91}
92
93impl<F> Service<Exchange> for DynamicLog<F>
94where
95    F: Fn(&Exchange) -> String + Clone + Send + Sync + 'static,
96{
97    type Response = Exchange;
98    type Error = CamelError;
99    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
100
101    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
102        self.inner.poll_ready(cx)
103    }
104
105    fn call(&mut self, exchange: Exchange) -> Self::Future {
106        let exchange_id = exchange.correlation_id.clone();
107        let msg = (self.expr)(&exchange);
108        match self.level {
109            LogLevel::Trace => trace!(exchange_id = %exchange_id, "{}", msg),
110            LogLevel::Debug => debug!(exchange_id = %exchange_id, "{}", msg),
111            LogLevel::Info => info!(exchange_id = %exchange_id, "{}", msg),
112            LogLevel::Warn => warn!(exchange_id = %exchange_id, "{}", msg),
113            // log-policy: handler-owned
114            LogLevel::Error => warn!(exchange_id = %exchange_id, "{}", msg),
115        }
116        self.inner.call(exchange)
117    }
118}
119
120#[cfg(test)]
121mod tests {
122    use super::*;
123    use camel_api::body::Body;
124    use camel_api::{Message, Value};
125    use tower::ServiceExt;
126
127    #[tokio::test]
128    async fn test_log_processor_passes_exchange_through() {
129        let mut processor = LogProcessor::new(LogLevel::Info, "test message".into());
130        let exchange = Exchange::default();
131        let result = processor.call(exchange).await;
132        assert!(result.is_ok());
133    }
134
135    #[tokio::test]
136    async fn test_log_processor_preserves_exchange_body() {
137        let mut processor = LogProcessor::new(LogLevel::Debug, "debug message".into());
138        let mut exchange = Exchange::default();
139        exchange.input.body = Body::Text("test body".into());
140        let result = processor.call(exchange).await.unwrap();
141        assert_eq!(result.input.body.as_text(), Some("test body"));
142    }
143
144    #[tokio::test]
145    async fn test_dynamic_log_evaluates_body() {
146        let svc = DynamicLog::new(LogLevel::Info, |ex: &Exchange| {
147            format!("body={}", ex.input.body.as_text().unwrap_or(""))
148        });
149        let exchange = Exchange::new(Message::new("hello"));
150        let result = svc.oneshot(exchange).await.unwrap();
151        // Exchange passes through unchanged
152        assert_eq!(result.input.body.as_text(), Some("hello"));
153    }
154
155    #[tokio::test]
156    async fn test_dynamic_log_evaluates_header() {
157        let svc = DynamicLog::new(LogLevel::Info, |ex: &Exchange| {
158            let counter = ex
159                .input
160                .header("CamelTimerCounter")
161                .and_then(|v| v.as_i64())
162                .unwrap_or(0);
163            format!("{} World", counter)
164        });
165        let mut msg = Message::new("");
166        msg.set_header("CamelTimerCounter", Value::Number(42.into()));
167        let exchange = Exchange::new(msg);
168        let result = svc.oneshot(exchange).await.unwrap();
169        // Exchange passes through unchanged
170        assert_eq!(
171            result.input.header("CamelTimerCounter"),
172            Some(&Value::Number(42.into()))
173        );
174    }
175}