Skip to main content

camel_processor/
map_body.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::body::Body;
8use camel_api::{CamelError, Exchange};
9
/// Middleware that rewrites the body of an incoming exchange's input message
/// and then hands the exchange to the processor it wraps.
///
/// Construct it directly with [`MapBody::new`], or through a Tower layer when
/// composing a service stack.
#[derive(Clone)]
pub struct MapBody<P, F> {
    /// The wrapped processor; it receives the exchange once the body has been mapped.
    inner: P,
    /// The body-to-body function applied to each exchange's input body.
    mapper: F,
}
16
17impl<P, F> MapBody<P, F>
18where
19    F: Fn(Body) -> Body,
20{
21    /// Create a new MapBody processor wrapping the inner processor.
22    pub fn new(inner: P, mapper: F) -> Self {
23        Self { inner, mapper }
24    }
25}
26
/// Tower layer that holds a body-mapping function and uses it to wrap an
/// inner service in a [`MapBody`].
#[derive(Clone)]
pub struct MapBodyLayer<F> {
    /// The mapping function; a clone of it is given to every service this layer wraps.
    mapper: F,
}

impl<F> MapBodyLayer<F> {
    /// Builds a layer that stores `mapper` for later use when wrapping services.
    pub fn new(mapper: F) -> Self {
        MapBodyLayer { mapper }
    }
}
38
39impl<S, F> tower::Layer<S> for MapBodyLayer<F>
40where
41    F: Clone,
42{
43    type Service = MapBody<S, F>;
44
45    fn layer(&self, inner: S) -> Self::Service {
46        MapBody {
47            inner,
48            mapper: self.mapper.clone(),
49        }
50    }
51}
52
53impl<P, F> Service<Exchange> for MapBody<P, F>
54where
55    P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
56    P::Future: Send,
57    F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
58{
59    type Response = Exchange;
60    type Error = CamelError;
61    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
62
63    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
64        self.inner.poll_ready(cx)
65    }
66
67    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
68        exchange.input.body = (self.mapper)(exchange.input.body);
69        let fut = self.inner.call(exchange);
70        Box::pin(fut)
71    }
72}
73
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{IdentityProcessor, Message};
    use tower::ServiceExt;

    /// Upper-cases bodies that expose text; any other body is returned as-is.
    fn shout(body: Body) -> Body {
        match body.as_text().map(str::to_uppercase) {
            Some(upper) => Body::Text(upper),
            None => body,
        }
    }

    /// A text body is replaced by its upper-cased form.
    #[tokio::test]
    async fn test_map_body_transforms_text() {
        let service = MapBody::new(IdentityProcessor, shout);
        let input = Exchange::new(Message::new("hello"));

        let output = service.oneshot(input).await.unwrap();

        assert_eq!(output.input.body.as_text(), Some("HELLO"));
    }

    /// The mapper may change the body variant, here from text to JSON.
    #[tokio::test]
    async fn test_map_body_text_to_json() {
        let service = MapBody::new(IdentityProcessor, |body: Body| match body.as_text() {
            Some(text) => Body::Json(serde_json::json!({"data": text})),
            None => body,
        });
        let input = Exchange::new(Message::new("value"));

        let output = service.oneshot(input).await.unwrap();

        assert!(matches!(output.input.body, Body::Json(_)));
    }

    /// With an identity mapper, a default message's body is still empty afterwards.
    #[tokio::test]
    async fn test_map_body_empty_passthrough() {
        let service = MapBody::new(IdentityProcessor, |body: Body| body);
        let input = Exchange::new(Message::default());

        let output = service.oneshot(input).await.unwrap();

        assert!(output.input.body.is_empty());
    }

    /// The layer can be used in a `ServiceBuilder` stack and still maps the body.
    #[tokio::test]
    async fn test_map_body_layer_composes() {
        use tower::ServiceBuilder;

        let stack = ServiceBuilder::new()
            .layer(MapBodyLayer::new(shout))
            .service(IdentityProcessor);
        let input = Exchange::new(Message::new("hello"));

        let output = stack.oneshot(input).await.unwrap();

        assert_eq!(output.input.body.as_text(), Some("HELLO"));
    }
}