Skip to main content

camel_processor/
loop_eip.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Service, ServiceExt};
6
7use camel_api::loop_eip::{LoopConfig, LoopMode, MAX_LOOP_ITERATIONS};
8use camel_api::{BoxProcessor, CamelError, Exchange, Value};
9
10pub const CAMEL_LOOP_INDEX: &str = "CamelLoopIndex";
11pub const CAMEL_LOOP_SIZE: &str = "CamelLoopSize";
12
13#[derive(Clone)]
14pub struct LoopService {
15    config: LoopConfig,
16    sub_pipeline: BoxProcessor,
17}
18
19impl LoopService {
20    pub fn new(config: LoopConfig, sub_pipeline: BoxProcessor) -> Self {
21        Self {
22            config,
23            sub_pipeline,
24        }
25    }
26}
27
28impl Service<Exchange> for LoopService {
29    type Response = Exchange;
30    type Error = CamelError;
31    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
32
33    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
34        Poll::Ready(Ok(()))
35    }
36
37    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
38        let config = self.config.clone();
39        let mut pipeline = self.sub_pipeline.clone();
40
41        Box::pin(async move {
42            match config.mode {
43                LoopMode::Count(n) => {
44                    exchange.set_property(CAMEL_LOOP_SIZE, Value::from(n as u64));
45                    for i in 0..n {
46                        exchange.set_property(CAMEL_LOOP_INDEX, Value::from(i as u64));
47                        exchange = pipeline.ready().await?.call(exchange).await?;
48                    }
49                }
50                LoopMode::While(ref predicate) => {
51                    exchange.set_property(CAMEL_LOOP_SIZE, Value::from(0u64));
52                    for i in 0..MAX_LOOP_ITERATIONS {
53                        if !predicate(&exchange) {
54                            break;
55                        }
56                        exchange.set_property(CAMEL_LOOP_INDEX, Value::from(i as u64));
57                        exchange = pipeline.ready().await?.call(exchange).await?;
58                    }
59                    if predicate(&exchange) {
60                        tracing::warn!(
61                            "Loop while-mode hit MAX_LOOP_ITERATIONS ({}) safety guard. Predicate still true.",
62                            MAX_LOOP_ITERATIONS
63                        );
64                    }
65                }
66            }
67            Ok(exchange)
68        })
69    }
70}
71
72// ── LoopSegment (ADR-0025 OutcomePipeline) ─────────────────────────────
73
74/// Outcome-aware structural EIP segment for the Loop pattern.
75///
76/// Operates at the `PipelineOutcome` layer so that `Stopped(ex)` from a
77/// sub-step (e.g. Stop EIP) is preserved with the exchange including all
78/// mutations. Supports both Count and While modes, mirroring `LoopService`
79/// semantics exactly.
80///
81/// Unlike `LoopService` (which operates at the Tower layer), `LoopSegment`
82/// correctly short-circuits on `PipelineOutcome::Stopped` or `Failed`.
83pub struct LoopSegment {
84    pub config: camel_api::loop_eip::LoopConfig,
85    pub body: camel_api::OutcomeSegment,
86}
87
88impl Clone for LoopSegment {
89    fn clone(&self) -> Self {
90        Self {
91            config: self.config.clone(),
92            body: self.body.clone(),
93        }
94    }
95}
96
97impl camel_api::OutcomePipeline for LoopSegment {
98    fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
99        Box::new(self.clone())
100    }
101
102    fn run<'a>(
103        &'a mut self,
104        exchange: camel_api::Exchange,
105    ) -> Pin<Box<dyn Future<Output = camel_api::PipelineOutcome> + Send + 'a>> {
106        use camel_api::loop_eip::MAX_LOOP_ITERATIONS;
107        use camel_api::{PipelineOutcome, Value};
108
109        let config = self.config.clone();
110        let body = &mut self.body;
111
112        Box::pin(async move {
113            match config.mode {
114                camel_api::loop_eip::LoopMode::Count(n) => {
115                    let mut ex = exchange;
116                    ex.set_property(CAMEL_LOOP_SIZE, Value::from(n as u64));
117                    for i in 0..n {
118                        ex.set_property(CAMEL_LOOP_INDEX, Value::from(i as u64));
119                        match body.run(ex).await {
120                            PipelineOutcome::Completed(next) => {
121                                ex = next;
122                            }
123                            other => return other,
124                        }
125                    }
126                    PipelineOutcome::Completed(ex)
127                }
128                camel_api::loop_eip::LoopMode::While(ref predicate) => {
129                    let mut ex = exchange;
130                    ex.set_property(CAMEL_LOOP_SIZE, Value::from(0u64));
131                    let mut i = 0u64;
132                    while i < MAX_LOOP_ITERATIONS as u64 {
133                        if !predicate(&ex) {
134                            break;
135                        }
136                        ex.set_property(CAMEL_LOOP_INDEX, Value::from(i));
137                        match body.run(ex).await {
138                            PipelineOutcome::Completed(next) => {
139                                ex = next;
140                            }
141                            other => return other,
142                        }
143                        i += 1;
144                    }
145                    if predicate(&ex) {
146                        tracing::warn!(
147                            "Loop while-mode hit MAX_LOOP_ITERATIONS ({}) safety guard. Predicate still true.",
148                            MAX_LOOP_ITERATIONS
149                        );
150                    }
151                    PipelineOutcome::Completed(ex)
152                }
153            }
154        })
155    }
156}
157
158#[cfg(test)]
159mod tests {
160    use std::sync::atomic::{AtomicUsize, Ordering};
161    use std::sync::{Arc, Mutex};
162
163    use camel_api::loop_eip::{LoopConfig, LoopMode, MAX_LOOP_ITERATIONS};
164    use camel_api::{
165        Body, BoxProcessor, BoxProcessorExt, CamelError, Exchange, IdentityProcessor, Message,
166    };
167    use tower::{Service, ServiceExt};
168
169    use super::{CAMEL_LOOP_INDEX, CAMEL_LOOP_SIZE, LoopService};
170
171    fn identity_pipeline() -> BoxProcessor {
172        BoxProcessor::new(IdentityProcessor)
173    }
174
175    fn counter_pipeline(counter: Arc<AtomicUsize>) -> BoxProcessor {
176        BoxProcessor::from_fn(move |exchange: Exchange| {
177            let counter = Arc::clone(&counter);
178            Box::pin(async move {
179                counter.fetch_add(1, Ordering::SeqCst);
180                Ok(exchange)
181            })
182        })
183    }
184
185    #[tokio::test]
186    async fn test_loop_count_iterates_n_times() {
187        let counter = Arc::new(AtomicUsize::new(0));
188        let config = LoopConfig::new(LoopMode::Count(3));
189        let mut service = LoopService::new(config, counter_pipeline(Arc::clone(&counter)));
190
191        let exchange = Exchange::new(Message::new("test"));
192        let result = service.ready().await.unwrap().call(exchange).await;
193
194        assert!(result.is_ok());
195        assert_eq!(counter.load(Ordering::SeqCst), 3);
196    }
197
198    #[tokio::test]
199    async fn test_loop_count_sets_properties() {
200        let seen_indices = Arc::new(Mutex::new(Vec::<u64>::new()));
201        let seen_indices_for_pipeline = Arc::clone(&seen_indices);
202
203        let pipeline = BoxProcessor::from_fn(move |exchange: Exchange| {
204            let seen_indices = Arc::clone(&seen_indices_for_pipeline);
205            Box::pin(async move {
206                if let Some(index) = exchange.property(CAMEL_LOOP_INDEX).and_then(|v| v.as_u64()) {
207                    seen_indices.lock().unwrap().push(index);
208                }
209                Ok(exchange)
210            })
211        });
212
213        let config = LoopConfig::new(LoopMode::Count(3));
214        let mut service = LoopService::new(config, pipeline);
215
216        let exchange = Exchange::new(Message::new("test"));
217        let result = service.ready().await.unwrap().call(exchange).await.unwrap();
218
219        assert_eq!(*seen_indices.lock().unwrap(), vec![0, 1, 2]);
220        assert_eq!(
221            result.property(CAMEL_LOOP_SIZE).and_then(|v| v.as_u64()),
222            Some(3)
223        );
224    }
225
226    #[tokio::test]
227    async fn test_loop_count_zero_is_noop() {
228        let config = LoopConfig::new(LoopMode::Count(0));
229        let mut service = LoopService::new(config, identity_pipeline());
230
231        let exchange = Exchange::new(Message::new("test"));
232        let result = service.ready().await.unwrap().call(exchange).await.unwrap();
233
234        assert_eq!(result.input.body.as_text(), Some("test"));
235        assert_eq!(
236            result.property(CAMEL_LOOP_SIZE).and_then(|v| v.as_u64()),
237            Some(0)
238        );
239        assert!(result.property(CAMEL_LOOP_INDEX).is_none());
240    }
241
242    #[tokio::test]
243    async fn test_loop_while_stops_when_predicate_false() {
244        let counter = Arc::new(AtomicUsize::new(0));
245
246        let predicate = Arc::new(|exchange: &Exchange| {
247            exchange
248                .property("iterations")
249                .and_then(|v| v.as_u64())
250                .unwrap_or(0)
251                < 2
252        });
253
254        let counter_for_pipeline = Arc::clone(&counter);
255        let pipeline = BoxProcessor::from_fn(move |mut exchange: Exchange| {
256            let counter = Arc::clone(&counter_for_pipeline);
257            Box::pin(async move {
258                let current = exchange
259                    .property("iterations")
260                    .and_then(|v| v.as_u64())
261                    .unwrap_or(0);
262                exchange.set_property("iterations", current + 1);
263                counter.fetch_add(1, Ordering::SeqCst);
264                Ok(exchange)
265            })
266        });
267
268        let config = LoopConfig::new(LoopMode::While(predicate));
269        let mut service = LoopService::new(config, pipeline);
270
271        let exchange = Exchange::new(Message::new("test"));
272        let result = service.ready().await.unwrap().call(exchange).await.unwrap();
273
274        assert_eq!(counter.load(Ordering::SeqCst), 2);
275        assert_eq!(
276            result.property("iterations").and_then(|v| v.as_u64()),
277            Some(2)
278        );
279        assert_eq!(
280            result.property(CAMEL_LOOP_INDEX).and_then(|v| v.as_u64()),
281            Some(1)
282        );
283        assert_eq!(
284            result.property(CAMEL_LOOP_SIZE).and_then(|v| v.as_u64()),
285            Some(0)
286        );
287    }
288
289    #[tokio::test]
290    async fn test_loop_while_respects_max_iterations() {
291        let counter = Arc::new(AtomicUsize::new(0));
292        let predicate = Arc::new(|_exchange: &Exchange| true);
293        let config = LoopConfig::new(LoopMode::While(predicate));
294        let mut service = LoopService::new(config, counter_pipeline(Arc::clone(&counter)));
295
296        let exchange = Exchange::new(Message::new("test"));
297        let result = service.ready().await.unwrap().call(exchange).await;
298
299        assert!(result.is_ok());
300        assert_eq!(counter.load(Ordering::SeqCst), MAX_LOOP_ITERATIONS);
301    }
302
303    #[tokio::test]
304    async fn test_loop_error_propagation() {
305        let pipeline = BoxProcessor::from_fn(|_exchange: Exchange| {
306            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
307        });
308
309        let config = LoopConfig::new(LoopMode::Count(3));
310        let mut service = LoopService::new(config, pipeline);
311
312        let exchange = Exchange::new(Message::new("test"));
313        let result = service.ready().await.unwrap().call(exchange).await;
314
315        assert!(matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "boom"));
316    }
317
318    #[tokio::test]
319    async fn test_loop_pipeline_chaining() {
320        let pipeline = BoxProcessor::from_fn(|mut exchange: Exchange| {
321            Box::pin(async move {
322                if let Body::Text(s) = &exchange.input.body {
323                    exchange.input.body = Body::Text(format!("{s}x"));
324                }
325                Ok(exchange)
326            })
327        });
328
329        let config = LoopConfig::new(LoopMode::Count(3));
330        let mut service = LoopService::new(config, pipeline);
331
332        let exchange = Exchange::new(Message::new("start"));
333        let result = service.ready().await.unwrap().call(exchange).await.unwrap();
334
335        assert_eq!(result.input.body.as_text(), Some("startxxx"));
336    }
337}