camel_processor/
map_body.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::body::Body;
8use camel_api::{CamelError, Exchange};
9
10#[derive(Clone)]
12pub struct MapBody<P, F> {
13 inner: P,
14 mapper: F,
15}
16
17impl<P, F> MapBody<P, F>
18where
19 F: Fn(Body) -> Body,
20{
21 pub fn new(inner: P, mapper: F) -> Self {
23 Self { inner, mapper }
24 }
25}
26
27#[derive(Clone)]
29pub struct MapBodyLayer<F> {
30 mapper: F,
31}
32
33impl<F> MapBodyLayer<F> {
34 pub fn new(mapper: F) -> Self {
35 Self { mapper }
36 }
37}
38
39impl<S, F> tower::Layer<S> for MapBodyLayer<F>
40where
41 F: Clone,
42{
43 type Service = MapBody<S, F>;
44
45 fn layer(&self, inner: S) -> Self::Service {
46 MapBody {
47 inner,
48 mapper: self.mapper.clone(),
49 }
50 }
51}
52
53impl<P, F> Service<Exchange> for MapBody<P, F>
54where
55 P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
56 P::Future: Send,
57 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
58{
59 type Response = Exchange;
60 type Error = CamelError;
61 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
62
63 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
64 self.inner.poll_ready(cx)
65 }
66
67 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
68 exchange.input.body = (self.mapper)(exchange.input.body);
69 let fut = self.inner.call(exchange);
70 Box::pin(fut)
71 }
72}
73
74#[cfg(test)]
75mod tests {
76 use super::*;
77 use camel_api::{IdentityProcessor, Message};
78 use tower::ServiceExt;
79
80 #[tokio::test]
81 async fn test_map_body_transforms_text() {
82 let exchange = Exchange::new(Message::new("hello"));
83
84 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
85 if let Some(text) = body.as_text() {
86 Body::Text(text.to_uppercase())
87 } else {
88 body
89 }
90 });
91
92 let result = mapper.oneshot(exchange).await.unwrap();
93 assert_eq!(result.input.body.as_text(), Some("HELLO"));
94 }
95
96 #[tokio::test]
97 async fn test_map_body_text_to_json() {
98 let exchange = Exchange::new(Message::new("value"));
99
100 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
101 if let Some(text) = body.as_text() {
102 Body::Json(serde_json::json!({"data": text}))
103 } else {
104 body
105 }
106 });
107
108 let result = mapper.oneshot(exchange).await.unwrap();
109 assert!(matches!(result.input.body, Body::Json(_)));
110 }
111
112 #[tokio::test]
113 async fn test_map_body_empty_passthrough() {
114 let exchange = Exchange::new(Message::default());
115
116 let mapper = MapBody::new(IdentityProcessor, |body: Body| body);
117
118 let result = mapper.oneshot(exchange).await.unwrap();
119 assert!(result.input.body.is_empty());
120 }
121
122 #[tokio::test]
123 async fn test_map_body_layer_composes() {
124 use tower::ServiceBuilder;
125
126 let svc = ServiceBuilder::new()
127 .layer(super::MapBodyLayer::new(|body: Body| {
128 if let Some(text) = body.as_text() {
129 Body::Text(text.to_uppercase())
130 } else {
131 body
132 }
133 }))
134 .service(IdentityProcessor);
135
136 let exchange = Exchange::new(Message::new("hello"));
137 let result = svc.oneshot(exchange).await.unwrap();
138 assert_eq!(result.input.body.as_text(), Some("HELLO"));
139 }
140}