1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use crate::error::CamelError;
9use crate::exchange::Exchange;
10
11pub trait Processor:
16 Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + Sync + 'static
17{
18}
19
20impl<P> Processor for P where
22 P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + Sync + 'static
23{
24}
25
26#[derive(Debug, Clone)]
28pub struct IdentityProcessor;
29
30impl Service<Exchange> for IdentityProcessor {
31 type Response = Exchange;
32 type Error = CamelError;
33 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
34
35 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
36 Poll::Ready(Ok(()))
37 }
38
39 fn call(&mut self, exchange: Exchange) -> Self::Future {
40 Box::pin(async move { Ok(exchange) })
41 }
42}
43
44pub type BoxProcessor = tower::util::BoxCloneService<Exchange, Exchange, CamelError>;
48
49pub trait BoxProcessorExt {
62 fn from_fn<F, Fut>(f: F) -> BoxProcessor
66 where
67 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
68 Fut: Future<Output = Result<Exchange, CamelError>> + Send + 'static;
69}
70
71impl BoxProcessorExt for BoxProcessor {
72 fn from_fn<F, Fut>(f: F) -> BoxProcessor
73 where
74 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
75 Fut: Future<Output = Result<Exchange, CamelError>> + Send + 'static,
76 {
77 BoxProcessor::new(ProcessorFn::new(f))
78 }
79}
80
81pub struct ProcessorFn<F> {
85 f: Arc<F>,
86}
87
88impl<F> Clone for ProcessorFn<F> {
90 fn clone(&self) -> Self {
91 Self {
92 f: Arc::clone(&self.f),
93 }
94 }
95}
96
97impl<F> ProcessorFn<F> {
98 pub fn new(f: F) -> Self {
99 Self { f: Arc::new(f) }
100 }
101}
102
103impl<F, Fut> Service<Exchange> for ProcessorFn<F>
104where
105 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
106 Fut: Future<Output = Result<Exchange, CamelError>> + Send + 'static,
107{
108 type Response = Exchange;
109 type Error = CamelError;
110 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
111
112 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
113 Poll::Ready(Ok(()))
114 }
115
116 fn call(&mut self, exchange: Exchange) -> Self::Future {
117 let f = Arc::clone(&self.f);
118 Box::pin(async move { f(exchange).await })
119 }
120}
121
122#[cfg(test)]
123mod tests {
124 use super::*;
125 use crate::message::Message;
126 use tower::ServiceExt;
127
128 #[tokio::test]
129 async fn test_identity_processor_passes_through() {
130 let exchange = Exchange::new(Message::new("hello"));
131 let processor = IdentityProcessor;
132
133 let result = processor.oneshot(exchange).await.unwrap();
134 assert_eq!(result.input.body.as_text(), Some("hello"));
135 }
136
137 #[tokio::test]
138 async fn test_identity_processor_preserves_headers() {
139 let mut exchange = Exchange::new(Message::default());
140 exchange
141 .input
142 .set_header("key", serde_json::Value::String("value".into()));
143
144 let processor = IdentityProcessor;
145 let result = processor.oneshot(exchange).await.unwrap();
146 assert_eq!(
147 result.input.header("key"),
148 Some(&serde_json::Value::String("value".into()))
149 );
150 }
151
152 #[tokio::test]
153 async fn test_identity_processor_preserves_properties() {
154 let mut exchange = Exchange::new(Message::default());
155 exchange.set_property("prop", serde_json::Value::Bool(true));
156
157 let processor = IdentityProcessor;
158 let result = processor.oneshot(exchange).await.unwrap();
159 assert_eq!(
160 result.property("prop"),
161 Some(&serde_json::Value::Bool(true))
162 );
163 }
164
165 #[tokio::test]
166 async fn test_processor_fn_transforms_exchange() {
167 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
168 ex.input.body = crate::body::Body::Text("transformed".into());
169 Ok(ex)
170 });
171
172 let exchange = Exchange::new(Message::new("original"));
173 let result = processor.oneshot(exchange).await.unwrap();
174 assert_eq!(result.input.body.as_text(), Some("transformed"));
175 }
176
177 #[tokio::test]
178 async fn test_processor_fn_can_return_error() {
179 let processor = ProcessorFn::new(|_ex: Exchange| async move {
180 Err(CamelError::ProcessorError("intentional error".into()))
181 });
182
183 let exchange = Exchange::new(Message::default());
184 let result: Result<Exchange, CamelError> = processor.oneshot(exchange).await;
185 assert!(result.is_err());
186 }
187
188 #[tokio::test]
189 async fn test_processor_fn_is_cloneable() {
190 let processor = ProcessorFn::new(|ex: Exchange| async move { Ok(ex) });
191 let cloned = processor.clone();
192
193 let exchange = Exchange::new(Message::new("test"));
194 let result = cloned.oneshot(exchange).await.unwrap();
195 assert_eq!(result.input.body.as_text(), Some("test"));
196 }
197
198 #[tokio::test]
199 async fn test_box_processor_from_identity() {
200 let processor: BoxProcessor = tower::util::BoxCloneService::new(IdentityProcessor);
201
202 let exchange = Exchange::new(Message::new("boxed"));
203 let result = processor.oneshot(exchange).await.unwrap();
204 assert_eq!(result.input.body.as_text(), Some("boxed"));
205 }
206
207 #[tokio::test]
208 async fn test_box_processor_from_processor_fn() {
209 let processor: BoxProcessor =
210 tower::util::BoxCloneService::new(ProcessorFn::new(|mut ex: Exchange| async move {
211 ex.input.body = crate::body::Body::Text("via_box".into());
212 Ok(ex)
213 }));
214
215 let exchange = Exchange::new(Message::new("original"));
216 let result = processor.oneshot(exchange).await.unwrap();
217 assert_eq!(result.input.body.as_text(), Some("via_box"));
218 }
219
220 #[tokio::test]
221 async fn test_box_processor_ext_from_fn() {
222 let processor = BoxProcessor::from_fn(|mut ex: Exchange| async move {
223 ex.input.body = crate::body::Body::Text("via_from_fn".into());
224 Ok(ex)
225 });
226
227 let exchange = Exchange::new(Message::new("original"));
228 let result = processor.oneshot(exchange).await.unwrap();
229 assert_eq!(result.input.body.as_text(), Some("via_from_fn"));
230 }
231}