camel_processor/
dynamic_router.rs1use std::collections::{HashMap, VecDeque};
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::sync::Mutex;
6use std::task::{Context, Poll};
7use std::time::Instant;
8
9use tower::Service;
10use tower::ServiceExt;
11
12use camel_api::{BoxProcessor, CamelError, DynamicRouterConfig, Exchange, Value};
13
/// Exchange property key under which the router records the URI of the
/// endpoint it is about to invoke.
const CAMEL_SLIP_ENDPOINT: &str = "CamelSlipEndpoint";
15
16pub type EndpointResolver = Arc<dyn Fn(&str) -> Option<BoxProcessor> + Send + Sync>;
17
18struct EndpointCache {
19 map: HashMap<String, BoxProcessor>,
20 order: VecDeque<String>,
21}
22
23impl EndpointCache {
24 fn new() -> Self {
25 Self {
26 map: HashMap::new(),
27 order: VecDeque::new(),
28 }
29 }
30
31 fn get(&self, uri: &str) -> Option<BoxProcessor> {
32 self.map.get(uri).cloned()
33 }
34
35 fn insert(&mut self, uri: String, endpoint: BoxProcessor, capacity: usize) {
37 if self.map.contains_key(&uri) {
38 return;
39 }
40 if self.map.len() >= capacity
41 && let Some(oldest) = self.order.pop_front()
42 {
43 self.map.remove(&oldest);
44 }
45 self.order.push_back(uri.clone());
46 self.map.insert(uri, endpoint);
47 }
48}
49
50#[derive(Clone)]
51pub struct DynamicRouterService {
52 config: DynamicRouterConfig,
53 endpoint_resolver: EndpointResolver,
54 endpoint_cache: Arc<Mutex<EndpointCache>>,
55}
56
57impl DynamicRouterService {
58 pub fn new(config: DynamicRouterConfig, endpoint_resolver: EndpointResolver) -> Self {
59 Self {
60 config,
61 endpoint_resolver,
62 endpoint_cache: Arc::new(Mutex::new(EndpointCache::new())),
63 }
64 }
65}
66
67impl Service<Exchange> for DynamicRouterService {
68 type Response = Exchange;
69 type Error = CamelError;
70 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
71
72 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73 Poll::Ready(Ok(()))
74 }
75
76 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
77 let config = self.config.clone();
78 let resolver = self.endpoint_resolver.clone();
79 let cache = self.endpoint_cache.clone();
80
81 Box::pin(async move {
82 let start = Instant::now();
83 let mut iterations = 0;
84
85 loop {
86 iterations += 1;
87
88 if iterations > config.max_iterations {
89 return Err(CamelError::ProcessorError(format!(
90 "Dynamic router exceeded max iterations ({})",
91 config.max_iterations
92 )));
93 }
94
95 if let Some(timeout) = config.timeout
96 && start.elapsed() > timeout
97 {
98 return Err(CamelError::ProcessorError(format!(
99 "Dynamic router timed out after {:?}",
100 timeout
101 )));
102 }
103
104 let destinations = match (config.expression)(&exchange) {
105 None => break,
106 Some(uris) => uris,
107 };
108
109 for uri in destinations.split(&config.uri_delimiter) {
110 let uri = uri.trim();
111 if uri.is_empty() {
112 continue;
113 }
114
115 let endpoint = {
116 let cache_guard = cache.lock().unwrap();
117 cache_guard.get(uri)
118 };
119
120 let endpoint = match endpoint {
121 Some(e) => e,
122 None => {
123 let e = match (resolver)(uri) {
124 Some(e) => e,
125 None => {
126 if config.ignore_invalid_endpoints {
127 continue;
128 } else {
129 return Err(CamelError::ProcessorError(format!(
130 "Invalid endpoint: {}",
131 uri
132 )));
133 }
134 }
135 };
136 if config.cache_size > 0 {
137 let mut cache_guard = cache.lock().unwrap();
138 cache_guard.insert(
139 uri.to_string(),
140 e.clone(),
141 config.cache_size as usize,
142 );
143 }
144 e
145 }
146 };
147
148 exchange.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri.to_string()));
149
150 let mut endpoint = endpoint;
151 exchange = endpoint.ready().await?.call(exchange).await?;
152 }
153 }
154
155 Ok(exchange)
156 })
157 }
158}
159
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{BoxProcessorExt, Message};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tower::ServiceExt;

    /// Builds a router config around the routing expression `f`, leaving all
    /// other options at whatever `DynamicRouterConfig::new` sets.
    fn make_config<F>(f: F) -> DynamicRouterConfig
    where
        F: Fn(&Exchange) -> Option<String> + Send + Sync + 'static,
    {
        DynamicRouterConfig::new(Arc::new(f))
    }

    /// Resolver that answers every `mock:` URI with a pass-through processor
    /// and rejects everything else.
    fn mock_resolver() -> EndpointResolver {
        Arc::new(|uri: &str| {
            if uri.starts_with("mock:") {
                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
            } else {
                None
            }
        })
    }

    /// A single destination taken from a header is invoked exactly once.
    #[tokio::test]
    async fn test_dynamic_router_single_destination() {
        let call_count = Arc::new(AtomicUsize::new(0));
        let count_clone = call_count.clone();
        let expr_calls = Arc::new(AtomicUsize::new(0));

        let resolver: EndpointResolver = Arc::new(move |uri: &str| {
            if uri == "mock:a" {
                let count = count_clone.clone();
                Some(BoxProcessor::from_fn(move |ex| {
                    count.fetch_add(1, Ordering::SeqCst);
                    Box::pin(async move { Ok(ex) })
                }))
            } else {
                None
            }
        });

        // Route on the first evaluation only, then stop the loop.
        let config = DynamicRouterConfig::new(Arc::new(move |ex: &Exchange| {
            let count = expr_calls.fetch_add(1, Ordering::SeqCst);
            if count == 0 {
                ex.input
                    .header("dest")
                    .and_then(|v| v.as_str().map(|s| s.to_string()))
            } else {
                None
            }
        }));

        let mut svc = DynamicRouterService::new(config, resolver);

        let mut ex = Exchange::new(Message::new("test"));
        ex.input.set_header("dest", Value::String("mock:a".into()));

        let _result = svc.ready().await.unwrap().call(ex).await.unwrap();
        assert_eq!(call_count.load(Ordering::SeqCst), 1);
    }

    /// The loop keeps evaluating the expression until it returns `None`.
    #[tokio::test]
    async fn test_dynamic_router_loop_terminates_on_none() {
        let iterations = Arc::new(AtomicUsize::new(0));
        let iterations_clone = iterations.clone();

        let config = DynamicRouterConfig::new(Arc::new(move |_ex: &Exchange| {
            let count = iterations_clone.fetch_add(1, Ordering::SeqCst);
            if count < 2 {
                Some("mock:a".to_string())
            } else {
                None
            }
        }));

        let mut svc = DynamicRouterService::new(config, mock_resolver());

        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;

        assert!(result.is_ok());
        // Two routing rounds plus the final evaluation that returned `None`.
        assert_eq!(iterations.load(Ordering::SeqCst), 3);
    }

    /// An expression that never returns `None` is stopped by `max_iterations`.
    #[tokio::test]
    async fn test_dynamic_router_max_iterations() {
        let config = make_config(|_| Some("mock:a".to_string())).max_iterations(5);

        let mut svc = DynamicRouterService::new(config, mock_resolver());

        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;

        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("max iterations"));
    }

    /// An unresolvable URI is an error when invalid endpoints are not ignored.
    #[tokio::test]
    async fn test_dynamic_router_invalid_endpoint_error() {
        let config =
            make_config(|_| Some("invalid:endpoint".to_string())).ignore_invalid_endpoints(false);

        let mut svc = DynamicRouterService::new(config, mock_resolver());

        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;

        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Invalid endpoint"));
    }

    /// With `ignore_invalid_endpoints`, an unresolvable URI is skipped and the
    /// remaining destinations are still invoked.
    #[tokio::test]
    async fn test_dynamic_router_ignore_invalid_endpoint() {
        let call_count = Arc::new(AtomicUsize::new(0));
        let count_clone = call_count.clone();
        let expr_calls = Arc::new(AtomicUsize::new(0));

        let resolver: EndpointResolver = Arc::new(move |uri: &str| {
            if uri == "mock:valid" {
                let count = count_clone.clone();
                Some(BoxProcessor::from_fn(move |ex| {
                    count.fetch_add(1, Ordering::SeqCst);
                    Box::pin(async move { Ok(ex) })
                }))
            } else {
                None
            }
        });

        let config = DynamicRouterConfig::new(Arc::new(move |_ex: &Exchange| {
            let count = expr_calls.fetch_add(1, Ordering::SeqCst);
            if count == 0 {
                Some("invalid:endpoint,mock:valid".to_string())
            } else {
                None
            }
        }))
        .ignore_invalid_endpoints(true);

        let mut svc = DynamicRouterService::new(config, resolver);

        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;

        assert!(result.is_ok());
        assert_eq!(call_count.load(Ordering::SeqCst), 1);
    }

    /// The endpoint cache serves repeated URIs without re-resolving them and
    /// evicts the oldest entry once `cache_size` is reached.
    #[tokio::test]
    async fn test_dynamic_router_cache_size_enforced() {
        let resolver_call_count = Arc::new(AtomicUsize::new(0));
        let count_clone = resolver_call_count.clone();

        let resolver: EndpointResolver = Arc::new(move |uri: &str| {
            if uri.starts_with("mock:") {
                count_clone.fetch_add(1, Ordering::SeqCst);
                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
            } else {
                None
            }
        });

        let expr_calls = Arc::new(AtomicUsize::new(0));
        let config = DynamicRouterConfig::new(Arc::new(move |_ex: &Exchange| {
            match expr_calls.fetch_add(1, Ordering::SeqCst) {
                0 => Some("mock:a".to_string()), // miss: resolved and cached
                1 => Some("mock:a".to_string()), // hit: served from the cache
                2 => Some("mock:b".to_string()), // miss: cache now holds a, b
                3 => Some("mock:c".to_string()), // miss: evicts a (oldest)
                4 => Some("mock:a".to_string()), // miss again: a was evicted
                _ => None,
            }
        }))
        .cache_size(2);

        let mut svc = DynamicRouterService::new(config, resolver);

        let ex = Exchange::new(Message::new("test"));
        svc.ready().await.unwrap().call(ex).await.unwrap();

        // Three distinct URIs plus one re-resolution of the evicted `mock:a`.
        // Without a cache this would be 5; without eviction it would be 3.
        assert_eq!(resolver_call_count.load(Ordering::SeqCst), 4);
    }
}