Skip to main content

camel_processor/
dynamic_router.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Instant;
5
6use tower::Service;
7use tower::ServiceExt;
8
9use camel_api::endpoint_pipeline::{CAMEL_SLIP_ENDPOINT, EndpointPipelineConfig, EndpointResolver};
10use camel_api::{CamelError, DynamicRouterConfig, Exchange, Value};
11
12use crate::endpoint_pipeline::EndpointPipelineService;
13
14#[derive(Clone)]
15pub struct DynamicRouterService {
16    config: DynamicRouterConfig,
17    pipeline: EndpointPipelineService,
18}
19
20impl DynamicRouterService {
21    pub fn new(config: DynamicRouterConfig, endpoint_resolver: EndpointResolver) -> Self {
22        let pipeline_config = EndpointPipelineConfig {
23            cache_size: EndpointPipelineConfig::from_signed(config.cache_size),
24            ignore_invalid_endpoints: config.ignore_invalid_endpoints,
25        };
26
27        Self {
28            config,
29            pipeline: EndpointPipelineService::new(endpoint_resolver, pipeline_config),
30        }
31    }
32}
33
34impl Service<Exchange> for DynamicRouterService {
35    type Response = Exchange;
36    type Error = CamelError;
37    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
38
39    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
40        Poll::Ready(Ok(()))
41    }
42
43    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
44        let config = self.config.clone();
45        let pipeline = self.pipeline.clone();
46
47        Box::pin(async move {
48            let start = Instant::now();
49            let mut iterations = 0;
50
51            loop {
52                iterations += 1;
53
54                if iterations > config.max_iterations {
55                    return Err(CamelError::ProcessorError(format!(
56                        "Dynamic router exceeded max iterations ({})",
57                        config.max_iterations
58                    )));
59                }
60
61                if let Some(timeout) = config.timeout
62                    && start.elapsed() > timeout
63                {
64                    return Err(CamelError::ProcessorError(format!(
65                        "Dynamic router timed out after {:?}",
66                        timeout
67                    )));
68                }
69
70                let destinations = match (config.expression)(&exchange) {
71                    None => break,
72                    Some(uris) => uris,
73                };
74
75                for uri in destinations.split(&config.uri_delimiter) {
76                    let uri = uri.trim();
77                    if uri.is_empty() {
78                        continue;
79                    }
80
81                    let endpoint = match pipeline.resolve(uri)? {
82                        Some(e) => e,
83                        None => {
84                            continue;
85                        }
86                    };
87
88                    exchange.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri.to_string()));
89
90                    let mut endpoint = endpoint;
91                    exchange = endpoint.ready().await?.call(exchange).await?;
92                }
93            }
94
95            Ok(exchange)
96        })
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{BoxProcessor, BoxProcessorExt, Message};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tower::ServiceExt;

    /// Wraps a routing expression in a `DynamicRouterConfig`; all other
    /// settings keep whatever `DynamicRouterConfig::new` assigns.
    fn make_config<F>(f: F) -> DynamicRouterConfig
    where
        F: Fn(&Exchange) -> Option<String> + Send + Sync + 'static,
    {
        DynamicRouterConfig::new(Arc::new(f))
    }

    /// Resolver that accepts any `mock:` URI and returns a pass-through
    /// processor; every other URI is unresolvable.
    fn mock_resolver() -> EndpointResolver {
        Arc::new(|uri: &str| {
            uri.starts_with("mock:")
                .then(|| BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
        })
    }

    /// Resolver that accepts only `accepted`; the processor it returns bumps
    /// `hits` each time an exchange is sent through it.
    fn counting_resolver(accepted: &'static str, hits: Arc<AtomicUsize>) -> EndpointResolver {
        Arc::new(move |uri: &str| {
            if uri != accepted {
                return None;
            }
            let hits = Arc::clone(&hits);
            Some(BoxProcessor::from_fn(move |ex| {
                hits.fetch_add(1, Ordering::SeqCst);
                Box::pin(async move { Ok(ex) })
            }))
        })
    }

    /// Config whose expression returns `script[n]` on its n-th evaluation and
    /// `None` once the script is exhausted. `evaluations` counts every
    /// evaluation, including the terminating one.
    fn scripted_config(
        script: &'static [&'static str],
        evaluations: Arc<AtomicUsize>,
    ) -> DynamicRouterConfig {
        make_config(move |_ex: &Exchange| {
            let step = evaluations.fetch_add(1, Ordering::SeqCst);
            script.get(step).map(|uris| (*uris).to_string())
        })
    }

    /// Fresh exchange carrying the body used by every test.
    fn test_exchange() -> Exchange {
        Exchange::new(Message::new("test"))
    }

    /// Builds the router and sends one exchange through it.
    async fn route(
        config: DynamicRouterConfig,
        resolver: EndpointResolver,
        exchange: Exchange,
    ) -> Result<Exchange, CamelError> {
        let mut svc = DynamicRouterService::new(config, resolver);
        svc.ready().await.unwrap().call(exchange).await
    }

    #[tokio::test]
    async fn test_dynamic_router_single_destination() {
        let processor_hits = Arc::new(AtomicUsize::new(0));
        let evaluations = Arc::new(AtomicUsize::new(0));

        // First evaluation routes to the URI in the "dest" header; every later
        // evaluation ends the loop.
        let config = make_config(move |ex: &Exchange| {
            if evaluations.fetch_add(1, Ordering::SeqCst) == 0 {
                ex.input
                    .header("dest")
                    .and_then(|v| v.as_str().map(|s| s.to_string()))
            } else {
                None
            }
        });

        let mut exchange = test_exchange();
        exchange
            .input
            .set_header("dest", Value::String("mock:a".into()));

        let resolver = counting_resolver("mock:a", Arc::clone(&processor_hits));
        let _routed = route(config, resolver, exchange).await.unwrap();

        assert_eq!(processor_hits.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_dynamic_router_loop_terminates_on_none() {
        let evaluations = Arc::new(AtomicUsize::new(0));
        let config = scripted_config(&["mock:a", "mock:a"], Arc::clone(&evaluations));

        let outcome = route(config, mock_resolver(), test_exchange()).await;

        assert!(outcome.is_ok());
        // Two routing rounds plus the evaluation that returned `None`.
        assert_eq!(evaluations.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_dynamic_router_max_iterations() {
        // The expression never returns `None`, so only the cap can stop it.
        let config = make_config(|_| Some("mock:a".to_string())).max_iterations(5);

        let outcome = route(config, mock_resolver(), test_exchange()).await;

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("max iterations"));
    }

    #[tokio::test]
    async fn test_dynamic_router_invalid_endpoint_error() {
        let config =
            make_config(|_| Some("invalid:endpoint".to_string())).ignore_invalid_endpoints(false);

        let outcome = route(config, mock_resolver(), test_exchange()).await;

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Invalid endpoint"));
    }

    #[tokio::test]
    async fn test_dynamic_router_ignore_invalid_endpoint() {
        let processor_hits = Arc::new(AtomicUsize::new(0));

        // One round with an unresolvable URI followed by a valid one.
        let config = scripted_config(
            &["invalid:endpoint,mock:valid"],
            Arc::new(AtomicUsize::new(0)),
        )
        .ignore_invalid_endpoints(true);

        let resolver = counting_resolver("mock:valid", Arc::clone(&processor_hits));
        let outcome = route(config, resolver, test_exchange()).await;

        assert!(outcome.is_ok());
        assert_eq!(processor_hits.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_dynamic_router_cache_size_enforced() {
        // Counts resolver invocations (not processor invocations).
        let resolver_calls = Arc::new(AtomicUsize::new(0));
        let calls_for_resolver = Arc::clone(&resolver_calls);

        let resolver: EndpointResolver = Arc::new(move |uri: &str| {
            if !uri.starts_with("mock:") {
                return None;
            }
            calls_for_resolver.fetch_add(1, Ordering::SeqCst);
            Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
        });

        // Cache capacity of 2, routed through 3 distinct URIs in 3 rounds.
        let config = scripted_config(
            &["mock:a", "mock:b", "mock:c"],
            Arc::new(AtomicUsize::new(0)),
        )
        .cache_size(2);

        route(config, resolver, test_exchange()).await.unwrap();

        // Each distinct URI must have gone through the resolver once.
        // NOTE(review): three distinct URIs yield three resolver calls whether
        // or not anything was evicted, so this does not by itself prove the
        // capacity bound; routing to mock:a again and expecting a fourth call
        // would, if the pipeline's eviction order is confirmed.
        assert_eq!(resolver_calls.load(Ordering::SeqCst), 3);
    }
}