camel_processor/
marshal.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::body::Body;
9use camel_api::data_format::DataFormat;
10use camel_api::{CamelError, Exchange};
11
12#[derive(Clone)]
13pub struct MarshalService<P> {
14 inner: P,
15 format: Arc<dyn DataFormat>,
16}
17
18impl<P> MarshalService<P> {
19 pub fn new(inner: P, format: Arc<dyn DataFormat>) -> Self {
20 Self { inner, format }
21 }
22}
23
24#[derive(Clone)]
25pub struct UnmarshalService<P> {
26 inner: P,
27 format: Arc<dyn DataFormat>,
28}
29
30impl<P> UnmarshalService<P> {
31 pub fn new(inner: P, format: Arc<dyn DataFormat>) -> Self {
32 Self { inner, format }
33 }
34}
35
36impl<P> Service<Exchange> for MarshalService<P>
37where
38 P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
39 P::Future: Send,
40{
41 type Response = Exchange;
42 type Error = CamelError;
43 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
44
45 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
46 self.inner.poll_ready(cx)
47 }
48
49 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
50 let body = std::mem::replace(&mut exchange.input.body, Body::Empty);
51 let format = self.format.clone();
52 match format.marshal(body) {
53 Ok(new_body) => {
54 exchange.input.body = new_body;
55 let fut = self.inner.call(exchange);
56 Box::pin(fut)
57 }
58 Err(e) => Box::pin(async move { Err(e) }),
59 }
60 }
61}
62
63impl<P> Service<Exchange> for UnmarshalService<P>
64where
65 P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
66 P::Future: Send,
67{
68 type Response = Exchange;
69 type Error = CamelError;
70 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
71
72 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73 self.inner.poll_ready(cx)
74 }
75
76 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
77 let body = std::mem::replace(&mut exchange.input.body, Body::Empty);
78 let format = self.format.clone();
79 match format.unmarshal(body) {
80 Ok(new_body) => {
81 exchange.input.body = new_body;
82 let fut = self.inner.call(exchange);
83 Box::pin(fut)
84 }
85 Err(e) => Box::pin(async move { Err(e) }),
86 }
87 }
88}
89
90#[cfg(test)]
91mod tests {
92 use super::*;
93 use crate::data_format::builtin_data_format;
94 use camel_api::{IdentityProcessor, Message, Value};
95 use serde_json::json;
96 use tower::ServiceExt;
97
98 #[tokio::test]
99 async fn test_marshal_json_to_text() {
100 let df = builtin_data_format("json").unwrap();
101 let svc = MarshalService::new(IdentityProcessor, df);
102 let exchange = Exchange::new(Message::new(Body::Json(json!({"a": 1}))));
103 let result = svc.oneshot(exchange).await.unwrap();
104 assert!(matches!(result.input.body, Body::Text(_)));
105 }
106
107 #[tokio::test]
108 async fn test_unmarshal_text_to_json() {
109 let df = builtin_data_format("json").unwrap();
110 let svc = UnmarshalService::new(IdentityProcessor, df);
111 let exchange = Exchange::new(Message::new(Body::Text(r#"{"b":2}"#.to_string())));
112 let result = svc.oneshot(exchange).await.unwrap();
113 assert!(matches!(result.input.body, Body::Json(_)));
114 }
115
116 #[tokio::test]
117 async fn test_marshal_preserves_headers() {
118 let df = builtin_data_format("json").unwrap();
119 let svc = MarshalService::new(IdentityProcessor, df);
120 let mut msg = Message::new(Body::Json(json!(1)));
121 msg.set_header("keep", Value::Bool(true));
122 let exchange = Exchange::new(msg);
123 let result = svc.oneshot(exchange).await.unwrap();
124 assert_eq!(result.input.header("keep"), Some(&Value::Bool(true)));
125 }
126
127 #[tokio::test]
128 async fn test_unmarshal_invalid_returns_error() {
129 let df = builtin_data_format("json").unwrap();
130 let svc = UnmarshalService::new(IdentityProcessor, df);
131 let exchange = Exchange::new(Message::new(Body::Text("bad".to_string())));
132 let result = svc.oneshot(exchange).await;
133 assert!(matches!(result, Err(CamelError::TypeConversionFailed(_))));
134 }
135
136 #[tokio::test]
137 async fn test_marshal_xml_to_text() {
138 let df = builtin_data_format("xml").unwrap();
139 let svc = MarshalService::new(IdentityProcessor, df);
140 let exchange = Exchange::new(Message::new(Body::Xml("<root/>".to_string())));
141 let result = svc.oneshot(exchange).await.unwrap();
142 assert_eq!(result.input.body, Body::Text("<root/>".to_string()));
143 }
144
145 #[tokio::test]
146 async fn test_unmarshal_text_to_xml() {
147 let df = builtin_data_format("xml").unwrap();
148 let svc = UnmarshalService::new(IdentityProcessor, df);
149 let exchange = Exchange::new(Message::new(Body::Text("<root><c/></root>".to_string())));
150 let result = svc.oneshot(exchange).await.unwrap();
151 assert!(matches!(result.input.body, Body::Xml(_)));
152 }
153}