camel_processor/
routing_slip.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6use tower::ServiceExt;
7
8use camel_api::endpoint_pipeline::CAMEL_SLIP_ENDPOINT;
9use camel_api::{CamelError, EndpointPipelineConfig, Exchange, RoutingSlipConfig, Value};
10
11use crate::endpoint_pipeline::EndpointPipelineService;
12
13#[derive(Clone)]
18pub struct RoutingSlipService {
19 config: RoutingSlipConfig,
20 pipeline: EndpointPipelineService,
21}
22
23impl RoutingSlipService {
24 pub fn new(config: RoutingSlipConfig, endpoint_resolver: camel_api::EndpointResolver) -> Self {
25 let pipeline_config = EndpointPipelineConfig {
26 cache_size: EndpointPipelineConfig::from_signed(config.cache_size),
27 ignore_invalid_endpoints: config.ignore_invalid_endpoints,
28 };
29 Self {
30 config,
31 pipeline: EndpointPipelineService::new(endpoint_resolver, pipeline_config),
32 }
33 }
34}
35
36impl Service<Exchange> for RoutingSlipService {
37 type Response = Exchange;
38 type Error = CamelError;
39 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
40
41 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
42 Poll::Ready(Ok(()))
43 }
44
45 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
46 let config = self.config.clone();
47 let pipeline = self.pipeline.clone();
48
49 Box::pin(async move {
50 let slip = match (config.expression)(&exchange) {
51 None => return Ok(exchange),
52 Some(s) => s,
53 };
54
55 for uri in slip.split(&config.uri_delimiter) {
56 let uri = uri.trim();
57 if uri.is_empty() {
58 continue;
59 }
60
61 let endpoint = match pipeline.resolve(uri)? {
62 Some(e) => e,
63 None => continue,
64 };
65
66 exchange.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri.to_string()));
67
68 let mut endpoint = endpoint;
69 exchange = endpoint.ready().await?.call(exchange).await?;
70 }
71
72 Ok(exchange)
73 })
74 }
75}
76
77#[cfg(test)]
78mod tests {
79 use super::*;
80 use camel_api::{BoxProcessor, BoxProcessorExt, Message};
81 use std::sync::Arc;
82 use std::sync::atomic::{AtomicUsize, Ordering};
83
84 fn mock_resolver() -> camel_api::EndpointResolver {
85 Arc::new(|uri: &str| {
86 if uri.starts_with("mock:") {
87 Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
88 } else {
89 None
90 }
91 })
92 }
93
94 #[tokio::test]
95 async fn routing_slip_single_destination() {
96 let call_count = Arc::new(AtomicUsize::new(0));
97 let count_clone = call_count.clone();
98
99 let resolver = Arc::new(move |uri: &str| {
100 if uri == "mock:a" {
101 let count = count_clone.clone();
102 Some(BoxProcessor::from_fn(move |ex| {
103 count.fetch_add(1, Ordering::SeqCst);
104 Box::pin(async move { Ok(ex) })
105 }))
106 } else {
107 None
108 }
109 });
110
111 let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| Some("mock:a".to_string())));
112
113 let mut svc = RoutingSlipService::new(config, resolver);
114 let ex = Exchange::new(Message::new("test"));
115 let result = svc.ready().await.unwrap().call(ex).await;
116
117 assert!(result.is_ok());
118 assert_eq!(call_count.load(Ordering::SeqCst), 1);
119 }
120
121 #[tokio::test]
122 async fn routing_slip_multiple_destinations() {
123 let call_count = Arc::new(AtomicUsize::new(0));
124 let count_clone = call_count.clone();
125
126 let resolver = Arc::new(move |uri: &str| {
127 if uri.starts_with("mock:") {
128 let count = count_clone.clone();
129 Some(BoxProcessor::from_fn(move |ex| {
130 count.fetch_add(1, Ordering::SeqCst);
131 Box::pin(async move { Ok(ex) })
132 }))
133 } else {
134 None
135 }
136 });
137
138 let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| {
139 Some("mock:a,mock:b,mock:c".to_string())
140 }));
141
142 let mut svc = RoutingSlipService::new(config, resolver);
143 let ex = Exchange::new(Message::new("test"));
144 let result = svc.ready().await.unwrap().call(ex).await;
145
146 assert!(result.is_ok());
147 assert_eq!(call_count.load(Ordering::SeqCst), 3);
148 }
149
150 #[tokio::test]
151 async fn routing_slip_empty_expression() {
152 let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| None));
153
154 let mut svc = RoutingSlipService::new(config, mock_resolver());
155 let ex = Exchange::new(Message::new("test"));
156 let result = svc.ready().await.unwrap().call(ex).await;
157
158 assert!(result.is_ok());
159 }
160
161 #[tokio::test]
162 async fn routing_slip_empty_string() {
163 let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| Some(String::new())));
164
165 let mut svc = RoutingSlipService::new(config, mock_resolver());
166 let ex = Exchange::new(Message::new("test"));
167 let result = svc.ready().await.unwrap().call(ex).await;
168
169 assert!(result.is_ok());
170 }
171
172 #[tokio::test]
173 async fn routing_slip_invalid_endpoint_error() {
174 let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| {
175 Some("invalid:endpoint".to_string())
176 }))
177 .ignore_invalid_endpoints(false);
178
179 let mut svc = RoutingSlipService::new(config, mock_resolver());
180 let ex = Exchange::new(Message::new("test"));
181 let result = svc.ready().await.unwrap().call(ex).await;
182
183 assert!(result.is_err());
184 assert!(result.unwrap_err().to_string().contains("Invalid endpoint"));
185 }
186
187 #[tokio::test]
188 async fn routing_slip_ignore_invalid_endpoint() {
189 let call_count = Arc::new(AtomicUsize::new(0));
190 let count_clone = call_count.clone();
191
192 let resolver = Arc::new(move |uri: &str| {
193 if uri == "mock:valid" {
194 let count = count_clone.clone();
195 Some(BoxProcessor::from_fn(move |ex| {
196 count.fetch_add(1, Ordering::SeqCst);
197 Box::pin(async move { Ok(ex) })
198 }))
199 } else {
200 None
201 }
202 });
203
204 let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| {
205 Some("invalid:endpoint,mock:valid".to_string())
206 }))
207 .ignore_invalid_endpoints(true);
208
209 let mut svc = RoutingSlipService::new(config, resolver);
210 let ex = Exchange::new(Message::new("test"));
211 let result = svc.ready().await.unwrap().call(ex).await;
212
213 assert!(result.is_ok());
214 assert_eq!(call_count.load(Ordering::SeqCst), 1);
215 }
216
217 #[tokio::test]
218 async fn routing_slip_order_preserved() {
219 use std::sync::Mutex;
220
221 let order: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
222 let order_clone = order.clone();
223
224 let resolver = Arc::new(move |uri: &str| {
225 let order = order_clone.clone();
226 let uri = uri.to_string();
227 Some(BoxProcessor::from_fn(move |ex| {
228 order.lock().unwrap().push(uri.clone());
229 Box::pin(async move { Ok(ex) })
230 }))
231 });
232
233 let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| {
234 Some("mock:first,mock:second,mock:third".to_string())
235 }));
236
237 let mut svc = RoutingSlipService::new(config, resolver);
238 let ex = Exchange::new(Message::new("test"));
239 svc.ready().await.unwrap().call(ex).await.unwrap();
240
241 let order = order.lock().unwrap();
242 assert_eq!(*order, vec!["mock:first", "mock:second", "mock:third"]);
243 }
244
245 #[tokio::test]
246 async fn routing_slip_endpoint_property_set() {
247 let last_uri: Arc<std::sync::Mutex<Option<String>>> = Arc::new(std::sync::Mutex::new(None));
248 let last_uri_clone = last_uri.clone();
249
250 let resolver = Arc::new(move |uri: &str| {
251 let last = last_uri_clone.clone();
252 let _uri = uri.to_string();
253 Some(BoxProcessor::from_fn(move |ex| {
254 let prop = ex.property(CAMEL_SLIP_ENDPOINT).cloned();
255 *last.lock().unwrap() = prop.and_then(|v| v.as_str().map(String::from));
256 Box::pin(async move { Ok(ex) })
257 }))
258 });
259
260 let config =
261 RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| Some("mock:a,mock:b".to_string())));
262
263 let mut svc = RoutingSlipService::new(config, resolver);
264 let ex = Exchange::new(Message::new("test"));
265 svc.ready().await.unwrap().call(ex).await.unwrap();
266
267 let last = last_uri.lock().unwrap();
268 assert_eq!(last.as_deref(), Some("mock:b"));
269 }
270
271 #[tokio::test]
272 async fn routing_slip_mutation_between_steps() {
273 let resolver = Arc::new(|uri: &str| {
274 if uri == "mock:mutate" {
275 Some(BoxProcessor::from_fn(|mut ex| {
276 ex.input.body = camel_api::Body::Text("mutated".to_string());
277 Box::pin(async move { Ok(ex) })
278 }))
279 } else if uri == "mock:verify" {
280 Some(BoxProcessor::from_fn(|ex| {
281 let body = ex.input.body.as_text().unwrap_or("").to_string();
282 assert_eq!(body, "mutated");
283 Box::pin(async move { Ok(ex) })
284 }))
285 } else {
286 None
287 }
288 });
289
290 let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| {
291 Some("mock:mutate,mock:verify".to_string())
292 }));
293
294 let mut svc = RoutingSlipService::new(config, resolver);
295 let ex = Exchange::new(Message::new("original"));
296 let result = svc.ready().await.unwrap().call(ex).await;
297
298 assert!(result.is_ok());
299 }
300
301 #[tokio::test]
302 async fn routing_slip_cache_hit() {
303 let resolve_count = Arc::new(AtomicUsize::new(0));
304 let resolve_clone = resolve_count.clone();
305
306 let resolver = Arc::new(move |uri: &str| {
307 if uri.starts_with("mock:") {
308 resolve_clone.fetch_add(1, Ordering::SeqCst);
309 Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
310 } else {
311 None
312 }
313 });
314
315 let call_count = Arc::new(AtomicUsize::new(0));
316 let call_clone = call_count.clone();
317
318 let config = RoutingSlipConfig::new(Arc::new(move |_ex: &Exchange| {
319 let n = call_clone.fetch_add(1, Ordering::SeqCst);
320 if n < 2 {
321 Some("mock:a,mock:b".to_string())
322 } else {
323 None
324 }
325 }));
326
327 let mut svc = RoutingSlipService::new(config, resolver);
328 let ex1 = Exchange::new(Message::new("test1"));
329 svc.ready().await.unwrap().call(ex1).await.unwrap();
330 let ex2 = Exchange::new(Message::new("test2"));
331 svc.ready().await.unwrap().call(ex2).await.unwrap();
332
333 assert_eq!(resolve_count.load(Ordering::SeqCst), 2);
334 }
335
336 #[tokio::test]
337 async fn routing_slip_custom_delimiter() {
338 let order: Arc<std::sync::Mutex<Vec<String>>> = Arc::new(std::sync::Mutex::new(Vec::new()));
339 let order_clone = order.clone();
340
341 let resolver = Arc::new(move |uri: &str| {
342 let order = order_clone.clone();
343 let uri = uri.to_string();
344 Some(BoxProcessor::from_fn(move |ex| {
345 order.lock().unwrap().push(uri.clone());
346 Box::pin(async move { Ok(ex) })
347 }))
348 });
349
350 let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| {
351 Some("mock:x|mock:y|mock:z".to_string())
352 }))
353 .uri_delimiter("|");
354
355 let mut svc = RoutingSlipService::new(config, resolver);
356 let ex = Exchange::new(Message::new("test"));
357 svc.ready().await.unwrap().call(ex).await.unwrap();
358
359 let order = order.lock().unwrap();
360 assert_eq!(*order, vec!["mock:x", "mock:y", "mock:z"]);
361 }
362
363 #[tokio::test]
364 async fn routing_slip_expression_evaluated_once() {
365 let expr_count = Arc::new(AtomicUsize::new(0));
366 let expr_count_clone = expr_count.clone();
367
368 let resolver = Arc::new(|uri: &str| {
369 if uri.starts_with("mock:") {
370 Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
371 } else {
372 None
373 }
374 });
375
376 let config = RoutingSlipConfig::new(Arc::new(move |_ex: &Exchange| {
377 expr_count_clone.fetch_add(1, Ordering::SeqCst);
378 Some("mock:a,mock:b".to_string())
379 }));
380
381 let mut svc = RoutingSlipService::new(config, resolver);
382 let ex = Exchange::new(Message::new("test"));
383 svc.ready().await.unwrap().call(ex).await.unwrap();
384
385 assert_eq!(
386 expr_count.load(Ordering::SeqCst),
387 1,
388 "Expression must be evaluated exactly once"
389 );
390 }
391}