Skip to main content

camel_processor/
set_body.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::body::Body;
8use camel_api::{CamelError, Exchange};
9
10/// A processor that sets the message body using an expression closure.
11/// The closure receives the full exchange (read-only) and returns a new Body.
12#[derive(Clone)]
13pub struct SetBody<P, F> {
14    inner: P,
15    expr: F,
16}
17
18impl<P, F> SetBody<P, F>
19where
20    F: Fn(&Exchange) -> Body,
21{
22    pub fn new(inner: P, expr: F) -> Self {
23        Self { inner, expr }
24    }
25}
26
/// A [`tower::Layer`] that decorates a service with a [`SetBody`].
#[derive(Clone)]
pub struct SetBodyLayer<F> {
    /// Body expression carried by the layer.
    expr: F,
}

impl<F> SetBodyLayer<F> {
    /// Creates a layer holding `expr`.
    pub fn new(expr: F) -> Self {
        SetBodyLayer { expr }
    }
}
38
39impl<S, F> tower::Layer<S> for SetBodyLayer<F>
40where
41    F: Clone,
42{
43    type Service = SetBody<S, F>;
44
45    fn layer(&self, inner: S) -> Self::Service {
46        SetBody {
47            inner,
48            expr: self.expr.clone(),
49        }
50    }
51}
52
53impl<P, F> Service<Exchange> for SetBody<P, F>
54where
55    P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
56    P::Future: Send,
57    F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
58{
59    type Response = Exchange;
60    type Error = CamelError;
61    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
62
63    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
64        self.inner.poll_ready(cx)
65    }
66
67    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
68        exchange.input.body = (self.expr)(&exchange);
69        let fut = self.inner.call(exchange);
70        Box::pin(fut)
71    }
72}
73
#[cfg(test)]
mod tests {
    use camel_api::body::Body;
    use camel_api::{Exchange, IdentityProcessor, Message};
    use tower::ServiceExt;

    use super::*;

    /// A constant expression replaces the body the message started with.
    #[tokio::test]
    async fn test_set_body_static_replaces_body() {
        let service = SetBody::new(IdentityProcessor, |_: &Exchange| {
            Body::Text("replaced".into())
        });
        let input = Exchange::new(Message::new("original"));

        let output = service.oneshot(input).await.unwrap();

        assert_eq!(output.input.body.as_text(), Some("replaced"));
    }

    /// The expression can read the current body and headers of the exchange.
    #[tokio::test]
    async fn test_set_body_dynamic_reads_exchange() {
        let service = SetBody::new(IdentityProcessor, |ex: &Exchange| {
            let message = &ex.input;
            let head = message.body.as_text().unwrap_or("");
            let tail = message
                .header("suffix")
                .and_then(|v| v.as_str())
                .unwrap_or("");
            Body::Text(format!("{}{}", head, tail))
        });

        let mut message = Message::new("hello");
        message.set_header("suffix", camel_api::Value::String("!".into()));
        let input = Exchange::new(message);

        let output = service.oneshot(input).await.unwrap();

        assert_eq!(output.input.body.as_text(), Some("hello!"));
    }

    /// Replacing the body leaves existing headers in place.
    #[tokio::test]
    async fn test_set_body_preserves_headers() {
        let service = SetBody::new(IdentityProcessor, |_: &Exchange| Body::Text("new".into()));

        let mut message = Message::default();
        message.set_header("keep", camel_api::Value::Bool(true));
        let input = Exchange::new(message);

        let output = service.oneshot(input).await.unwrap();

        assert_eq!(
            output.input.header("keep"),
            Some(&camel_api::Value::Bool(true))
        );
        assert_eq!(output.input.body.as_text(), Some("new"));
    }

    /// The layer can be stacked onto a service through `ServiceBuilder`.
    #[tokio::test]
    async fn test_set_body_layer_composes() {
        use tower::ServiceBuilder;

        let layer = SetBodyLayer::new(|_: &Exchange| Body::Text("layered".into()));
        let service = ServiceBuilder::new()
            .layer(layer)
            .service(IdentityProcessor);

        let input = Exchange::new(Message::default());
        let output = service.oneshot(input).await.unwrap();

        assert_eq!(output.input.body.as_text(), Some("layered"));
    }
}