camel_processor/
dynamic_router.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Instant;
5
6use tower::Service;
7use tower::ServiceExt;
8
9use camel_api::endpoint_pipeline::{CAMEL_SLIP_ENDPOINT, EndpointPipelineConfig, EndpointResolver};
10use camel_api::{CamelError, DynamicRouterConfig, Exchange, Value};
11
12use crate::endpoint_pipeline::EndpointPipelineService;
13
14#[derive(Clone)]
15pub struct DynamicRouterService {
16 config: DynamicRouterConfig,
17 pipeline: EndpointPipelineService,
18}
19
20impl DynamicRouterService {
21 pub fn new(config: DynamicRouterConfig, endpoint_resolver: EndpointResolver) -> Self {
22 let pipeline_config = EndpointPipelineConfig {
23 cache_size: EndpointPipelineConfig::from_signed(config.cache_size),
24 ignore_invalid_endpoints: config.ignore_invalid_endpoints,
25 };
26
27 Self {
28 config,
29 pipeline: EndpointPipelineService::new(endpoint_resolver, pipeline_config),
30 }
31 }
32}
33
34impl Service<Exchange> for DynamicRouterService {
35 type Response = Exchange;
36 type Error = CamelError;
37 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
38
39 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
40 Poll::Ready(Ok(()))
41 }
42
43 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
44 let config = self.config.clone();
45 let pipeline = self.pipeline.clone();
46
47 Box::pin(async move {
48 let start = Instant::now();
49 let mut iterations = 0;
50
51 loop {
52 iterations += 1;
53
54 if iterations > config.max_iterations {
55 return Err(CamelError::ProcessorError(format!(
56 "Dynamic router exceeded max iterations ({})",
57 config.max_iterations
58 )));
59 }
60
61 if let Some(timeout) = config.timeout
62 && start.elapsed() > timeout
63 {
64 return Err(CamelError::ProcessorError(format!(
65 "Dynamic router timed out after {:?}",
66 timeout
67 )));
68 }
69
70 let destinations = match (config.expression)(&exchange) {
71 None => break,
72 Some(uris) => uris,
73 };
74
75 for uri in destinations.split(&config.uri_delimiter) {
76 let uri = uri.trim();
77 if uri.is_empty() {
78 continue;
79 }
80
81 let endpoint = match pipeline.resolve(uri)? {
82 Some(e) => e,
83 None => {
84 continue;
85 }
86 };
87
88 exchange.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri.to_string()));
89
90 let mut endpoint = endpoint;
91 exchange = endpoint.ready().await?.call(exchange).await?;
92 }
93 }
94
95 Ok(exchange)
96 })
97 }
98}
99
100#[cfg(test)]
101mod tests {
102 use super::*;
103 use camel_api::{BoxProcessor, BoxProcessorExt, Message};
104 use std::sync::Arc;
105 use std::sync::atomic::{AtomicUsize, Ordering};
106 use tower::ServiceExt;
107
108 fn make_config<F>(f: F) -> DynamicRouterConfig
109 where
110 F: Fn(&Exchange) -> Option<String> + Send + Sync + 'static,
111 {
112 DynamicRouterConfig::new(Arc::new(f))
113 }
114
115 fn mock_resolver() -> EndpointResolver {
116 Arc::new(|uri: &str| {
117 if uri.starts_with("mock:") {
118 Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
119 } else {
120 None
121 }
122 })
123 }
124
125 #[tokio::test]
126 async fn test_dynamic_router_single_destination() {
127 let call_count = Arc::new(AtomicUsize::new(0));
128 let count_clone = call_count.clone();
129 let expr_count = Arc::new(AtomicUsize::new(0));
130 let expr_count_clone = expr_count.clone();
131
132 let resolver = Arc::new(move |uri: &str| {
133 if uri == "mock:a" {
134 let count = count_clone.clone();
135 Some(BoxProcessor::from_fn(move |ex| {
136 count.fetch_add(1, Ordering::SeqCst);
137 Box::pin(async move { Ok(ex) })
138 }))
139 } else {
140 None
141 }
142 });
143
144 let config = DynamicRouterConfig::new(Arc::new(move |ex: &Exchange| {
145 let count = expr_count_clone.fetch_add(1, Ordering::SeqCst);
146 if count == 0 {
147 ex.input
148 .header("dest")
149 .and_then(|v| v.as_str().map(|s| s.to_string()))
150 } else {
151 None
152 }
153 }));
154
155 let mut svc = DynamicRouterService::new(config, resolver);
156
157 let mut ex = Exchange::new(Message::new("test"));
158 ex.input.set_header("dest", Value::String("mock:a".into()));
159
160 let _result = svc.ready().await.unwrap().call(ex).await.unwrap();
161 assert_eq!(call_count.load(Ordering::SeqCst), 1);
162 }
163
164 #[tokio::test]
165 async fn test_dynamic_router_loop_terminates_on_none() {
166 let iterations = Arc::new(AtomicUsize::new(0));
167 let iterations_clone = iterations.clone();
168
169 let config = DynamicRouterConfig::new(Arc::new(move |_ex: &Exchange| {
170 let count = iterations_clone.fetch_add(1, Ordering::SeqCst);
171 if count < 2 {
172 Some("mock:a".to_string())
173 } else {
174 None
175 }
176 }));
177
178 let mut svc = DynamicRouterService::new(config, mock_resolver());
179
180 let ex = Exchange::new(Message::new("test"));
181 let result = svc.ready().await.unwrap().call(ex).await;
182
183 assert!(result.is_ok());
184 assert_eq!(iterations.load(Ordering::SeqCst), 3);
185 }
186
187 #[tokio::test]
188 async fn test_dynamic_router_max_iterations() {
189 let config = make_config(|_| Some("mock:a".to_string())).max_iterations(5);
190
191 let mut svc = DynamicRouterService::new(config, mock_resolver());
192
193 let ex = Exchange::new(Message::new("test"));
194 let result = svc.ready().await.unwrap().call(ex).await;
195
196 assert!(result.is_err());
197 let err = result.unwrap_err().to_string();
198 assert!(err.contains("max iterations"));
199 }
200
201 #[tokio::test]
202 async fn test_dynamic_router_invalid_endpoint_error() {
203 let config =
204 make_config(|_| Some("invalid:endpoint".to_string())).ignore_invalid_endpoints(false);
205
206 let mut svc = DynamicRouterService::new(config, mock_resolver());
207
208 let ex = Exchange::new(Message::new("test"));
209 let result = svc.ready().await.unwrap().call(ex).await;
210
211 assert!(result.is_err());
212 let err = result.unwrap_err().to_string();
213 assert!(err.contains("Invalid endpoint"));
214 }
215
216 #[tokio::test]
217 async fn test_dynamic_router_ignore_invalid_endpoint() {
218 let call_count = Arc::new(AtomicUsize::new(0));
219 let count_clone = call_count.clone();
220 let expr_count = Arc::new(AtomicUsize::new(0));
221 let expr_count_clone = expr_count.clone();
222
223 let resolver = Arc::new(move |uri: &str| {
224 if uri == "mock:valid" {
225 let count = count_clone.clone();
226 Some(BoxProcessor::from_fn(move |ex| {
227 count.fetch_add(1, Ordering::SeqCst);
228 Box::pin(async move { Ok(ex) })
229 }))
230 } else {
231 None
232 }
233 });
234
235 let config = DynamicRouterConfig::new(Arc::new(move |_ex: &Exchange| {
236 let count = expr_count_clone.fetch_add(1, Ordering::SeqCst);
237 if count == 0 {
238 Some("invalid:endpoint,mock:valid".to_string())
239 } else {
240 None
241 }
242 }))
243 .ignore_invalid_endpoints(true);
244
245 let mut svc = DynamicRouterService::new(config, resolver);
246
247 let ex = Exchange::new(Message::new("test"));
248 let result = svc.ready().await.unwrap().call(ex).await;
249
250 assert!(result.is_ok());
251 assert_eq!(call_count.load(Ordering::SeqCst), 1);
252 }
253
254 #[tokio::test]
255 async fn test_dynamic_router_cache_size_enforced() {
256 let resolver_call_count = Arc::new(AtomicUsize::new(0));
258 let count_clone = resolver_call_count.clone();
259
260 let resolver: EndpointResolver = Arc::new(move |uri: &str| {
261 if uri.starts_with("mock:") {
262 count_clone.fetch_add(1, Ordering::SeqCst);
263 Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
264 } else {
265 None
266 }
267 });
268
269 let expr_count = Arc::new(AtomicUsize::new(0));
271 let expr_count_clone = expr_count.clone();
272 let config = DynamicRouterConfig::new(Arc::new(move |_ex: &Exchange| {
273 let n = expr_count_clone.fetch_add(1, Ordering::SeqCst);
274 match n {
275 0 => Some("mock:a".to_string()),
276 1 => Some("mock:b".to_string()),
277 2 => Some("mock:c".to_string()),
278 _ => None,
279 }
280 }))
281 .cache_size(2);
282
283 let mut svc = DynamicRouterService::new(config, resolver);
284
285 let ex = Exchange::new(Message::new("test"));
286 svc.ready().await.unwrap().call(ex).await.unwrap();
287
288 assert_eq!(resolver_call_count.load(Ordering::SeqCst), 3);
291 }
292}