Skip to main content

camel_processor/
routing_slip.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6use tower::ServiceExt;
7
8use camel_api::endpoint_pipeline::CAMEL_SLIP_ENDPOINT;
9use camel_api::{CamelError, EndpointPipelineConfig, Exchange, RoutingSlipConfig, Value};
10
11use crate::endpoint_pipeline::EndpointPipelineService;
12
/// Routing Slip EIP implementation.
///
/// Evaluates an expression once to get a list of endpoint URIs, then routes
/// the message through them sequentially in a pipeline fashion.
///
/// When the expression yields `None` there is nothing to route and the
/// exchange is handed back unchanged.
#[derive(Clone)]
pub struct RoutingSlipService {
    /// Slip settings: the expression producing the URI list, the delimiter
    /// used to split it, and the options forwarded to the endpoint pipeline.
    config: RoutingSlipConfig,
    /// Resolves each endpoint URI on the slip to a callable service.
    pipeline: EndpointPipelineService,
}
22
23impl RoutingSlipService {
24    pub fn new(config: RoutingSlipConfig, endpoint_resolver: camel_api::EndpointResolver) -> Self {
25        let pipeline_config = EndpointPipelineConfig {
26            cache_size: EndpointPipelineConfig::from_signed(config.cache_size),
27            ignore_invalid_endpoints: config.ignore_invalid_endpoints,
28        };
29        Self {
30            config,
31            pipeline: EndpointPipelineService::new(endpoint_resolver, pipeline_config),
32        }
33    }
34}
35
36impl Service<Exchange> for RoutingSlipService {
37    type Response = Exchange;
38    type Error = CamelError;
39    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
40
41    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
42        Poll::Ready(Ok(()))
43    }
44
45    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
46        let config = self.config.clone();
47        let pipeline = self.pipeline.clone();
48
49        Box::pin(async move {
50            let slip = match (config.expression)(&exchange) {
51                None => return Ok(exchange),
52                Some(s) => s,
53            };
54
55            for uri in slip.split(&config.uri_delimiter) {
56                let uri = uri.trim();
57                if uri.is_empty() {
58                    continue;
59                }
60
61                let endpoint = match pipeline.resolve(uri)? {
62                    Some(e) => e,
63                    None => continue,
64                };
65
66                exchange.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri.to_string()));
67
68                let mut endpoint = endpoint;
69                exchange = endpoint.ready().await?.call(exchange).await?;
70            }
71
72            Ok(exchange)
73        })
74    }
75}
76
77#[cfg(test)]
78mod tests {
79    use super::*;
80    use camel_api::{BoxProcessor, BoxProcessorExt, Message};
81    use std::sync::Arc;
82    use std::sync::atomic::{AtomicUsize, Ordering};
83
84    fn mock_resolver() -> camel_api::EndpointResolver {
85        Arc::new(|uri: &str| {
86            if uri.starts_with("mock:") {
87                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
88            } else {
89                None
90            }
91        })
92    }
93
94    #[tokio::test]
95    async fn routing_slip_single_destination() {
96        let call_count = Arc::new(AtomicUsize::new(0));
97        let count_clone = call_count.clone();
98
99        let resolver = Arc::new(move |uri: &str| {
100            if uri == "mock:a" {
101                let count = count_clone.clone();
102                Some(BoxProcessor::from_fn(move |ex| {
103                    count.fetch_add(1, Ordering::SeqCst);
104                    Box::pin(async move { Ok(ex) })
105                }))
106            } else {
107                None
108            }
109        });
110
111        let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| Some("mock:a".to_string())));
112
113        let mut svc = RoutingSlipService::new(config, resolver);
114        let ex = Exchange::new(Message::new("test"));
115        let result = svc.ready().await.unwrap().call(ex).await;
116
117        assert!(result.is_ok());
118        assert_eq!(call_count.load(Ordering::SeqCst), 1);
119    }
120
121    #[tokio::test]
122    async fn routing_slip_multiple_destinations() {
123        let call_count = Arc::new(AtomicUsize::new(0));
124        let count_clone = call_count.clone();
125
126        let resolver = Arc::new(move |uri: &str| {
127            if uri.starts_with("mock:") {
128                let count = count_clone.clone();
129                Some(BoxProcessor::from_fn(move |ex| {
130                    count.fetch_add(1, Ordering::SeqCst);
131                    Box::pin(async move { Ok(ex) })
132                }))
133            } else {
134                None
135            }
136        });
137
138        let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| {
139            Some("mock:a,mock:b,mock:c".to_string())
140        }));
141
142        let mut svc = RoutingSlipService::new(config, resolver);
143        let ex = Exchange::new(Message::new("test"));
144        let result = svc.ready().await.unwrap().call(ex).await;
145
146        assert!(result.is_ok());
147        assert_eq!(call_count.load(Ordering::SeqCst), 3);
148    }
149
150    #[tokio::test]
151    async fn routing_slip_empty_expression() {
152        let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| None));
153
154        let mut svc = RoutingSlipService::new(config, mock_resolver());
155        let ex = Exchange::new(Message::new("test"));
156        let result = svc.ready().await.unwrap().call(ex).await;
157
158        assert!(result.is_ok());
159    }
160
161    #[tokio::test]
162    async fn routing_slip_empty_string() {
163        let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| Some(String::new())));
164
165        let mut svc = RoutingSlipService::new(config, mock_resolver());
166        let ex = Exchange::new(Message::new("test"));
167        let result = svc.ready().await.unwrap().call(ex).await;
168
169        assert!(result.is_ok());
170    }
171
172    #[tokio::test]
173    async fn routing_slip_invalid_endpoint_error() {
174        let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| {
175            Some("invalid:endpoint".to_string())
176        }))
177        .ignore_invalid_endpoints(false);
178
179        let mut svc = RoutingSlipService::new(config, mock_resolver());
180        let ex = Exchange::new(Message::new("test"));
181        let result = svc.ready().await.unwrap().call(ex).await;
182
183        assert!(result.is_err());
184        assert!(result.unwrap_err().to_string().contains("Invalid endpoint"));
185    }
186
187    #[tokio::test]
188    async fn routing_slip_ignore_invalid_endpoint() {
189        let call_count = Arc::new(AtomicUsize::new(0));
190        let count_clone = call_count.clone();
191
192        let resolver = Arc::new(move |uri: &str| {
193            if uri == "mock:valid" {
194                let count = count_clone.clone();
195                Some(BoxProcessor::from_fn(move |ex| {
196                    count.fetch_add(1, Ordering::SeqCst);
197                    Box::pin(async move { Ok(ex) })
198                }))
199            } else {
200                None
201            }
202        });
203
204        let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| {
205            Some("invalid:endpoint,mock:valid".to_string())
206        }))
207        .ignore_invalid_endpoints(true);
208
209        let mut svc = RoutingSlipService::new(config, resolver);
210        let ex = Exchange::new(Message::new("test"));
211        let result = svc.ready().await.unwrap().call(ex).await;
212
213        assert!(result.is_ok());
214        assert_eq!(call_count.load(Ordering::SeqCst), 1);
215    }
216
217    #[tokio::test]
218    async fn routing_slip_order_preserved() {
219        use std::sync::Mutex;
220
221        let order: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
222        let order_clone = order.clone();
223
224        let resolver = Arc::new(move |uri: &str| {
225            let order = order_clone.clone();
226            let uri = uri.to_string();
227            Some(BoxProcessor::from_fn(move |ex| {
228                order.lock().unwrap().push(uri.clone());
229                Box::pin(async move { Ok(ex) })
230            }))
231        });
232
233        let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| {
234            Some("mock:first,mock:second,mock:third".to_string())
235        }));
236
237        let mut svc = RoutingSlipService::new(config, resolver);
238        let ex = Exchange::new(Message::new("test"));
239        svc.ready().await.unwrap().call(ex).await.unwrap();
240
241        let order = order.lock().unwrap();
242        assert_eq!(*order, vec!["mock:first", "mock:second", "mock:third"]);
243    }
244
245    #[tokio::test]
246    async fn routing_slip_endpoint_property_set() {
247        let last_uri: Arc<std::sync::Mutex<Option<String>>> = Arc::new(std::sync::Mutex::new(None));
248        let last_uri_clone = last_uri.clone();
249
250        let resolver = Arc::new(move |uri: &str| {
251            let last = last_uri_clone.clone();
252            let _uri = uri.to_string();
253            Some(BoxProcessor::from_fn(move |ex| {
254                let prop = ex.property(CAMEL_SLIP_ENDPOINT).cloned();
255                *last.lock().unwrap() = prop.and_then(|v| v.as_str().map(String::from));
256                Box::pin(async move { Ok(ex) })
257            }))
258        });
259
260        let config =
261            RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| Some("mock:a,mock:b".to_string())));
262
263        let mut svc = RoutingSlipService::new(config, resolver);
264        let ex = Exchange::new(Message::new("test"));
265        svc.ready().await.unwrap().call(ex).await.unwrap();
266
267        let last = last_uri.lock().unwrap();
268        assert_eq!(last.as_deref(), Some("mock:b"));
269    }
270
271    #[tokio::test]
272    async fn routing_slip_mutation_between_steps() {
273        let resolver = Arc::new(|uri: &str| {
274            if uri == "mock:mutate" {
275                Some(BoxProcessor::from_fn(|mut ex| {
276                    ex.input.body = camel_api::Body::Text("mutated".to_string());
277                    Box::pin(async move { Ok(ex) })
278                }))
279            } else if uri == "mock:verify" {
280                Some(BoxProcessor::from_fn(|ex| {
281                    let body = ex.input.body.as_text().unwrap_or("").to_string();
282                    assert_eq!(body, "mutated");
283                    Box::pin(async move { Ok(ex) })
284                }))
285            } else {
286                None
287            }
288        });
289
290        let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| {
291            Some("mock:mutate,mock:verify".to_string())
292        }));
293
294        let mut svc = RoutingSlipService::new(config, resolver);
295        let ex = Exchange::new(Message::new("original"));
296        let result = svc.ready().await.unwrap().call(ex).await;
297
298        assert!(result.is_ok());
299    }
300
301    #[tokio::test]
302    async fn routing_slip_cache_hit() {
303        let resolve_count = Arc::new(AtomicUsize::new(0));
304        let resolve_clone = resolve_count.clone();
305
306        let resolver = Arc::new(move |uri: &str| {
307            if uri.starts_with("mock:") {
308                resolve_clone.fetch_add(1, Ordering::SeqCst);
309                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
310            } else {
311                None
312            }
313        });
314
315        let call_count = Arc::new(AtomicUsize::new(0));
316        let call_clone = call_count.clone();
317
318        let config = RoutingSlipConfig::new(Arc::new(move |_ex: &Exchange| {
319            let n = call_clone.fetch_add(1, Ordering::SeqCst);
320            if n < 2 {
321                Some("mock:a,mock:b".to_string())
322            } else {
323                None
324            }
325        }));
326
327        let mut svc = RoutingSlipService::new(config, resolver);
328        let ex1 = Exchange::new(Message::new("test1"));
329        svc.ready().await.unwrap().call(ex1).await.unwrap();
330        let ex2 = Exchange::new(Message::new("test2"));
331        svc.ready().await.unwrap().call(ex2).await.unwrap();
332
333        assert_eq!(resolve_count.load(Ordering::SeqCst), 2);
334    }
335
336    #[tokio::test]
337    async fn routing_slip_custom_delimiter() {
338        let order: Arc<std::sync::Mutex<Vec<String>>> = Arc::new(std::sync::Mutex::new(Vec::new()));
339        let order_clone = order.clone();
340
341        let resolver = Arc::new(move |uri: &str| {
342            let order = order_clone.clone();
343            let uri = uri.to_string();
344            Some(BoxProcessor::from_fn(move |ex| {
345                order.lock().unwrap().push(uri.clone());
346                Box::pin(async move { Ok(ex) })
347            }))
348        });
349
350        let config = RoutingSlipConfig::new(Arc::new(|_ex: &Exchange| {
351            Some("mock:x|mock:y|mock:z".to_string())
352        }))
353        .uri_delimiter("|");
354
355        let mut svc = RoutingSlipService::new(config, resolver);
356        let ex = Exchange::new(Message::new("test"));
357        svc.ready().await.unwrap().call(ex).await.unwrap();
358
359        let order = order.lock().unwrap();
360        assert_eq!(*order, vec!["mock:x", "mock:y", "mock:z"]);
361    }
362
363    #[tokio::test]
364    async fn routing_slip_expression_evaluated_once() {
365        let expr_count = Arc::new(AtomicUsize::new(0));
366        let expr_count_clone = expr_count.clone();
367
368        let resolver = Arc::new(|uri: &str| {
369            if uri.starts_with("mock:") {
370                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
371            } else {
372                None
373            }
374        });
375
376        let config = RoutingSlipConfig::new(Arc::new(move |_ex: &Exchange| {
377            expr_count_clone.fetch_add(1, Ordering::SeqCst);
378            Some("mock:a,mock:b".to_string())
379        }));
380
381        let mut svc = RoutingSlipService::new(config, resolver);
382        let ex = Exchange::new(Message::new("test"));
383        svc.ready().await.unwrap().call(ex).await.unwrap();
384
385        assert_eq!(
386            expr_count.load(Ordering::SeqCst),
387            1,
388            "Expression must be evaluated exactly once"
389        );
390    }
391}