Skip to main content

camel_processor/
convert_body.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::body_converter::{BodyType, convert};
8use camel_api::{CamelError, Exchange};
9
10/// A processor that converts the message body to a target type.
11///
12/// Supported conversions: Text ↔ Json ↔ Bytes.
13/// `Body::Stream` always returns `TypeConversionFailed` — wrap with
14/// [`StreamCacheService`](crate::StreamCacheService) to auto-materialize first.
15/// Same-type conversions are noops.
16#[derive(Clone)]
17pub struct ConvertBodyTo<P> {
18    inner: P,
19    target: BodyType,
20}
21
22impl<P> ConvertBodyTo<P> {
23    pub fn new(inner: P, target: BodyType) -> Self {
24        Self { inner, target }
25    }
26}
27
28impl<P> Service<Exchange> for ConvertBodyTo<P>
29where
30    P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
31    P::Future: Send,
32{
33    type Response = Exchange;
34    type Error = CamelError;
35    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
36
37    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38        self.inner.poll_ready(cx)
39    }
40
41    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
42        let target = self.target;
43        let body = std::mem::replace(&mut exchange.input.body, camel_api::body::Body::Empty);
44        match convert(body, target) {
45            Ok(new_body) => {
46                exchange.input.body = new_body;
47                let fut = self.inner.call(exchange);
48                Box::pin(fut)
49            }
50            Err(e) => Box::pin(async move { Err(e) }),
51        }
52    }
53}
54
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use camel_api::body::Body;
    use camel_api::{IdentityProcessor, Message};
    use serde_json::json;
    use tower::ServiceExt;

    /// Sends an exchange carrying `body` through a `ConvertBodyTo` that wraps
    /// `IdentityProcessor` as its inner service and returns the outcome.
    async fn run_conversion(body: Body, target: BodyType) -> Result<Exchange, CamelError> {
        let exchange = Exchange::new(Message::new(body));
        ConvertBodyTo::new(IdentityProcessor, target)
            .oneshot(exchange)
            .await
    }

    #[tokio::test]
    async fn text_to_json_in_pipeline() {
        let input = Body::Text(r#"{"n":1}"#.to_string());
        let output = run_conversion(input, BodyType::Json).await.unwrap();
        assert_eq!(output.input.body, Body::Json(json!({"n": 1})));
    }

    #[tokio::test]
    async fn json_to_text_in_pipeline() {
        let input = Body::Json(json!({"x": 2}));
        let output = run_conversion(input, BodyType::Text).await.unwrap();
        assert!(matches!(output.input.body, Body::Text(_)));
    }

    #[tokio::test]
    async fn bytes_to_text_in_pipeline() {
        let input = Body::Bytes(Bytes::from_static(b"hello"));
        let output = run_conversion(input, BodyType::Text).await.unwrap();
        assert_eq!(output.input.body, Body::Text("hello".to_string()));
    }

    #[tokio::test]
    async fn invalid_conversion_returns_err() {
        // An empty body cannot be turned into text; the error must propagate.
        let outcome = run_conversion(Body::Empty, BodyType::Text).await;
        assert!(matches!(outcome, Err(CamelError::TypeConversionFailed(_))));
    }

    #[tokio::test]
    async fn noop_same_type() {
        let input = Body::Text("hi".to_string());
        let output = run_conversion(input, BodyType::Text).await.unwrap();
        assert_eq!(output.input.body, Body::Text("hi".to_string()));
    }
}