Skip to main content

camel_processor/
dynamic_router.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Instant;
5
6use tower::Service;
7use tower::ServiceExt;
8
9use camel_api::endpoint_pipeline::{CAMEL_SLIP_ENDPOINT, EndpointPipelineConfig, EndpointResolver};
10use camel_api::{CamelError, DynamicRouterConfig, Exchange, Value};
11
12use crate::endpoint_pipeline::EndpointPipelineService;
13
14#[derive(Clone)]
15pub struct DynamicRouterService {
16    config: DynamicRouterConfig,
17    pipeline: EndpointPipelineService,
18}
19
20impl DynamicRouterService {
21    pub fn new(config: DynamicRouterConfig, endpoint_resolver: EndpointResolver) -> Self {
22        let pipeline_config = EndpointPipelineConfig {
23            cache_size: EndpointPipelineConfig::from_signed(config.cache_size),
24            ignore_invalid_endpoints: config.ignore_invalid_endpoints,
25        };
26
27        Self {
28            config,
29            pipeline: EndpointPipelineService::new(endpoint_resolver, pipeline_config),
30        }
31    }
32}
33
34impl Service<Exchange> for DynamicRouterService {
35    type Response = Exchange;
36    type Error = CamelError;
37    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
38
39    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
40        Poll::Ready(Ok(()))
41    }
42
43    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
44        let config = self.config.clone();
45        let pipeline = self.pipeline.clone();
46
47        Box::pin(async move {
48            let start = Instant::now();
49            let mut iterations = 0;
50            let mut last_destinations: Option<String> = None;
51
52            loop {
53                iterations += 1;
54
55                if iterations > config.max_iterations {
56                    return Err(CamelError::ProcessorError(format!(
57                        "Dynamic router exceeded max iterations ({})",
58                        config.max_iterations
59                    )));
60                }
61
62                if let Some(timeout) = config.timeout
63                    && start.elapsed() > timeout
64                {
65                    return Err(CamelError::ProcessorError(format!(
66                        "Dynamic router timed out after {:?}",
67                        timeout
68                    )));
69                }
70
71                let destinations = match (config.expression)(&exchange) {
72                    None => break,
73                    Some(uris) => uris,
74                };
75
76                if last_destinations.as_deref() == Some(destinations.as_str()) {
77                    return Err(CamelError::ProcessorError(format!(
78                        "Dynamic router detected infinite loop: expression returned the same destination '{}' on consecutive iterations. The destination endpoint must clear or update the routing header to signal completion.",
79                        destinations
80                    )));
81                }
82
83                last_destinations = Some(destinations.clone());
84
85                for uri in destinations.split(&config.uri_delimiter) {
86                    let uri = uri.trim();
87                    if uri.is_empty() {
88                        continue;
89                    }
90
91                    let endpoint = match pipeline.resolve(uri)? {
92                        Some(e) => e,
93                        None => {
94                            continue;
95                        }
96                    };
97
98                    exchange.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri.to_string()));
99
100                    let mut endpoint = endpoint;
101                    exchange = endpoint.ready().await?.call(exchange).await?;
102                }
103            }
104
105            Ok(exchange)
106        })
107    }
108}
109
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{BoxProcessor, BoxProcessorExt, Message};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tower::ServiceExt;

    /// Builds a `DynamicRouterConfig` around the routing closure `f`.
    fn make_config<F>(f: F) -> DynamicRouterConfig
    where
        F: Fn(&Exchange) -> Option<String> + Send + Sync + 'static,
    {
        DynamicRouterConfig::new(Arc::new(f))
    }

    /// Resolver that accepts every `mock:` URI with a pass-through processor
    /// and rejects everything else.
    fn mock_resolver() -> EndpointResolver {
        Arc::new(|uri: &str| {
            if uri.starts_with("mock:") {
                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
            } else {
                None
            }
        })
    }

    /// A destination read from a header is invoked exactly once when the
    /// expression returns `None` on its second evaluation.
    #[tokio::test]
    async fn test_dynamic_router_single_destination() {
        let call_count = Arc::new(AtomicUsize::new(0));
        let count_clone = call_count.clone();
        let expr_count = Arc::new(AtomicUsize::new(0));
        let expr_count_clone = expr_count.clone();

        let resolver = Arc::new(move |uri: &str| {
            if uri == "mock:a" {
                let count = count_clone.clone();
                Some(BoxProcessor::from_fn(move |ex| {
                    count.fetch_add(1, Ordering::SeqCst);
                    Box::pin(async move { Ok(ex) })
                }))
            } else {
                None
            }
        });

        let config = DynamicRouterConfig::new(Arc::new(move |ex: &Exchange| {
            let count = expr_count_clone.fetch_add(1, Ordering::SeqCst);
            if count == 0 {
                ex.input
                    .header("dest")
                    .and_then(|v| v.as_str().map(|s| s.to_string()))
            } else {
                None
            }
        }));

        let mut svc = DynamicRouterService::new(config, resolver);

        let mut ex = Exchange::new(Message::new("test"));
        ex.input.set_header("dest", Value::String("mock:a".into()));

        let _result = svc.ready().await.unwrap().call(ex).await.unwrap();
        assert_eq!(call_count.load(Ordering::SeqCst), 1);
    }

    /// Routing continues across rounds and stops as soon as the expression
    /// yields `None` (two destinations + one terminating evaluation).
    #[tokio::test]
    async fn test_dynamic_router_loop_terminates_on_none() {
        let iterations = Arc::new(AtomicUsize::new(0));
        let iterations_clone = iterations.clone();

        let config = DynamicRouterConfig::new(Arc::new(move |_ex: &Exchange| {
            let count = iterations_clone.fetch_add(1, Ordering::SeqCst);
            match count {
                0 => Some("mock:a".to_string()),
                1 => Some("mock:b".to_string()),
                _ => None,
            }
        }));

        let mut svc = DynamicRouterService::new(config, mock_resolver());

        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;

        assert!(result.is_ok());
        assert_eq!(iterations.load(Ordering::SeqCst), 3);
    }

    /// The iteration limit stops a router whose expression never returns `None`.
    ///
    /// The expression alternates between two destinations so that the
    /// same-destination guard cannot fire first; only the `max_iterations`
    /// guard can end this run.
    #[tokio::test]
    async fn test_dynamic_router_max_iterations() {
        let expr_calls = Arc::new(AtomicUsize::new(0));
        let expr_calls_clone = expr_calls.clone();

        let config = make_config(move |_| {
            let n = expr_calls_clone.fetch_add(1, Ordering::SeqCst);
            let uri = if n % 2 == 0 { "mock:a" } else { "mock:b" };
            Some(uri.to_string())
        })
        .max_iterations(5);

        let mut svc = DynamicRouterService::new(config, mock_resolver());

        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;

        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("exceeded max iterations"));
        assert!(!err.contains("infinite loop"));
        // The limit is checked before the expression runs, so the sixth round
        // fails without evaluating it.
        assert_eq!(expr_calls.load(Ordering::SeqCst), 5);
    }

    /// Returning the same destination on consecutive rounds is reported as an
    /// infinite loop long before `max_iterations` is reached.
    #[tokio::test]
    async fn test_dynamic_router_detects_same_destination_loop() {
        let config = make_config(|_| Some("mock:loop".to_string())).max_iterations(100);

        let mut svc = DynamicRouterService::new(config, mock_resolver());

        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;

        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("infinite loop"));
        assert!(err.contains("mock:loop"));
    }

    /// With `ignore_invalid_endpoints(false)` an unresolvable URI fails the call.
    #[tokio::test]
    async fn test_dynamic_router_invalid_endpoint_error() {
        let config =
            make_config(|_| Some("invalid:endpoint".to_string())).ignore_invalid_endpoints(false);

        let mut svc = DynamicRouterService::new(config, mock_resolver());

        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;

        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Invalid endpoint"));
    }

    /// With `ignore_invalid_endpoints(true)` an unresolvable URI is skipped and
    /// the remaining destinations in the same list are still invoked.
    #[tokio::test]
    async fn test_dynamic_router_ignore_invalid_endpoint() {
        let call_count = Arc::new(AtomicUsize::new(0));
        let count_clone = call_count.clone();
        let expr_count = Arc::new(AtomicUsize::new(0));
        let expr_count_clone = expr_count.clone();

        let resolver = Arc::new(move |uri: &str| {
            if uri == "mock:valid" {
                let count = count_clone.clone();
                Some(BoxProcessor::from_fn(move |ex| {
                    count.fetch_add(1, Ordering::SeqCst);
                    Box::pin(async move { Ok(ex) })
                }))
            } else {
                None
            }
        });

        let config = DynamicRouterConfig::new(Arc::new(move |_ex: &Exchange| {
            let count = expr_count_clone.fetch_add(1, Ordering::SeqCst);
            if count == 0 {
                Some("invalid:endpoint,mock:valid".to_string())
            } else {
                None
            }
        }))
        .ignore_invalid_endpoints(true);

        let mut svc = DynamicRouterService::new(config, resolver);

        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;

        assert!(result.is_ok());
        assert_eq!(call_count.load(Ordering::SeqCst), 1);
    }

    /// Routing through more distinct URIs than the configured cache capacity
    /// still succeeds, and every distinct URI reaches the resolver once.
    #[tokio::test]
    async fn test_dynamic_router_cache_size_enforced() {
        let resolver_call_count = Arc::new(AtomicUsize::new(0));
        let count_clone = resolver_call_count.clone();

        let resolver: EndpointResolver = Arc::new(move |uri: &str| {
            if uri.starts_with("mock:") {
                count_clone.fetch_add(1, Ordering::SeqCst);
                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
            } else {
                None
            }
        });

        // Cache capacity of 2; we will route through 3 distinct URIs.
        let expr_count = Arc::new(AtomicUsize::new(0));
        let expr_count_clone = expr_count.clone();
        let config = DynamicRouterConfig::new(Arc::new(move |_ex: &Exchange| {
            let n = expr_count_clone.fetch_add(1, Ordering::SeqCst);
            match n {
                0 => Some("mock:a".to_string()),
                1 => Some("mock:b".to_string()),
                2 => Some("mock:c".to_string()),
                _ => None,
            }
        }))
        .cache_size(2);

        let mut svc = DynamicRouterService::new(config, resolver);

        let ex = Exchange::new(Message::new("test"));
        svc.ready().await.unwrap().call(ex).await.unwrap();

        // Each of the three distinct URIs is seen for the first time, so each
        // requires one resolver call. NOTE(review): this count would be the
        // same for any capacity, so it shows that overflowing the cache is
        // handled, not that an entry was evicted; re-routing to an earlier URI
        // would be needed to observe eviction, and the expected count depends
        // on the pipeline's eviction policy.
        assert_eq!(resolver_call_count.load(Ordering::SeqCst), 3);
    }
}