Skip to main content

camel_processor/
dynamic_router.rs

1use std::collections::{HashMap, VecDeque};
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::sync::Mutex;
6use std::task::{Context, Poll};
7use std::time::Instant;
8
9use tower::Service;
10use tower::ServiceExt;
11
12use camel_api::{BoxProcessor, CamelError, DynamicRouterConfig, Exchange, Value};
13
14const CAMEL_SLIP_ENDPOINT: &str = "CamelSlipEndpoint";
15
16pub type EndpointResolver = Arc<dyn Fn(&str) -> Option<BoxProcessor> + Send + Sync>;
17
18struct EndpointCache {
19    map: HashMap<String, BoxProcessor>,
20    order: VecDeque<String>,
21}
22
23impl EndpointCache {
24    fn new() -> Self {
25        Self {
26            map: HashMap::new(),
27            order: VecDeque::new(),
28        }
29    }
30
31    fn get(&self, uri: &str) -> Option<BoxProcessor> {
32        self.map.get(uri).cloned()
33    }
34
35    /// Insert `uri -> endpoint`, evicting the oldest entry when at capacity.
36    fn insert(&mut self, uri: String, endpoint: BoxProcessor, capacity: usize) {
37        if self.map.contains_key(&uri) {
38            return;
39        }
40        if self.map.len() >= capacity
41            && let Some(oldest) = self.order.pop_front()
42        {
43            self.map.remove(&oldest);
44        }
45        self.order.push_back(uri.clone());
46        self.map.insert(uri, endpoint);
47    }
48}
49
50#[derive(Clone)]
51pub struct DynamicRouterService {
52    config: DynamicRouterConfig,
53    endpoint_resolver: EndpointResolver,
54    endpoint_cache: Arc<Mutex<EndpointCache>>,
55}
56
57impl DynamicRouterService {
58    pub fn new(config: DynamicRouterConfig, endpoint_resolver: EndpointResolver) -> Self {
59        Self {
60            config,
61            endpoint_resolver,
62            endpoint_cache: Arc::new(Mutex::new(EndpointCache::new())),
63        }
64    }
65}
66
67impl Service<Exchange> for DynamicRouterService {
68    type Response = Exchange;
69    type Error = CamelError;
70    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
71
72    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73        Poll::Ready(Ok(()))
74    }
75
76    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
77        let config = self.config.clone();
78        let resolver = self.endpoint_resolver.clone();
79        let cache = self.endpoint_cache.clone();
80
81        Box::pin(async move {
82            let start = Instant::now();
83            let mut iterations = 0;
84
85            loop {
86                iterations += 1;
87
88                if iterations > config.max_iterations {
89                    return Err(CamelError::ProcessorError(format!(
90                        "Dynamic router exceeded max iterations ({})",
91                        config.max_iterations
92                    )));
93                }
94
95                if let Some(timeout) = config.timeout
96                    && start.elapsed() > timeout
97                {
98                    return Err(CamelError::ProcessorError(format!(
99                        "Dynamic router timed out after {:?}",
100                        timeout
101                    )));
102                }
103
104                let destinations = match (config.expression)(&exchange) {
105                    None => break,
106                    Some(uris) => uris,
107                };
108
109                for uri in destinations.split(&config.uri_delimiter) {
110                    let uri = uri.trim();
111                    if uri.is_empty() {
112                        continue;
113                    }
114
115                    let endpoint = {
116                        let cache_guard = cache.lock().unwrap();
117                        cache_guard.get(uri)
118                    };
119
120                    let endpoint = match endpoint {
121                        Some(e) => e,
122                        None => {
123                            let e = match (resolver)(uri) {
124                                Some(e) => e,
125                                None => {
126                                    if config.ignore_invalid_endpoints {
127                                        continue;
128                                    } else {
129                                        return Err(CamelError::ProcessorError(format!(
130                                            "Invalid endpoint: {}",
131                                            uri
132                                        )));
133                                    }
134                                }
135                            };
136                            if config.cache_size > 0 {
137                                let mut cache_guard = cache.lock().unwrap();
138                                cache_guard.insert(
139                                    uri.to_string(),
140                                    e.clone(),
141                                    config.cache_size as usize,
142                                );
143                            }
144                            e
145                        }
146                    };
147
148                    exchange.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri.to_string()));
149
150                    let mut endpoint = endpoint;
151                    exchange = endpoint.ready().await?.call(exchange).await?;
152                }
153            }
154
155            Ok(exchange)
156        })
157    }
158}
159
160#[cfg(test)]
161mod tests {
162    use super::*;
163    use camel_api::{BoxProcessorExt, Message};
164    use std::sync::atomic::{AtomicUsize, Ordering};
165    use tower::ServiceExt;
166
167    fn make_config<F>(f: F) -> DynamicRouterConfig
168    where
169        F: Fn(&Exchange) -> Option<String> + Send + Sync + 'static,
170    {
171        DynamicRouterConfig::new(Arc::new(f))
172    }
173
174    fn mock_resolver() -> EndpointResolver {
175        Arc::new(|uri: &str| {
176            if uri.starts_with("mock:") {
177                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
178            } else {
179                None
180            }
181        })
182    }
183
184    #[tokio::test]
185    async fn test_dynamic_router_single_destination() {
186        let call_count = Arc::new(AtomicUsize::new(0));
187        let count_clone = call_count.clone();
188        let expr_count = Arc::new(AtomicUsize::new(0));
189        let expr_count_clone = expr_count.clone();
190
191        let resolver = Arc::new(move |uri: &str| {
192            if uri == "mock:a" {
193                let count = count_clone.clone();
194                Some(BoxProcessor::from_fn(move |ex| {
195                    count.fetch_add(1, Ordering::SeqCst);
196                    Box::pin(async move { Ok(ex) })
197                }))
198            } else {
199                None
200            }
201        });
202
203        let config = DynamicRouterConfig::new(Arc::new(move |ex: &Exchange| {
204            let count = expr_count_clone.fetch_add(1, Ordering::SeqCst);
205            if count == 0 {
206                ex.input
207                    .header("dest")
208                    .and_then(|v| v.as_str().map(|s| s.to_string()))
209            } else {
210                None
211            }
212        }));
213
214        let mut svc = DynamicRouterService::new(config, resolver);
215
216        let mut ex = Exchange::new(Message::new("test"));
217        ex.input.set_header("dest", Value::String("mock:a".into()));
218
219        let _result = svc.ready().await.unwrap().call(ex).await.unwrap();
220        assert_eq!(call_count.load(Ordering::SeqCst), 1);
221    }
222
223    #[tokio::test]
224    async fn test_dynamic_router_loop_terminates_on_none() {
225        let iterations = Arc::new(AtomicUsize::new(0));
226        let iterations_clone = iterations.clone();
227
228        let config = DynamicRouterConfig::new(Arc::new(move |_ex: &Exchange| {
229            let count = iterations_clone.fetch_add(1, Ordering::SeqCst);
230            if count < 2 {
231                Some("mock:a".to_string())
232            } else {
233                None
234            }
235        }));
236
237        let mut svc = DynamicRouterService::new(config, mock_resolver());
238
239        let ex = Exchange::new(Message::new("test"));
240        let result = svc.ready().await.unwrap().call(ex).await;
241
242        assert!(result.is_ok());
243        assert_eq!(iterations.load(Ordering::SeqCst), 3);
244    }
245
246    #[tokio::test]
247    async fn test_dynamic_router_max_iterations() {
248        let config = make_config(|_| Some("mock:a".to_string())).max_iterations(5);
249
250        let mut svc = DynamicRouterService::new(config, mock_resolver());
251
252        let ex = Exchange::new(Message::new("test"));
253        let result = svc.ready().await.unwrap().call(ex).await;
254
255        assert!(result.is_err());
256        let err = result.unwrap_err().to_string();
257        assert!(err.contains("max iterations"));
258    }
259
260    #[tokio::test]
261    async fn test_dynamic_router_invalid_endpoint_error() {
262        let config =
263            make_config(|_| Some("invalid:endpoint".to_string())).ignore_invalid_endpoints(false);
264
265        let mut svc = DynamicRouterService::new(config, mock_resolver());
266
267        let ex = Exchange::new(Message::new("test"));
268        let result = svc.ready().await.unwrap().call(ex).await;
269
270        assert!(result.is_err());
271        let err = result.unwrap_err().to_string();
272        assert!(err.contains("Invalid endpoint"));
273    }
274
275    #[tokio::test]
276    async fn test_dynamic_router_ignore_invalid_endpoint() {
277        let call_count = Arc::new(AtomicUsize::new(0));
278        let count_clone = call_count.clone();
279        let expr_count = Arc::new(AtomicUsize::new(0));
280        let expr_count_clone = expr_count.clone();
281
282        let resolver = Arc::new(move |uri: &str| {
283            if uri == "mock:valid" {
284                let count = count_clone.clone();
285                Some(BoxProcessor::from_fn(move |ex| {
286                    count.fetch_add(1, Ordering::SeqCst);
287                    Box::pin(async move { Ok(ex) })
288                }))
289            } else {
290                None
291            }
292        });
293
294        let config = DynamicRouterConfig::new(Arc::new(move |_ex: &Exchange| {
295            let count = expr_count_clone.fetch_add(1, Ordering::SeqCst);
296            if count == 0 {
297                Some("invalid:endpoint,mock:valid".to_string())
298            } else {
299                None
300            }
301        }))
302        .ignore_invalid_endpoints(true);
303
304        let mut svc = DynamicRouterService::new(config, resolver);
305
306        let ex = Exchange::new(Message::new("test"));
307        let result = svc.ready().await.unwrap().call(ex).await;
308
309        assert!(result.is_ok());
310        assert_eq!(call_count.load(Ordering::SeqCst), 1);
311    }
312
313    #[tokio::test]
314    async fn test_dynamic_router_cache_size_enforced() {
315        // Verify that the cache never exceeds the configured capacity.
316        let resolver_call_count = Arc::new(AtomicUsize::new(0));
317        let count_clone = resolver_call_count.clone();
318
319        let resolver: EndpointResolver = Arc::new(move |uri: &str| {
320            if uri.starts_with("mock:") {
321                count_clone.fetch_add(1, Ordering::SeqCst);
322                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
323            } else {
324                None
325            }
326        });
327
328        // Cache capacity of 2; we will route through 3 distinct URIs.
329        let expr_count = Arc::new(AtomicUsize::new(0));
330        let expr_count_clone = expr_count.clone();
331        let config = DynamicRouterConfig::new(Arc::new(move |_ex: &Exchange| {
332            let n = expr_count_clone.fetch_add(1, Ordering::SeqCst);
333            match n {
334                0 => Some("mock:a".to_string()),
335                1 => Some("mock:b".to_string()),
336                2 => Some("mock:c".to_string()),
337                _ => None,
338            }
339        }))
340        .cache_size(2);
341
342        let mut svc = DynamicRouterService::new(config, resolver);
343
344        let ex = Exchange::new(Message::new("test"));
345        svc.ready().await.unwrap().call(ex).await.unwrap();
346
347        // The cache capacity is 2 so mock:a, mock:b, mock:c each required a resolver call
348        // (mock:a is evicted before mock:c is inserted). All three resolver calls must have happened.
349        assert_eq!(resolver_call_count.load(Ordering::SeqCst), 3);
350    }
351}