camel_processor/
dynamic_router.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Instant;
5
6use tower::Service;
7use tower::ServiceExt;
8
9use camel_api::endpoint_pipeline::{CAMEL_SLIP_ENDPOINT, EndpointPipelineConfig, EndpointResolver};
10use camel_api::{CamelError, DynamicRouterConfig, Exchange, Value};
11
12use crate::endpoint_pipeline::EndpointPipelineService;
13
14#[derive(Clone)]
15pub struct DynamicRouterService {
16 config: DynamicRouterConfig,
17 pipeline: EndpointPipelineService,
18}
19
20impl DynamicRouterService {
21 pub fn new(config: DynamicRouterConfig, endpoint_resolver: EndpointResolver) -> Self {
22 let pipeline_config = EndpointPipelineConfig {
23 cache_size: EndpointPipelineConfig::from_signed(config.cache_size),
24 ignore_invalid_endpoints: config.ignore_invalid_endpoints,
25 };
26
27 Self {
28 config,
29 pipeline: EndpointPipelineService::new(endpoint_resolver, pipeline_config),
30 }
31 }
32}
33
34impl Service<Exchange> for DynamicRouterService {
35 type Response = Exchange;
36 type Error = CamelError;
37 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
38
39 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
40 Poll::Ready(Ok(()))
41 }
42
43 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
44 let config = self.config.clone();
45 let pipeline = self.pipeline.clone();
46
47 Box::pin(async move {
48 let start = Instant::now();
49 let mut iterations = 0;
50 let mut last_destinations: Option<String> = None;
51
52 loop {
53 iterations += 1;
54
55 if iterations > config.max_iterations {
56 return Err(CamelError::ProcessorError(format!(
57 "Dynamic router exceeded max iterations ({})",
58 config.max_iterations
59 )));
60 }
61
62 if let Some(timeout) = config.timeout
63 && start.elapsed() > timeout
64 {
65 return Err(CamelError::ProcessorError(format!(
66 "Dynamic router timed out after {:?}",
67 timeout
68 )));
69 }
70
71 let destinations = match (config.expression)(&exchange) {
72 None => break,
73 Some(uris) => uris,
74 };
75
76 if last_destinations.as_deref() == Some(destinations.as_str()) {
77 return Err(CamelError::ProcessorError(format!(
78 "Dynamic router detected infinite loop: expression returned the same destination '{}' on consecutive iterations. The destination endpoint must clear or update the routing header to signal completion.",
79 destinations
80 )));
81 }
82
83 last_destinations = Some(destinations.clone());
84
85 for uri in destinations.split(&config.uri_delimiter) {
86 let uri = uri.trim();
87 if uri.is_empty() {
88 continue;
89 }
90
91 let endpoint = match pipeline.resolve(uri)? {
92 Some(e) => e,
93 None => {
94 continue;
95 }
96 };
97
98 exchange.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri.to_string()));
99
100 let mut endpoint = endpoint;
101 exchange = endpoint.ready().await?.call(exchange).await?;
102 }
103 }
104
105 Ok(exchange)
106 })
107 }
108}
109
110#[cfg(test)]
111mod tests {
112 use super::*;
113 use camel_api::{BoxProcessor, BoxProcessorExt, Message};
114 use std::sync::Arc;
115 use std::sync::atomic::{AtomicUsize, Ordering};
116 use tower::ServiceExt;
117
118 fn make_config<F>(f: F) -> DynamicRouterConfig
119 where
120 F: Fn(&Exchange) -> Option<String> + Send + Sync + 'static,
121 {
122 DynamicRouterConfig::new(Arc::new(f))
123 }
124
125 fn mock_resolver() -> EndpointResolver {
126 Arc::new(|uri: &str| {
127 if uri.starts_with("mock:") {
128 Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
129 } else {
130 None
131 }
132 })
133 }
134
135 #[tokio::test]
136 async fn test_dynamic_router_single_destination() {
137 let call_count = Arc::new(AtomicUsize::new(0));
138 let count_clone = call_count.clone();
139 let expr_count = Arc::new(AtomicUsize::new(0));
140 let expr_count_clone = expr_count.clone();
141
142 let resolver = Arc::new(move |uri: &str| {
143 if uri == "mock:a" {
144 let count = count_clone.clone();
145 Some(BoxProcessor::from_fn(move |ex| {
146 count.fetch_add(1, Ordering::SeqCst);
147 Box::pin(async move { Ok(ex) })
148 }))
149 } else {
150 None
151 }
152 });
153
154 let config = DynamicRouterConfig::new(Arc::new(move |ex: &Exchange| {
155 let count = expr_count_clone.fetch_add(1, Ordering::SeqCst);
156 if count == 0 {
157 ex.input
158 .header("dest")
159 .and_then(|v| v.as_str().map(|s| s.to_string()))
160 } else {
161 None
162 }
163 }));
164
165 let mut svc = DynamicRouterService::new(config, resolver);
166
167 let mut ex = Exchange::new(Message::new("test"));
168 ex.input.set_header("dest", Value::String("mock:a".into()));
169
170 let _result = svc.ready().await.unwrap().call(ex).await.unwrap();
171 assert_eq!(call_count.load(Ordering::SeqCst), 1);
172 }
173
174 #[tokio::test]
175 async fn test_dynamic_router_loop_terminates_on_none() {
176 let iterations = Arc::new(AtomicUsize::new(0));
177 let iterations_clone = iterations.clone();
178
179 let config = DynamicRouterConfig::new(Arc::new(move |_ex: &Exchange| {
180 let count = iterations_clone.fetch_add(1, Ordering::SeqCst);
181 match count {
182 0 => Some("mock:a".to_string()),
183 1 => Some("mock:b".to_string()),
184 _ => None,
185 }
186 }));
187
188 let mut svc = DynamicRouterService::new(config, mock_resolver());
189
190 let ex = Exchange::new(Message::new("test"));
191 let result = svc.ready().await.unwrap().call(ex).await;
192
193 assert!(result.is_ok());
194 assert_eq!(iterations.load(Ordering::SeqCst), 3);
195 }
196
197 #[tokio::test]
198 async fn test_dynamic_router_max_iterations() {
199 let config = make_config(|_| Some("mock:a".to_string())).max_iterations(5);
200
201 let mut svc = DynamicRouterService::new(config, mock_resolver());
202
203 let ex = Exchange::new(Message::new("test"));
204 let result = svc.ready().await.unwrap().call(ex).await;
205
206 assert!(result.is_err());
207 let err = result.unwrap_err().to_string();
208 assert!(err.contains("infinite loop"));
209 assert!(err.contains("same destination"));
210 }
211
212 #[tokio::test]
213 async fn test_dynamic_router_detects_same_destination_loop() {
214 let config = make_config(|_| Some("mock:loop".to_string())).max_iterations(100);
215
216 let mut svc = DynamicRouterService::new(config, mock_resolver());
217
218 let ex = Exchange::new(Message::new("test"));
219 let result = svc.ready().await.unwrap().call(ex).await;
220
221 assert!(result.is_err());
222 let err = result.unwrap_err().to_string();
223 assert!(err.contains("infinite loop"));
224 assert!(err.contains("mock:loop"));
225 }
226
227 #[tokio::test]
228 async fn test_dynamic_router_invalid_endpoint_error() {
229 let config =
230 make_config(|_| Some("invalid:endpoint".to_string())).ignore_invalid_endpoints(false);
231
232 let mut svc = DynamicRouterService::new(config, mock_resolver());
233
234 let ex = Exchange::new(Message::new("test"));
235 let result = svc.ready().await.unwrap().call(ex).await;
236
237 assert!(result.is_err());
238 let err = result.unwrap_err().to_string();
239 assert!(err.contains("Invalid endpoint"));
240 }
241
242 #[tokio::test]
243 async fn test_dynamic_router_ignore_invalid_endpoint() {
244 let call_count = Arc::new(AtomicUsize::new(0));
245 let count_clone = call_count.clone();
246 let expr_count = Arc::new(AtomicUsize::new(0));
247 let expr_count_clone = expr_count.clone();
248
249 let resolver = Arc::new(move |uri: &str| {
250 if uri == "mock:valid" {
251 let count = count_clone.clone();
252 Some(BoxProcessor::from_fn(move |ex| {
253 count.fetch_add(1, Ordering::SeqCst);
254 Box::pin(async move { Ok(ex) })
255 }))
256 } else {
257 None
258 }
259 });
260
261 let config = DynamicRouterConfig::new(Arc::new(move |_ex: &Exchange| {
262 let count = expr_count_clone.fetch_add(1, Ordering::SeqCst);
263 if count == 0 {
264 Some("invalid:endpoint,mock:valid".to_string())
265 } else {
266 None
267 }
268 }))
269 .ignore_invalid_endpoints(true);
270
271 let mut svc = DynamicRouterService::new(config, resolver);
272
273 let ex = Exchange::new(Message::new("test"));
274 let result = svc.ready().await.unwrap().call(ex).await;
275
276 assert!(result.is_ok());
277 assert_eq!(call_count.load(Ordering::SeqCst), 1);
278 }
279
280 #[tokio::test]
281 async fn test_dynamic_router_cache_size_enforced() {
282 let resolver_call_count = Arc::new(AtomicUsize::new(0));
284 let count_clone = resolver_call_count.clone();
285
286 let resolver: EndpointResolver = Arc::new(move |uri: &str| {
287 if uri.starts_with("mock:") {
288 count_clone.fetch_add(1, Ordering::SeqCst);
289 Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
290 } else {
291 None
292 }
293 });
294
295 let expr_count = Arc::new(AtomicUsize::new(0));
297 let expr_count_clone = expr_count.clone();
298 let config = DynamicRouterConfig::new(Arc::new(move |_ex: &Exchange| {
299 let n = expr_count_clone.fetch_add(1, Ordering::SeqCst);
300 match n {
301 0 => Some("mock:a".to_string()),
302 1 => Some("mock:b".to_string()),
303 2 => Some("mock:c".to_string()),
304 _ => None,
305 }
306 }))
307 .cache_size(2);
308
309 let mut svc = DynamicRouterService::new(config, resolver);
310
311 let ex = Exchange::new(Message::new("test"));
312 svc.ready().await.unwrap().call(ex).await.unwrap();
313
314 assert_eq!(resolver_call_count.load(Ordering::SeqCst), 3);
317 }
318}