Skip to main content

camel_processor/
set_header.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{CamelError, Exchange, Value};
8
9/// A processor that sets a header on the exchange's input message.
10#[derive(Clone)]
11pub struct SetHeader<P> {
12    inner: P,
13    key: String,
14    value: Value,
15}
16
17impl<P> SetHeader<P> {
18    /// Create a new SetHeader processor that adds the given header.
19    pub fn new(inner: P, key: impl Into<String>, value: impl Into<Value>) -> Self {
20        Self {
21            inner,
22            key: key.into(),
23            value: value.into(),
24        }
25    }
26}
27
28/// A Tower Layer that wraps an inner service with a [`SetHeader`].
29#[derive(Clone)]
30pub struct SetHeaderLayer {
31    key: String,
32    value: Value,
33}
34
35impl SetHeaderLayer {
36    pub fn new(key: impl Into<String>, value: impl Into<Value>) -> Self {
37        Self {
38            key: key.into(),
39            value: value.into(),
40        }
41    }
42}
43
44impl<S> tower::Layer<S> for SetHeaderLayer {
45    type Service = SetHeader<S>;
46
47    fn layer(&self, inner: S) -> Self::Service {
48        SetHeader::new(inner, self.key.clone(), self.value.clone())
49    }
50}
51
52impl<P> Service<Exchange> for SetHeader<P>
53where
54    P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
55    P::Future: Send,
56{
57    type Response = Exchange;
58    type Error = CamelError;
59    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
60
61    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
62        self.inner.poll_ready(cx)
63    }
64
65    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
66        exchange
67            .input
68            .headers
69            .insert(self.key.clone(), self.value.clone());
70        let fut = self.inner.call(exchange);
71        Box::pin(fut)
72    }
73}
74
75#[cfg(test)]
76mod tests {
77    use super::*;
78    use camel_api::{IdentityProcessor, Message};
79    use tower::ServiceExt;
80
81    #[tokio::test]
82    async fn test_set_header_adds_header() {
83        let exchange = Exchange::new(Message::default());
84
85        let processor = SetHeader::new(IdentityProcessor, "source", Value::String("timer".into()));
86
87        let result = processor.oneshot(exchange).await.unwrap();
88        assert_eq!(
89            result.input.header("source"),
90            Some(&Value::String("timer".into()))
91        );
92    }
93
94    #[tokio::test]
95    async fn test_set_header_overwrites_existing() {
96        let mut exchange = Exchange::new(Message::default());
97        exchange
98            .input
99            .set_header("key", Value::String("old".into()));
100
101        let processor = SetHeader::new(IdentityProcessor, "key", Value::String("new".into()));
102
103        let result = processor.oneshot(exchange).await.unwrap();
104        assert_eq!(
105            result.input.header("key"),
106            Some(&Value::String("new".into()))
107        );
108    }
109
110    #[tokio::test]
111    async fn test_set_header_preserves_body() {
112        let exchange = Exchange::new(Message::new("body content"));
113
114        let processor = SetHeader::new(IdentityProcessor, "header", Value::Bool(true));
115
116        let result = processor.oneshot(exchange).await.unwrap();
117        assert_eq!(result.input.body.as_text(), Some("body content"));
118        assert_eq!(result.input.header("header"), Some(&Value::Bool(true)));
119    }
120
121    #[tokio::test]
122    async fn test_set_header_layer_composes() {
123        use tower::ServiceBuilder;
124
125        let svc = ServiceBuilder::new()
126            .layer(super::SetHeaderLayer::new(
127                "env",
128                Value::String("test".into()),
129            ))
130            .service(IdentityProcessor);
131
132        let exchange = Exchange::new(Message::default());
133        let result = svc.oneshot(exchange).await.unwrap();
134        assert_eq!(
135            result.input.header("env"),
136            Some(&Value::String("test".into()))
137        );
138    }
139}