Skip to main content

camel_processor/
dynamic_set_header.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{CamelError, Exchange, Value};
8
9/// A processor that sets a header using an expression closure.
10/// The closure receives the full exchange (read-only) and returns a Value.
11#[derive(Clone)]
12pub struct DynamicSetHeader<P, F> {
13    inner: P,
14    key: String,
15    expr: F,
16}
17
18impl<P, F> DynamicSetHeader<P, F>
19where
20    F: Fn(&Exchange) -> Value,
21{
22    pub fn new(inner: P, key: impl Into<String>, expr: F) -> Self {
23        Self {
24            inner,
25            key: key.into(),
26            expr,
27        }
28    }
29}
30
31/// A Tower Layer that wraps an inner service with a [`DynamicSetHeader`].
32#[derive(Clone)]
33pub struct DynamicSetHeaderLayer<F> {
34    key: String,
35    expr: F,
36}
37
38impl<F> DynamicSetHeaderLayer<F> {
39    pub fn new(key: impl Into<String>, expr: F) -> Self {
40        Self {
41            key: key.into(),
42            expr,
43        }
44    }
45}
46
47impl<S, F> tower::Layer<S> for DynamicSetHeaderLayer<F>
48where
49    F: Clone,
50{
51    type Service = DynamicSetHeader<S, F>;
52
53    fn layer(&self, inner: S) -> Self::Service {
54        DynamicSetHeader {
55            inner,
56            key: self.key.clone(),
57            expr: self.expr.clone(),
58        }
59    }
60}
61
62impl<P, F> Service<Exchange> for DynamicSetHeader<P, F>
63where
64    P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
65    P::Future: Send,
66    F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
67{
68    type Response = Exchange;
69    type Error = CamelError;
70    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
71
72    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73        self.inner.poll_ready(cx)
74    }
75
76    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
77        let value = (self.expr)(&exchange);
78        exchange.input.headers.insert(self.key.clone(), value);
79        let fut = self.inner.call(exchange);
80        Box::pin(fut)
81    }
82}
83
84#[cfg(test)]
85mod tests {
86    use camel_api::{Exchange, IdentityProcessor, Message, Value};
87    use tower::ServiceExt;
88
89    use super::*;
90
91    #[tokio::test]
92    async fn test_dynamic_set_header_from_body() {
93        let exchange = Exchange::new(Message::new("world"));
94
95        let svc = DynamicSetHeader::new(IdentityProcessor, "greeting", |ex: &Exchange| {
96            Value::String(format!("hello {}", ex.input.body.as_text().unwrap_or("")))
97        });
98
99        let result = svc.oneshot(exchange).await.unwrap();
100        assert_eq!(
101            result.input.header("greeting"),
102            Some(&Value::String("hello world".into()))
103        );
104    }
105
106    #[tokio::test]
107    async fn test_dynamic_set_header_overwrites_existing() {
108        let mut msg = Message::new("new");
109        msg.set_header("key", Value::String("old".into()));
110        let exchange = Exchange::new(msg);
111
112        let svc = DynamicSetHeader::new(IdentityProcessor, "key", |ex: &Exchange| {
113            Value::String(ex.input.body.as_text().unwrap_or("").into())
114        });
115
116        let result = svc.oneshot(exchange).await.unwrap();
117        assert_eq!(
118            result.input.header("key"),
119            Some(&Value::String("new".into()))
120        );
121    }
122
123    #[tokio::test]
124    async fn test_dynamic_set_header_preserves_body() {
125        let exchange = Exchange::new(Message::new("body content"));
126
127        let svc = DynamicSetHeader::new(IdentityProcessor, "len", |ex: &Exchange| {
128            let len = ex.input.body.as_text().map(|t| t.len() as i64).unwrap_or(0);
129            Value::Number(len.into())
130        });
131
132        let result = svc.oneshot(exchange).await.unwrap();
133        assert_eq!(result.input.body.as_text(), Some("body content"));
134        assert_eq!(result.input.header("len"), Some(&Value::Number(12.into())));
135    }
136
137    #[tokio::test]
138    async fn test_dynamic_set_header_layer_composes() {
139        use tower::ServiceBuilder;
140
141        let svc = ServiceBuilder::new()
142            .layer(DynamicSetHeaderLayer::new("computed", |_ex: &Exchange| {
143                Value::Bool(true)
144            }))
145            .service(IdentityProcessor);
146
147        let exchange = Exchange::new(Message::default());
148        let result = svc.oneshot(exchange).await.unwrap();
149        assert_eq!(result.input.header("computed"), Some(&Value::Bool(true)));
150    }
151}