camel_processor/
set_header.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{CamelError, Exchange, Value};
8
9#[derive(Clone)]
11pub struct SetHeader<P> {
12 inner: P,
13 key: String,
14 value: Value,
15}
16
17impl<P> SetHeader<P> {
18 pub fn new(inner: P, key: impl Into<String>, value: impl Into<Value>) -> Self {
20 Self {
21 inner,
22 key: key.into(),
23 value: value.into(),
24 }
25 }
26}
27
28#[derive(Clone)]
30pub struct SetHeaderLayer {
31 key: String,
32 value: Value,
33}
34
35impl SetHeaderLayer {
36 pub fn new(key: impl Into<String>, value: impl Into<Value>) -> Self {
37 Self {
38 key: key.into(),
39 value: value.into(),
40 }
41 }
42}
43
44impl<S> tower::Layer<S> for SetHeaderLayer {
45 type Service = SetHeader<S>;
46
47 fn layer(&self, inner: S) -> Self::Service {
48 SetHeader::new(inner, self.key.clone(), self.value.clone())
49 }
50}
51
52impl<P> Service<Exchange> for SetHeader<P>
53where
54 P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
55 P::Future: Send,
56{
57 type Response = Exchange;
58 type Error = CamelError;
59 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
60
61 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
62 self.inner.poll_ready(cx)
63 }
64
65 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
66 exchange
67 .input
68 .headers
69 .insert(self.key.clone(), self.value.clone());
70 let fut = self.inner.call(exchange);
71 Box::pin(fut)
72 }
73}
74
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{IdentityProcessor, Message};
    use tower::ServiceExt;

    /// A header absent from the input is present on the output.
    #[tokio::test]
    async fn test_set_header_adds_header() {
        let expected = Value::String("timer".into());
        let svc = SetHeader::new(IdentityProcessor, "source", Value::String("timer".into()));

        let input = Exchange::new(Message::default());
        let output = svc.oneshot(input).await.unwrap();

        assert_eq!(output.input.header("source"), Some(&expected));
    }

    /// A header already on the input is replaced by the configured value.
    #[tokio::test]
    async fn test_set_header_overwrites_existing() {
        let expected = Value::String("new".into());

        let mut input = Exchange::new(Message::default());
        input.input.set_header("key", Value::String("old".into()));

        let svc = SetHeader::new(IdentityProcessor, "key", Value::String("new".into()));
        let output = svc.oneshot(input).await.unwrap();

        assert_eq!(output.input.header("key"), Some(&expected));
    }

    /// Setting a header leaves the message body untouched.
    #[tokio::test]
    async fn test_set_header_preserves_body() {
        let svc = SetHeader::new(IdentityProcessor, "header", Value::Bool(true));

        let input = Exchange::new(Message::new("body content"));
        let output = svc.oneshot(input).await.unwrap();

        assert_eq!(output.input.body.as_text(), Some("body content"));
        assert_eq!(output.input.header("header"), Some(&Value::Bool(true)));
    }

    /// The layer can be stacked through `ServiceBuilder` and still sets the header.
    #[tokio::test]
    async fn test_set_header_layer_composes() {
        use tower::ServiceBuilder;

        let expected = Value::String("test".into());
        let layer = SetHeaderLayer::new("env", Value::String("test".into()));
        let stack = ServiceBuilder::new().layer(layer).service(IdentityProcessor);

        let input = Exchange::new(Message::default());
        let output = stack.oneshot(input).await.unwrap();

        assert_eq!(output.input.header("env"), Some(&expected));
    }
}