camel_processor/
set_body.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::body::Body;
8use camel_api::{CamelError, Exchange};
9
10#[derive(Clone)]
13pub struct SetBody<P, F> {
14 inner: P,
15 expr: F,
16}
17
18impl<P, F> SetBody<P, F>
19where
20 F: Fn(&Exchange) -> Body,
21{
22 pub fn new(inner: P, expr: F) -> Self {
23 Self { inner, expr }
24 }
25}
26
27#[derive(Clone)]
29pub struct SetBodyLayer<F> {
30 expr: F,
31}
32
33impl<F> SetBodyLayer<F> {
34 pub fn new(expr: F) -> Self {
35 Self { expr }
36 }
37}
38
39impl<S, F> tower::Layer<S> for SetBodyLayer<F>
40where
41 F: Clone,
42{
43 type Service = SetBody<S, F>;
44
45 fn layer(&self, inner: S) -> Self::Service {
46 SetBody {
47 inner,
48 expr: self.expr.clone(),
49 }
50 }
51}
52
53impl<P, F> Service<Exchange> for SetBody<P, F>
54where
55 P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
56 P::Future: Send,
57 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
58{
59 type Response = Exchange;
60 type Error = CamelError;
61 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
62
63 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
64 self.inner.poll_ready(cx)
65 }
66
67 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
68 exchange.input.body = (self.expr)(&exchange);
69 let fut = self.inner.call(exchange);
70 Box::pin(fut)
71 }
72}
73
74#[cfg(test)]
75mod tests {
76 use camel_api::body::Body;
77 use camel_api::{Exchange, IdentityProcessor, Message};
78 use tower::ServiceExt;
79
80 use super::*;
81
82 #[tokio::test]
83 async fn test_set_body_static_replaces_body() {
84 let exchange = Exchange::new(Message::new("original"));
85 let svc = SetBody::new(IdentityProcessor, |_ex: &Exchange| {
86 Body::Text("replaced".into())
87 });
88 let result = svc.oneshot(exchange).await.unwrap();
89 assert_eq!(result.input.body.as_text(), Some("replaced"));
90 }
91
92 #[tokio::test]
93 async fn test_set_body_dynamic_reads_exchange() {
94 let mut msg = Message::new("hello");
95 msg.set_header("suffix", camel_api::Value::String("!".into()));
96 let exchange = Exchange::new(msg);
97
98 let svc = SetBody::new(IdentityProcessor, |ex: &Exchange| {
99 let base = ex.input.body.as_text().unwrap_or("");
100 let suffix = ex
101 .input
102 .header("suffix")
103 .and_then(|v| v.as_str())
104 .unwrap_or("");
105 Body::Text(format!("{}{}", base, suffix))
106 });
107
108 let result = svc.oneshot(exchange).await.unwrap();
109 assert_eq!(result.input.body.as_text(), Some("hello!"));
110 }
111
112 #[tokio::test]
113 async fn test_set_body_preserves_headers() {
114 let mut msg = Message::default();
115 msg.set_header("keep", camel_api::Value::Bool(true));
116 let exchange = Exchange::new(msg);
117
118 let svc = SetBody::new(IdentityProcessor, |_ex: &Exchange| Body::Text("new".into()));
119 let result = svc.oneshot(exchange).await.unwrap();
120 assert_eq!(
121 result.input.header("keep"),
122 Some(&camel_api::Value::Bool(true))
123 );
124 assert_eq!(result.input.body.as_text(), Some("new"));
125 }
126
127 #[tokio::test]
128 async fn test_set_body_layer_composes() {
129 use tower::ServiceBuilder;
130
131 let svc = ServiceBuilder::new()
132 .layer(SetBodyLayer::new(|_ex: &Exchange| {
133 Body::Text("layered".into())
134 }))
135 .service(IdentityProcessor);
136
137 let exchange = Exchange::new(Message::default());
138 let result = svc.oneshot(exchange).await.unwrap();
139 assert_eq!(result.input.body.as_text(), Some("layered"));
140 }
141}