camel_processor/
loop_eip.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Service, ServiceExt};
6
7use camel_api::loop_eip::{LoopConfig, LoopMode, MAX_LOOP_ITERATIONS};
8use camel_api::{BoxProcessor, CamelError, Exchange, Value};
9
/// Exchange property key under which the loop stores the zero-based index of the
/// iteration that is about to run.
pub const CAMEL_LOOP_INDEX: &str = "CamelLoopIndex";

/// Exchange property key under which the loop stores its size: the configured
/// iteration count in count mode, and `0` in while mode.
pub const CAMEL_LOOP_SIZE: &str = "CamelLoopSize";
12
13#[derive(Clone)]
14pub struct LoopService {
15 config: LoopConfig,
16 sub_pipeline: BoxProcessor,
17}
18
19impl LoopService {
20 pub fn new(config: LoopConfig, sub_pipeline: BoxProcessor) -> Self {
21 Self {
22 config,
23 sub_pipeline,
24 }
25 }
26}
27
28impl Service<Exchange> for LoopService {
29 type Response = Exchange;
30 type Error = CamelError;
31 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
32
33 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
34 Poll::Ready(Ok(()))
35 }
36
37 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
38 let config = self.config.clone();
39 let mut pipeline = self.sub_pipeline.clone();
40
41 Box::pin(async move {
42 match config.mode {
43 LoopMode::Count(n) => {
44 exchange.set_property(CAMEL_LOOP_SIZE, Value::from(n as u64));
45 for i in 0..n {
46 exchange.set_property(CAMEL_LOOP_INDEX, Value::from(i as u64));
47 exchange = pipeline.ready().await?.call(exchange).await?;
48 }
49 }
50 LoopMode::While(ref predicate) => {
51 exchange.set_property(CAMEL_LOOP_SIZE, Value::from(0u64));
52 for i in 0..MAX_LOOP_ITERATIONS {
53 if !predicate(&exchange) {
54 break;
55 }
56 exchange.set_property(CAMEL_LOOP_INDEX, Value::from(i as u64));
57 exchange = pipeline.ready().await?.call(exchange).await?;
58 }
59 if predicate(&exchange) {
60 tracing::warn!(
61 "Loop while-mode hit MAX_LOOP_ITERATIONS ({}) safety guard. Predicate still true.",
62 MAX_LOOP_ITERATIONS
63 );
64 }
65 }
66 }
67 Ok(exchange)
68 })
69 }
70}
71
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    use camel_api::loop_eip::{LoopConfig, LoopMode, MAX_LOOP_ITERATIONS};
    use camel_api::{
        Body, BoxProcessor, BoxProcessorExt, CamelError, Exchange, IdentityProcessor, Message,
    };
    use tower::{Service, ServiceExt};

    use super::{CAMEL_LOOP_INDEX, CAMEL_LOOP_SIZE, LoopService};

    /// Pipeline that hands every exchange back untouched.
    fn identity_pipeline() -> BoxProcessor {
        BoxProcessor::new(IdentityProcessor)
    }

    /// Pipeline that bumps `counter` once per invocation and returns the
    /// exchange as it received it.
    fn counter_pipeline(counter: Arc<AtomicUsize>) -> BoxProcessor {
        BoxProcessor::from_fn(move |exchange: Exchange| {
            let counter = Arc::clone(&counter);
            Box::pin(async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(exchange)
            })
        })
    }

    /// Reads the exchange property `key` as a `u64`; `None` when the property
    /// is absent or is not an unsigned integer.
    fn u64_property(exchange: &Exchange, key: &str) -> Option<u64> {
        exchange.property(key).and_then(|v| v.as_u64())
    }

    /// Builds a `LoopService` for `mode` around `pipeline` and drives it once
    /// with a fresh exchange whose message body is `body`.
    async fn run_loop(
        mode: LoopMode,
        pipeline: BoxProcessor,
        body: &'static str,
    ) -> Result<Exchange, CamelError> {
        let mut service = LoopService::new(LoopConfig::new(mode), pipeline);
        let exchange = Exchange::new(Message::new(body));
        service.ready().await.unwrap().call(exchange).await
    }

    #[tokio::test]
    async fn test_loop_count_iterates_n_times() {
        let counter = Arc::new(AtomicUsize::new(0));

        let result = run_loop(
            LoopMode::Count(3),
            counter_pipeline(Arc::clone(&counter)),
            "test",
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_loop_count_sets_properties() {
        // Records the loop index observed by the sub-pipeline on each iteration.
        let seen_indices = Arc::new(Mutex::new(Vec::<u64>::new()));
        let recorder = Arc::clone(&seen_indices);

        let pipeline = BoxProcessor::from_fn(move |exchange: Exchange| {
            let recorder = Arc::clone(&recorder);
            Box::pin(async move {
                if let Some(index) = u64_property(&exchange, CAMEL_LOOP_INDEX) {
                    recorder.lock().unwrap().push(index);
                }
                Ok(exchange)
            })
        });

        let result = run_loop(LoopMode::Count(3), pipeline, "test")
            .await
            .unwrap();

        assert_eq!(*seen_indices.lock().unwrap(), vec![0, 1, 2]);
        assert_eq!(u64_property(&result, CAMEL_LOOP_SIZE), Some(3));
    }

    #[tokio::test]
    async fn test_loop_count_zero_is_noop() {
        let result = run_loop(LoopMode::Count(0), identity_pipeline(), "test")
            .await
            .unwrap();

        assert_eq!(result.input.body.as_text(), Some("test"));
        assert_eq!(u64_property(&result, CAMEL_LOOP_SIZE), Some(0));
        // No iteration ran, so the index property must never have been written.
        assert!(result.property(CAMEL_LOOP_INDEX).is_none());
    }

    #[tokio::test]
    async fn test_loop_while_stops_when_predicate_false() {
        let counter = Arc::new(AtomicUsize::new(0));

        // Keep looping until the sub-pipeline has bumped "iterations" to 2.
        let predicate =
            Arc::new(|exchange: &Exchange| u64_property(exchange, "iterations").unwrap_or(0) < 2);

        let invocations = Arc::clone(&counter);
        let pipeline = BoxProcessor::from_fn(move |mut exchange: Exchange| {
            let invocations = Arc::clone(&invocations);
            Box::pin(async move {
                let current = u64_property(&exchange, "iterations").unwrap_or(0);
                exchange.set_property("iterations", current + 1);
                invocations.fetch_add(1, Ordering::SeqCst);
                Ok(exchange)
            })
        });

        let result = run_loop(LoopMode::While(predicate), pipeline, "test")
            .await
            .unwrap();

        assert_eq!(counter.load(Ordering::SeqCst), 2);
        assert_eq!(u64_property(&result, "iterations"), Some(2));
        assert_eq!(u64_property(&result, CAMEL_LOOP_INDEX), Some(1));
        assert_eq!(u64_property(&result, CAMEL_LOOP_SIZE), Some(0));
    }

    #[tokio::test]
    async fn test_loop_while_respects_max_iterations() {
        let counter = Arc::new(AtomicUsize::new(0));
        // A predicate that never turns false must be cut off by the safety guard.
        let predicate = Arc::new(|_exchange: &Exchange| true);

        let result = run_loop(
            LoopMode::While(predicate),
            counter_pipeline(Arc::clone(&counter)),
            "test",
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(counter.load(Ordering::SeqCst), MAX_LOOP_ITERATIONS);
    }

    #[tokio::test]
    async fn test_loop_error_propagation() {
        let pipeline = BoxProcessor::from_fn(|_exchange: Exchange| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        });

        let result = run_loop(LoopMode::Count(3), pipeline, "test").await;

        assert!(matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "boom"));
    }

    #[tokio::test]
    async fn test_loop_pipeline_chaining() {
        // Each iteration appends one "x", so the output shows that every run
        // received the previous run's exchange.
        let pipeline = BoxProcessor::from_fn(|mut exchange: Exchange| {
            Box::pin(async move {
                if let Body::Text(s) = &exchange.input.body {
                    exchange.input.body = Body::Text(format!("{s}x"));
                }
                Ok(exchange)
            })
        });

        let result = run_loop(LoopMode::Count(3), pipeline, "start")
            .await
            .unwrap();

        assert_eq!(result.input.body.as_text(), Some("startxxx"));
    }
}