Skip to main content

camel_processor/
convert_body.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::body_converter::{BodyType, convert};
8use camel_api::{CamelError, Exchange};
9
10/// A processor that converts the message body to a target type.
11///
12/// Supported conversions: Text ↔ Json ↔ Bytes.
13/// `Body::Stream` always returns `TypeConversionFailed` — materialize first.
14/// Same-type conversions are noops.
15#[derive(Clone)]
16pub struct ConvertBodyTo<P> {
17    inner: P,
18    target: BodyType,
19}
20
21impl<P> ConvertBodyTo<P> {
22    pub fn new(inner: P, target: BodyType) -> Self {
23        Self { inner, target }
24    }
25}
26
27impl<P> Service<Exchange> for ConvertBodyTo<P>
28where
29    P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
30    P::Future: Send,
31{
32    type Response = Exchange;
33    type Error = CamelError;
34    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
35
36    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
37        self.inner.poll_ready(cx)
38    }
39
40    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
41        let target = self.target;
42        let body = std::mem::replace(&mut exchange.input.body, camel_api::body::Body::Empty);
43        match convert(body, target) {
44            Ok(new_body) => {
45                exchange.input.body = new_body;
46                let fut = self.inner.call(exchange);
47                Box::pin(fut)
48            }
49            Err(e) => Box::pin(async move { Err(e) }),
50        }
51    }
52}
53
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use camel_api::body::Body;
    use camel_api::{IdentityProcessor, Message};
    use serde_json::json;
    use tower::ServiceExt;

    /// Wraps `body` in a fresh exchange and drives it once through a
    /// `ConvertBodyTo` layered over `IdentityProcessor`.
    async fn run_conversion(body: Body, target: BodyType) -> Result<Exchange, CamelError> {
        let exchange = Exchange::new(Message::new(body));
        let svc = ConvertBodyTo::new(IdentityProcessor, target);
        svc.oneshot(exchange).await
    }

    #[tokio::test]
    async fn text_to_json_in_pipeline() {
        let input = Body::Text(r#"{"n":1}"#.to_string());
        let converted = run_conversion(input, BodyType::Json).await.unwrap();
        assert_eq!(converted.input.body, Body::Json(json!({"n": 1})));
    }

    #[tokio::test]
    async fn json_to_text_in_pipeline() {
        let input = Body::Json(json!({"x": 2}));
        let converted = run_conversion(input, BodyType::Text).await.unwrap();
        assert!(matches!(converted.input.body, Body::Text(_)));
    }

    #[tokio::test]
    async fn bytes_to_text_in_pipeline() {
        let input = Body::Bytes(Bytes::from_static(b"hello"));
        let converted = run_conversion(input, BodyType::Text).await.unwrap();
        assert_eq!(converted.input.body, Body::Text("hello".to_string()));
    }

    #[tokio::test]
    async fn invalid_conversion_returns_err() {
        let outcome = run_conversion(Body::Empty, BodyType::Text).await;
        assert!(matches!(outcome, Err(CamelError::TypeConversionFailed(_))));
    }

    #[tokio::test]
    async fn noop_same_type() {
        let input = Body::Text("hi".to_string());
        let converted = run_conversion(input, BodyType::Text).await.unwrap();
        assert_eq!(converted.input.body, Body::Text("hi".to_string()));
    }
}