camel_processor/
dynamic_set_header.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{CamelError, Exchange, Value};
8
9#[derive(Clone)]
12pub struct DynamicSetHeader<P, F> {
13 inner: P,
14 key: String,
15 expr: F,
16}
17
18impl<P, F> DynamicSetHeader<P, F>
19where
20 F: Fn(&Exchange) -> Value,
21{
22 pub fn new(inner: P, key: impl Into<String>, expr: F) -> Self {
23 Self {
24 inner,
25 key: key.into(),
26 expr,
27 }
28 }
29}
30
31#[derive(Clone)]
33pub struct DynamicSetHeaderLayer<F> {
34 key: String,
35 expr: F,
36}
37
38impl<F> DynamicSetHeaderLayer<F> {
39 pub fn new(key: impl Into<String>, expr: F) -> Self {
40 Self {
41 key: key.into(),
42 expr,
43 }
44 }
45}
46
47impl<S, F> tower::Layer<S> for DynamicSetHeaderLayer<F>
48where
49 F: Clone,
50{
51 type Service = DynamicSetHeader<S, F>;
52
53 fn layer(&self, inner: S) -> Self::Service {
54 DynamicSetHeader {
55 inner,
56 key: self.key.clone(),
57 expr: self.expr.clone(),
58 }
59 }
60}
61
62impl<P, F> Service<Exchange> for DynamicSetHeader<P, F>
63where
64 P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
65 P::Future: Send,
66 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
67{
68 type Response = Exchange;
69 type Error = CamelError;
70 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
71
72 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73 self.inner.poll_ready(cx)
74 }
75
76 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
77 let value = (self.expr)(&exchange);
78 exchange.input.headers.insert(self.key.clone(), value);
79 let fut = self.inner.call(exchange);
80 Box::pin(fut)
81 }
82}
83
84#[cfg(test)]
85mod tests {
86 use camel_api::{Exchange, IdentityProcessor, Message, Value};
87 use tower::ServiceExt;
88
89 use super::*;
90
91 #[tokio::test]
92 async fn test_dynamic_set_header_from_body() {
93 let exchange = Exchange::new(Message::new("world"));
94
95 let svc = DynamicSetHeader::new(IdentityProcessor, "greeting", |ex: &Exchange| {
96 Value::String(format!("hello {}", ex.input.body.as_text().unwrap_or("")))
97 });
98
99 let result = svc.oneshot(exchange).await.unwrap();
100 assert_eq!(
101 result.input.header("greeting"),
102 Some(&Value::String("hello world".into()))
103 );
104 }
105
106 #[tokio::test]
107 async fn test_dynamic_set_header_overwrites_existing() {
108 let mut msg = Message::new("new");
109 msg.set_header("key", Value::String("old".into()));
110 let exchange = Exchange::new(msg);
111
112 let svc = DynamicSetHeader::new(IdentityProcessor, "key", |ex: &Exchange| {
113 Value::String(ex.input.body.as_text().unwrap_or("").into())
114 });
115
116 let result = svc.oneshot(exchange).await.unwrap();
117 assert_eq!(
118 result.input.header("key"),
119 Some(&Value::String("new".into()))
120 );
121 }
122
123 #[tokio::test]
124 async fn test_dynamic_set_header_preserves_body() {
125 let exchange = Exchange::new(Message::new("body content"));
126
127 let svc = DynamicSetHeader::new(IdentityProcessor, "len", |ex: &Exchange| {
128 let len = ex.input.body.as_text().map(|t| t.len() as i64).unwrap_or(0);
129 Value::Number(len.into())
130 });
131
132 let result = svc.oneshot(exchange).await.unwrap();
133 assert_eq!(result.input.body.as_text(), Some("body content"));
134 assert_eq!(result.input.header("len"), Some(&Value::Number(12.into())));
135 }
136
137 #[tokio::test]
138 async fn test_dynamic_set_header_layer_composes() {
139 use tower::ServiceBuilder;
140
141 let svc = ServiceBuilder::new()
142 .layer(DynamicSetHeaderLayer::new("computed", |_ex: &Exchange| {
143 Value::Bool(true)
144 }))
145 .service(IdentityProcessor);
146
147 let exchange = Exchange::new(Message::default());
148 let result = svc.oneshot(exchange).await.unwrap();
149 assert_eq!(result.input.header("computed"), Some(&Value::Bool(true)));
150 }
151}