// camel_processor/loop_eip.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Service, ServiceExt};
6
7use camel_api::loop_eip::{LoopConfig, LoopMode, MAX_LOOP_ITERATIONS};
8use camel_api::{BoxProcessor, CamelError, Exchange, Value};
9
/// Exchange property key that holds the zero-based index of the loop iteration
/// being executed, stored as a `u64`.
pub const CAMEL_LOOP_INDEX: &str = "CamelLoopIndex";
/// Exchange property key that holds the loop size, stored as a `u64`: the
/// requested iteration count in count mode, and `0` in while mode.
pub const CAMEL_LOOP_SIZE: &str = "CamelLoopSize";
12
13#[derive(Clone)]
14pub struct LoopService {
15    config: LoopConfig,
16    sub_pipeline: BoxProcessor,
17}
18
19impl LoopService {
20    pub fn new(config: LoopConfig, sub_pipeline: BoxProcessor) -> Self {
21        Self {
22            config,
23            sub_pipeline,
24        }
25    }
26}
27
28impl Service<Exchange> for LoopService {
29    type Response = Exchange;
30    type Error = CamelError;
31    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
32
33    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
34        Poll::Ready(Ok(()))
35    }
36
37    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
38        let config = self.config.clone();
39        let mut pipeline = self.sub_pipeline.clone();
40
41        Box::pin(async move {
42            match config.mode {
43                LoopMode::Count(n) => {
44                    exchange.set_property(CAMEL_LOOP_SIZE, Value::from(n as u64));
45                    for i in 0..n {
46                        exchange.set_property(CAMEL_LOOP_INDEX, Value::from(i as u64));
47                        exchange = pipeline.ready().await?.call(exchange).await?;
48                    }
49                }
50                LoopMode::While(ref predicate) => {
51                    exchange.set_property(CAMEL_LOOP_SIZE, Value::from(0u64));
52                    for i in 0..MAX_LOOP_ITERATIONS {
53                        if !predicate(&exchange) {
54                            break;
55                        }
56                        exchange.set_property(CAMEL_LOOP_INDEX, Value::from(i as u64));
57                        exchange = pipeline.ready().await?.call(exchange).await?;
58                    }
59                    if predicate(&exchange) {
60                        tracing::warn!(
61                            "Loop while-mode hit MAX_LOOP_ITERATIONS ({}) safety guard. Predicate still true.",
62                            MAX_LOOP_ITERATIONS
63                        );
64                    }
65                }
66            }
67            Ok(exchange)
68        })
69    }
70}
71
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    use camel_api::loop_eip::{LoopConfig, LoopMode, MAX_LOOP_ITERATIONS};
    use camel_api::{
        Body, BoxProcessor, BoxProcessorExt, CamelError, Exchange, IdentityProcessor, Message,
    };
    use tower::{Service, ServiceExt};

    use super::{CAMEL_LOOP_INDEX, CAMEL_LOOP_SIZE, LoopService};

    /// Sub-pipeline built from `IdentityProcessor`.
    fn identity_pipeline() -> BoxProcessor {
        BoxProcessor::new(IdentityProcessor)
    }

    /// Sub-pipeline that increments `counter` on every invocation and returns
    /// the exchange unchanged.
    fn counter_pipeline(counter: Arc<AtomicUsize>) -> BoxProcessor {
        BoxProcessor::from_fn(move |exchange: Exchange| {
            let counter = Arc::clone(&counter);
            Box::pin(async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(exchange)
            })
        })
    }

    /// Builds a `LoopService` for `mode` around `pipeline` and runs it once on
    /// a fresh exchange created from `body`.
    async fn run_loop(
        mode: LoopMode,
        pipeline: BoxProcessor,
        body: &'static str,
    ) -> Result<Exchange, CamelError> {
        let mut service = LoopService::new(LoopConfig::new(mode), pipeline);
        let exchange = Exchange::new(Message::new(body));
        service.ready().await.unwrap().call(exchange).await
    }

    /// Reads exchange property `key` as a `u64`; `None` when the property is
    /// absent or `as_u64` yields nothing.
    fn u64_property(exchange: &Exchange, key: &str) -> Option<u64> {
        exchange.property(key).and_then(|v| v.as_u64())
    }

    #[tokio::test]
    async fn test_loop_count_iterates_n_times() {
        let counter = Arc::new(AtomicUsize::new(0));
        let pipeline = counter_pipeline(Arc::clone(&counter));

        let result = run_loop(LoopMode::Count(3), pipeline, "test").await;

        assert!(result.is_ok());
        assert_eq!(counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_loop_count_sets_properties() {
        let seen_indices = Arc::new(Mutex::new(Vec::<u64>::new()));
        let seen_indices_for_pipeline = Arc::clone(&seen_indices);

        // Record the loop index visible to the sub-pipeline on each iteration.
        let pipeline = BoxProcessor::from_fn(move |exchange: Exchange| {
            let seen_indices = Arc::clone(&seen_indices_for_pipeline);
            Box::pin(async move {
                if let Some(index) = u64_property(&exchange, CAMEL_LOOP_INDEX) {
                    seen_indices.lock().unwrap().push(index);
                }
                Ok(exchange)
            })
        });

        let result = run_loop(LoopMode::Count(3), pipeline, "test")
            .await
            .unwrap();

        assert_eq!(*seen_indices.lock().unwrap(), vec![0, 1, 2]);
        assert_eq!(u64_property(&result, CAMEL_LOOP_SIZE), Some(3));
    }

    #[tokio::test]
    async fn test_loop_count_zero_is_noop() {
        let result = run_loop(LoopMode::Count(0), identity_pipeline(), "test")
            .await
            .unwrap();

        assert_eq!(result.input.body.as_text(), Some("test"));
        assert_eq!(u64_property(&result, CAMEL_LOOP_SIZE), Some(0));
        assert!(result.property(CAMEL_LOOP_INDEX).is_none());
    }

    #[tokio::test]
    async fn test_loop_while_stops_when_predicate_false() {
        let counter = Arc::new(AtomicUsize::new(0));

        // Keep looping until the sub-pipeline has bumped "iterations" to 2.
        let predicate = Arc::new(|exchange: &Exchange| {
            u64_property(exchange, "iterations").unwrap_or(0) < 2
        });

        let counter_for_pipeline = Arc::clone(&counter);
        let pipeline = BoxProcessor::from_fn(move |mut exchange: Exchange| {
            let counter = Arc::clone(&counter_for_pipeline);
            Box::pin(async move {
                let current = u64_property(&exchange, "iterations").unwrap_or(0);
                exchange.set_property("iterations", current + 1);
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(exchange)
            })
        });

        let result = run_loop(LoopMode::While(predicate), pipeline, "test")
            .await
            .unwrap();

        assert_eq!(counter.load(Ordering::SeqCst), 2);
        assert_eq!(u64_property(&result, "iterations"), Some(2));
        assert_eq!(u64_property(&result, CAMEL_LOOP_INDEX), Some(1));
        assert_eq!(u64_property(&result, CAMEL_LOOP_SIZE), Some(0));
    }

    #[tokio::test]
    async fn test_loop_while_false_predicate_never_runs_pipeline() {
        let counter = Arc::new(AtomicUsize::new(0));
        let predicate = Arc::new(|_exchange: &Exchange| false);
        let pipeline = counter_pipeline(Arc::clone(&counter));

        let result = run_loop(LoopMode::While(predicate), pipeline, "test")
            .await
            .unwrap();

        // The predicate is checked before the first iteration, so nothing runs
        // and no loop index is ever written.
        assert_eq!(counter.load(Ordering::SeqCst), 0);
        assert_eq!(u64_property(&result, CAMEL_LOOP_SIZE), Some(0));
        assert!(result.property(CAMEL_LOOP_INDEX).is_none());
    }

    #[tokio::test]
    async fn test_loop_while_respects_max_iterations() {
        let counter = Arc::new(AtomicUsize::new(0));
        let predicate = Arc::new(|_exchange: &Exchange| true);
        let pipeline = counter_pipeline(Arc::clone(&counter));

        let result = run_loop(LoopMode::While(predicate), pipeline, "test").await;

        assert!(result.is_ok());
        assert_eq!(counter.load(Ordering::SeqCst), MAX_LOOP_ITERATIONS);
    }

    #[tokio::test]
    async fn test_loop_error_propagation() {
        let calls = Arc::new(AtomicUsize::new(0));
        let calls_for_pipeline = Arc::clone(&calls);
        let pipeline = BoxProcessor::from_fn(move |_exchange: Exchange| {
            let calls = Arc::clone(&calls_for_pipeline);
            Box::pin(async move {
                calls.fetch_add(1, Ordering::SeqCst);
                Err(CamelError::ProcessorError("boom".into()))
            })
        });

        let result = run_loop(LoopMode::Count(3), pipeline, "test").await;

        assert!(matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "boom"));
        // The first failure aborts the loop; later iterations must not run.
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_loop_pipeline_chaining() {
        // Each iteration appends an "x", proving the output of one iteration is
        // the input of the next.
        let pipeline = BoxProcessor::from_fn(|mut exchange: Exchange| {
            Box::pin(async move {
                if let Body::Text(s) = &exchange.input.body {
                    exchange.input.body = Body::Text(format!("{s}x"));
                }
                Ok(exchange)
            })
        });

        let result = run_loop(LoopMode::Count(3), pipeline, "start")
            .await
            .unwrap();

        assert_eq!(result.input.body.as_text(), Some("startxxx"));
    }
}