Skip to main content

camel_processor/
marshal.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::body::Body;
9use camel_api::data_format::DataFormat;
10use camel_api::{CamelError, Exchange};
11
12#[derive(Clone)]
13pub struct MarshalService<P> {
14    inner: P,
15    format: Arc<dyn DataFormat>,
16}
17
18impl<P> MarshalService<P> {
19    pub fn new(inner: P, format: Arc<dyn DataFormat>) -> Self {
20        Self { inner, format }
21    }
22}
23
24#[derive(Clone)]
25pub struct UnmarshalService<P> {
26    inner: P,
27    format: Arc<dyn DataFormat>,
28}
29
30impl<P> UnmarshalService<P> {
31    pub fn new(inner: P, format: Arc<dyn DataFormat>) -> Self {
32        Self { inner, format }
33    }
34}
35
36impl<P> Service<Exchange> for MarshalService<P>
37where
38    P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
39    P::Future: Send,
40{
41    type Response = Exchange;
42    type Error = CamelError;
43    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
44
45    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
46        self.inner.poll_ready(cx)
47    }
48
49    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
50        let body = std::mem::replace(&mut exchange.input.body, Body::Empty);
51        let format = self.format.clone();
52        match format.marshal(body) {
53            Ok(new_body) => {
54                exchange.input.body = new_body;
55                let fut = self.inner.call(exchange);
56                Box::pin(fut)
57            }
58            Err(e) => Box::pin(async move { Err(e) }),
59        }
60    }
61}
62
63impl<P> Service<Exchange> for UnmarshalService<P>
64where
65    P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
66    P::Future: Send,
67{
68    type Response = Exchange;
69    type Error = CamelError;
70    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
71
72    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73        self.inner.poll_ready(cx)
74    }
75
76    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
77        let body = std::mem::replace(&mut exchange.input.body, Body::Empty);
78        let format = self.format.clone();
79        match format.unmarshal(body) {
80            Ok(new_body) => {
81                exchange.input.body = new_body;
82                let fut = self.inner.call(exchange);
83                Box::pin(fut)
84            }
85            Err(e) => Box::pin(async move { Err(e) }),
86        }
87    }
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_format::builtin_data_format;
    use camel_api::{IdentityProcessor, Message, Value};
    use serde_json::json;
    use std::sync::Arc;
    use tower::ServiceExt;

    /// Data format stub whose `marshal` and `unmarshal` always return a
    /// `CamelError::ProcessorError`, used to exercise the error paths.
    struct FailingDataFormat;

    impl DataFormat for FailingDataFormat {
        fn name(&self) -> &str {
            "failing"
        }

        fn marshal(&self, _body: Body) -> Result<Body, CamelError> {
            Err(CamelError::ProcessorError(String::from("marshal-fail")))
        }

        fn unmarshal(&self, _body: Body) -> Result<Body, CamelError> {
            Err(CamelError::ProcessorError(String::from("unmarshal-fail")))
        }
    }

    /// Builds an exchange whose input message carries `body`.
    fn exchange_with(body: Body) -> Exchange {
        Exchange::new(Message::new(body))
    }

    #[tokio::test]
    async fn test_marshal_json_to_text() {
        let json_format = builtin_data_format("json").unwrap();
        let service = MarshalService::new(IdentityProcessor, json_format);
        let output = service
            .oneshot(exchange_with(Body::Json(json!({"a": 1}))))
            .await
            .unwrap();
        assert!(matches!(output.input.body, Body::Text(_)));
    }

    #[tokio::test]
    async fn test_unmarshal_text_to_json() {
        let json_format = builtin_data_format("json").unwrap();
        let service = UnmarshalService::new(IdentityProcessor, json_format);
        let output = service
            .oneshot(exchange_with(Body::Text(r#"{"b":2}"#.to_string())))
            .await
            .unwrap();
        assert!(matches!(output.input.body, Body::Json(_)));
    }

    #[tokio::test]
    async fn test_marshal_preserves_headers() {
        let json_format = builtin_data_format("json").unwrap();
        let service = MarshalService::new(IdentityProcessor, json_format);
        let mut message = Message::new(Body::Json(json!(1)));
        message.set_header("keep", Value::Bool(true));
        let output = service.oneshot(Exchange::new(message)).await.unwrap();
        // Only the body is rewritten; the header set above must still be there.
        assert_eq!(output.input.header("keep"), Some(&Value::Bool(true)));
    }

    #[tokio::test]
    async fn test_unmarshal_invalid_returns_error() {
        let json_format = builtin_data_format("json").unwrap();
        let service = UnmarshalService::new(IdentityProcessor, json_format);
        let outcome = service
            .oneshot(exchange_with(Body::Text("bad".to_string())))
            .await;
        assert!(matches!(outcome, Err(CamelError::TypeConversionFailed(_))));
    }

    #[tokio::test]
    async fn test_marshal_xml_json_to_text() {
        let xml_format = builtin_data_format("xml").unwrap();
        let service = MarshalService::new(IdentityProcessor, xml_format);
        let output = service
            .oneshot(exchange_with(Body::Json(json!({"root": {"c": "1"}}))))
            .await
            .unwrap();
        match output.input.body {
            Body::Text(xml) => assert_eq!(xml, "<root><c>1</c></root>"),
            other => panic!("expected Body::Text, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_unmarshal_text_to_json_via_xml() {
        let xml_format = builtin_data_format("xml").unwrap();
        let service = UnmarshalService::new(IdentityProcessor, xml_format);
        let output = service
            .oneshot(exchange_with(Body::Text("<root><c/></root>".to_string())))
            .await
            .unwrap();
        match output.input.body {
            Body::Json(value) => {
                // The empty `<c/>` element is expected to come back as JSON null.
                assert_eq!(value["root"]["c"], serde_json::Value::Null);
                assert_eq!(value, json!({"root": {"c": null}}));
            }
            other => panic!("expected Body::Json, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_marshal_error_propagates() {
        let service = MarshalService::new(IdentityProcessor, Arc::new(FailingDataFormat));
        let outcome = service
            .oneshot(exchange_with(Body::Text("x".to_string())))
            .await;
        assert!(
            matches!(outcome, Err(CamelError::ProcessorError(reason)) if reason == "marshal-fail")
        );
    }

    #[tokio::test]
    async fn test_unmarshal_error_propagates() {
        let service = UnmarshalService::new(IdentityProcessor, Arc::new(FailingDataFormat));
        let outcome = service
            .oneshot(exchange_with(Body::Text("x".to_string())))
            .await;
        assert!(
            matches!(outcome, Err(CamelError::ProcessorError(reason)) if reason == "unmarshal-fail")
        );
    }
}