camel_processor/
loop_eip.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Service, ServiceExt};
6
7use camel_api::loop_eip::{LoopConfig, LoopMode, MAX_LOOP_ITERATIONS};
8use camel_api::{BoxProcessor, CamelError, Exchange, Value};
9
/// Exchange property key holding the zero-based index of the current loop
/// iteration. It is written (as a `u64`) immediately before every invocation of
/// the loop body, so it is absent when the loop runs zero iterations.
pub const CAMEL_LOOP_INDEX: &str = "CamelLoopIndex";
/// Exchange property key holding the loop size. In count mode it is set to the
/// configured iteration count; in while mode it is set to `0`.
pub const CAMEL_LOOP_SIZE: &str = "CamelLoopSize";
12
/// Tower service that repeatedly runs a sub-pipeline over an exchange, either a
/// fixed number of times or while a predicate holds, depending on the
/// configured loop mode.
#[derive(Clone)]
pub struct LoopService {
    /// Loop configuration; its `mode` selects count or while looping.
    config: LoopConfig,
    /// Processor invoked once per loop iteration.
    sub_pipeline: BoxProcessor,
}
18
19impl LoopService {
20 pub fn new(config: LoopConfig, sub_pipeline: BoxProcessor) -> Self {
21 Self {
22 config,
23 sub_pipeline,
24 }
25 }
26}
27
28impl Service<Exchange> for LoopService {
29 type Response = Exchange;
30 type Error = CamelError;
31 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
32
33 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
34 Poll::Ready(Ok(()))
35 }
36
37 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
38 let config = self.config.clone();
39 let mut pipeline = self.sub_pipeline.clone();
40
41 Box::pin(async move {
42 match config.mode {
43 LoopMode::Count(n) => {
44 exchange.set_property(CAMEL_LOOP_SIZE, Value::from(n as u64));
45 for i in 0..n {
46 exchange.set_property(CAMEL_LOOP_INDEX, Value::from(i as u64));
47 exchange = pipeline.ready().await?.call(exchange).await?;
48 }
49 }
50 LoopMode::While(ref predicate) => {
51 exchange.set_property(CAMEL_LOOP_SIZE, Value::from(0u64));
52 for i in 0..MAX_LOOP_ITERATIONS {
53 if !predicate(&exchange) {
54 break;
55 }
56 exchange.set_property(CAMEL_LOOP_INDEX, Value::from(i as u64));
57 exchange = pipeline.ready().await?.call(exchange).await?;
58 }
59 if predicate(&exchange) {
60 tracing::warn!(
61 "Loop while-mode hit MAX_LOOP_ITERATIONS ({}) safety guard. Predicate still true.",
62 MAX_LOOP_ITERATIONS
63 );
64 }
65 }
66 }
67 Ok(exchange)
68 })
69 }
70}
71
/// Loop step usable as an `OutcomePipeline`: runs `body` repeatedly over an
/// exchange as dictated by `config`.
pub struct LoopSegment {
    /// Loop configuration; its `mode` selects a fixed count or a
    /// predicate-driven loop.
    pub config: camel_api::loop_eip::LoopConfig,
    /// Segment executed once per loop iteration.
    pub body: camel_api::OutcomeSegment,
}
87
88impl Clone for LoopSegment {
89 fn clone(&self) -> Self {
90 Self {
91 config: self.config.clone(),
92 body: self.body.clone(),
93 }
94 }
95}
96
97impl camel_api::OutcomePipeline for LoopSegment {
98 fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
99 Box::new(self.clone())
100 }
101
102 fn run<'a>(
103 &'a mut self,
104 exchange: camel_api::Exchange,
105 ) -> Pin<Box<dyn Future<Output = camel_api::PipelineOutcome> + Send + 'a>> {
106 use camel_api::loop_eip::MAX_LOOP_ITERATIONS;
107 use camel_api::{PipelineOutcome, Value};
108
109 let config = self.config.clone();
110 let body = &mut self.body;
111
112 Box::pin(async move {
113 match config.mode {
114 camel_api::loop_eip::LoopMode::Count(n) => {
115 let mut ex = exchange;
116 ex.set_property(CAMEL_LOOP_SIZE, Value::from(n as u64));
117 for i in 0..n {
118 ex.set_property(CAMEL_LOOP_INDEX, Value::from(i as u64));
119 match body.run(ex).await {
120 PipelineOutcome::Completed(next) => {
121 ex = next;
122 }
123 other => return other,
124 }
125 }
126 PipelineOutcome::Completed(ex)
127 }
128 camel_api::loop_eip::LoopMode::While(ref predicate) => {
129 let mut ex = exchange;
130 ex.set_property(CAMEL_LOOP_SIZE, Value::from(0u64));
131 let mut i = 0u64;
132 while i < MAX_LOOP_ITERATIONS as u64 {
133 if !predicate(&ex) {
134 break;
135 }
136 ex.set_property(CAMEL_LOOP_INDEX, Value::from(i));
137 match body.run(ex).await {
138 PipelineOutcome::Completed(next) => {
139 ex = next;
140 }
141 other => return other,
142 }
143 i += 1;
144 }
145 if predicate(&ex) {
146 tracing::warn!(
147 "Loop while-mode hit MAX_LOOP_ITERATIONS ({}) safety guard. Predicate still true.",
148 MAX_LOOP_ITERATIONS
149 );
150 }
151 PipelineOutcome::Completed(ex)
152 }
153 }
154 })
155 }
156}
157
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    use camel_api::loop_eip::{LoopConfig, LoopMode, MAX_LOOP_ITERATIONS};
    use camel_api::{
        Body, BoxProcessor, BoxProcessorExt, CamelError, Exchange, IdentityProcessor, Message,
    };
    use tower::{Service, ServiceExt};

    use super::{CAMEL_LOOP_INDEX, CAMEL_LOOP_SIZE, LoopService};

    /// Sub-pipeline that hands every exchange back untouched.
    fn identity_pipeline() -> BoxProcessor {
        BoxProcessor::new(IdentityProcessor)
    }

    /// Sub-pipeline that bumps `counter` once per invocation and otherwise
    /// passes the exchange through.
    fn counter_pipeline(counter: Arc<AtomicUsize>) -> BoxProcessor {
        BoxProcessor::from_fn(move |exchange: Exchange| {
            let hits = Arc::clone(&counter);
            Box::pin(async move {
                hits.fetch_add(1, Ordering::SeqCst);
                Ok(exchange)
            })
        })
    }

    /// Fresh exchange whose message is built from the given text.
    fn exchange_with(text: &'static str) -> Exchange {
        Exchange::new(Message::new(text))
    }

    /// Reads `key` from the exchange as a `u64`, if present and convertible.
    fn u64_property(exchange: &Exchange, key: &'static str) -> Option<u64> {
        exchange.property(key).and_then(|value| value.as_u64())
    }

    /// Waits for the service to become ready, then sends `exchange` through it.
    async fn dispatch(
        service: &mut LoopService,
        exchange: Exchange,
    ) -> Result<Exchange, CamelError> {
        service.ready().await.unwrap().call(exchange).await
    }

    // Count mode invokes the sub-pipeline exactly `n` times.
    #[tokio::test]
    async fn test_loop_count_iterates_n_times() {
        let hits = Arc::new(AtomicUsize::new(0));
        let mut service = LoopService::new(
            LoopConfig::new(LoopMode::Count(3)),
            counter_pipeline(Arc::clone(&hits)),
        );

        let outcome = dispatch(&mut service, exchange_with("test")).await;

        assert!(outcome.is_ok());
        assert_eq!(hits.load(Ordering::SeqCst), 3);
    }

    // Count mode exposes the iteration index to the body and the size on the result.
    #[tokio::test]
    async fn test_loop_count_sets_properties() {
        let recorded = Arc::new(Mutex::new(Vec::<u64>::new()));
        let recorder = Arc::clone(&recorded);

        let pipeline = BoxProcessor::from_fn(move |exchange: Exchange| {
            let recorder = Arc::clone(&recorder);
            Box::pin(async move {
                if let Some(index) = u64_property(&exchange, CAMEL_LOOP_INDEX) {
                    recorder.lock().unwrap().push(index);
                }
                Ok(exchange)
            })
        });

        let mut service = LoopService::new(LoopConfig::new(LoopMode::Count(3)), pipeline);

        let result = dispatch(&mut service, exchange_with("test"))
            .await
            .unwrap();

        assert_eq!(*recorded.lock().unwrap(), vec![0, 1, 2]);
        assert_eq!(u64_property(&result, CAMEL_LOOP_SIZE), Some(3));
    }

    // A zero count leaves the body alone, records size 0 and never sets an index.
    #[tokio::test]
    async fn test_loop_count_zero_is_noop() {
        let mut service =
            LoopService::new(LoopConfig::new(LoopMode::Count(0)), identity_pipeline());

        let result = dispatch(&mut service, exchange_with("test"))
            .await
            .unwrap();

        assert_eq!(result.input.body.as_text(), Some("test"));
        assert_eq!(u64_property(&result, CAMEL_LOOP_SIZE), Some(0));
        assert!(result.property(CAMEL_LOOP_INDEX).is_none());
    }

    // While mode stops as soon as the predicate turns false.
    #[tokio::test]
    async fn test_loop_while_stops_when_predicate_false() {
        let calls = Arc::new(AtomicUsize::new(0));

        // Keep looping while fewer than two iterations have been recorded.
        let predicate = Arc::new(|exchange: &Exchange| {
            let done = exchange
                .property("iterations")
                .and_then(|v| v.as_u64())
                .unwrap_or(0);
            done < 2
        });

        let calls_in_pipeline = Arc::clone(&calls);
        let pipeline = BoxProcessor::from_fn(move |mut exchange: Exchange| {
            let calls = Arc::clone(&calls_in_pipeline);
            Box::pin(async move {
                let seen = exchange
                    .property("iterations")
                    .and_then(|v| v.as_u64())
                    .unwrap_or(0);
                exchange.set_property("iterations", seen + 1);
                calls.fetch_add(1, Ordering::SeqCst);
                Ok(exchange)
            })
        });

        let mut service = LoopService::new(LoopConfig::new(LoopMode::While(predicate)), pipeline);

        let result = dispatch(&mut service, exchange_with("test"))
            .await
            .unwrap();

        assert_eq!(calls.load(Ordering::SeqCst), 2);
        assert_eq!(u64_property(&result, "iterations"), Some(2));
        assert_eq!(u64_property(&result, CAMEL_LOOP_INDEX), Some(1));
        assert_eq!(u64_property(&result, CAMEL_LOOP_SIZE), Some(0));
    }

    // An always-true predicate is cut off by the iteration cap.
    #[tokio::test]
    async fn test_loop_while_respects_max_iterations() {
        let hits = Arc::new(AtomicUsize::new(0));
        let always = Arc::new(|_exchange: &Exchange| true);
        let mut service = LoopService::new(
            LoopConfig::new(LoopMode::While(always)),
            counter_pipeline(Arc::clone(&hits)),
        );

        let outcome = dispatch(&mut service, exchange_with("test")).await;

        assert!(outcome.is_ok());
        assert_eq!(hits.load(Ordering::SeqCst), MAX_LOOP_ITERATIONS);
    }

    // A failing sub-pipeline surfaces its error to the caller.
    #[tokio::test]
    async fn test_loop_error_propagation() {
        let failing = BoxProcessor::from_fn(|_exchange: Exchange| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        });

        let mut service = LoopService::new(LoopConfig::new(LoopMode::Count(3)), failing);

        let outcome = dispatch(&mut service, exchange_with("test")).await;

        assert!(matches!(outcome, Err(CamelError::ProcessorError(msg)) if msg == "boom"));
    }

    // Each iteration receives the exchange produced by the previous one.
    #[tokio::test]
    async fn test_loop_pipeline_chaining() {
        let appender = BoxProcessor::from_fn(|mut exchange: Exchange| {
            Box::pin(async move {
                let extended = match &exchange.input.body {
                    Body::Text(s) => Some(format!("{s}x")),
                    _ => None,
                };
                if let Some(text) = extended {
                    exchange.input.body = Body::Text(text);
                }
                Ok(exchange)
            })
        });

        let mut service = LoopService::new(LoopConfig::new(LoopMode::Count(3)), appender);

        let result = dispatch(&mut service, exchange_with("start"))
            .await
            .unwrap();

        assert_eq!(result.input.body.as_text(), Some("startxxx"));
    }
}