1use crate::do_try::CatchMatcher;
7use camel_api::error_handler::ExceptionDisposition;
8use camel_api::outcome_pipeline::OutcomePipeline;
9use camel_api::pipeline_outcome::PipelineOutcome;
10use camel_api::{CamelError, Exchange, FilterPredicate};
11use std::future::Future;
12use std::pin::Pin;
13
14#[derive(Clone)]
18pub struct CatchClauseSegment {
19 pub matcher: CatchMatcher,
20 pub on_when: Option<FilterPredicate>,
21 pub body: camel_api::OutcomeSegment,
22 pub disposition: ExceptionDisposition,
23}
24
25#[derive(Clone)]
27pub struct FinallyClauseSegment {
28 pub on_when: Option<FilterPredicate>,
29 pub body: camel_api::OutcomeSegment,
30}
31
32pub struct DoTrySegment {
38 pub try_body: camel_api::OutcomeSegment,
39 pub catches: Vec<CatchClauseSegment>,
40 pub finally: Option<FinallyClauseSegment>,
41}
42
43impl Clone for DoTrySegment {
44 fn clone(&self) -> Self {
45 Self {
46 try_body: self.try_body.clone(),
47 catches: self.catches.clone(),
48 finally: self.finally.clone(),
49 }
50 }
51}
52
53#[allow(clippy::large_enum_variant)]
60enum FinallyOutcome {
61 Stopped(Exchange),
62 Failed(CamelError),
63}
64
65async fn run_finally_body(
69 finally: &mut Option<FinallyClauseSegment>,
70 ex: Exchange,
71) -> Result<Exchange, FinallyOutcome> {
72 let Some(f) = finally.as_mut() else {
73 return Ok(ex);
74 };
75 if !f.on_when.as_ref().map(|p| p(&ex)).unwrap_or(true) {
76 return Ok(ex);
77 }
78 match f.body.run(ex).await {
79 PipelineOutcome::Completed(e) => Ok(e),
80 PipelineOutcome::Stopped(e) => Err(FinallyOutcome::Stopped(e)),
81 PipelineOutcome::Failed(e) => Err(FinallyOutcome::Failed(e)),
82 }
83}
84
85impl OutcomePipeline for DoTrySegment {
86 fn clone_box(&self) -> Box<dyn OutcomePipeline> {
87 Box::new(self.clone())
88 }
89
90 fn run<'a>(
91 &'a mut self,
92 exchange: Exchange,
93 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
94 Box::pin(async move {
95 let exchange_for_unmatched = exchange.clone();
98
99 let try_outcome = self.try_body.run(exchange).await;
101 let returned_ex = match try_outcome {
102 PipelineOutcome::Stopped(ex) => return PipelineOutcome::Stopped(ex),
103 PipelineOutcome::Completed(ex) => ex,
104 PipelineOutcome::Failed(err) => {
105 let mut current_ex = exchange_for_unmatched;
109 current_ex.set_error(err.clone());
110 for catch in self.catches.iter_mut() {
111 if catch.matcher.matches(&err, ¤t_ex)
112 && catch
113 .on_when
114 .as_ref()
115 .map(|p| p(¤t_ex))
116 .unwrap_or(true)
117 {
118 match catch.body.run(current_ex).await {
119 PipelineOutcome::Stopped(stopped_ex) => {
121 return PipelineOutcome::Stopped(stopped_ex);
122 }
123 PipelineOutcome::Completed(next) => {
125 match catch.disposition {
126 ExceptionDisposition::Propagate => {
127 match run_finally_body(&mut self.finally, next).await {
130 Ok(_) => {}
131 Err(FinallyOutcome::Stopped(e)) => {
132 return PipelineOutcome::Stopped(e);
133 }
134 Err(FinallyOutcome::Failed(_finally_err)) => {
135 tracing::warn!(
136 error = %err,
137 "doFinally threw during Propagate; \
138 restoring original"
139 );
140 return PipelineOutcome::Failed(err);
141 }
142 }
143 return PipelineOutcome::Failed(err);
144 }
145 ExceptionDisposition::Handled => {
146 current_ex = next;
147 break;
148 }
149 ExceptionDisposition::Continued => {
150 current_ex = next;
151 break;
152 }
153 }
154 }
155 PipelineOutcome::Failed(catch_err) => {
162 return PipelineOutcome::Failed(catch_err);
163 }
164 }
165 }
166 }
167 current_ex
168 }
169 };
170 match run_finally_body(&mut self.finally, returned_ex).await {
173 Ok(ex) => PipelineOutcome::Completed(ex),
174 Err(FinallyOutcome::Stopped(e)) => PipelineOutcome::Stopped(e),
175 Err(FinallyOutcome::Failed(finally_err)) => {
176 tracing::warn!(
177 error = %finally_err,
178 "doFinally threw during/after catch; surfacing finally error"
179 );
180 PipelineOutcome::Failed(finally_err)
181 }
182 }
183 })
184 }
185}
186
187#[cfg(test)]
188mod tests {
189 use super::*;
190 use crate::do_try::CatchMatcher;
191 use camel_api::pipeline_outcome::PipelineOutcome;
192 use std::sync::Arc;
193 use std::sync::atomic::{AtomicU32, Ordering};
194
195 struct CompleteSegment;
198
199 impl OutcomePipeline for CompleteSegment {
200 fn clone_box(&self) -> Box<dyn OutcomePipeline> {
201 Box::new(CompleteSegment)
202 }
203 fn run<'a>(
204 &'a mut self,
205 exchange: Exchange,
206 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
207 Box::pin(async move { PipelineOutcome::Completed(exchange) })
208 }
209 }
210
211 fn seg_complete() -> camel_api::OutcomeSegment {
212 camel_api::OutcomeSegment::new(Box::new(CompleteSegment))
213 }
214
215 struct FailSegment(CamelError);
216
217 impl OutcomePipeline for FailSegment {
218 fn clone_box(&self) -> Box<dyn OutcomePipeline> {
219 Box::new(FailSegment(self.0.clone()))
220 }
221 fn run<'a>(
222 &'a mut self,
223 _exchange: Exchange,
224 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
225 let e = self.0.clone();
226 Box::pin(async move { PipelineOutcome::Failed(e) })
227 }
228 }
229
230 fn seg_fail(err: CamelError) -> camel_api::OutcomeSegment {
231 camel_api::OutcomeSegment::new(Box::new(FailSegment(err)))
232 }
233
234 struct MutateThenStop {
235 mutator: Arc<dyn Fn(&mut Exchange) + Send + Sync>,
236 }
237
238 impl OutcomePipeline for MutateThenStop {
239 fn clone_box(&self) -> Box<dyn OutcomePipeline> {
240 Box::new(MutateThenStop {
241 mutator: Arc::clone(&self.mutator),
242 })
243 }
244 fn run<'a>(
245 &'a mut self,
246 mut exchange: Exchange,
247 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
248 let m = Arc::clone(&self.mutator);
249 Box::pin(async move {
250 m(&mut exchange);
251 PipelineOutcome::Stopped(exchange)
252 })
253 }
254 }
255
256 fn seg_stop_with(
257 mutator: impl Fn(&mut Exchange) + Send + Sync + 'static,
258 ) -> camel_api::OutcomeSegment {
259 camel_api::OutcomeSegment::new(Box::new(MutateThenStop {
260 mutator: Arc::new(mutator),
261 }))
262 }
263
264 struct RecordCall {
265 counter: Arc<AtomicU32>,
266 }
267
268 impl OutcomePipeline for RecordCall {
269 fn clone_box(&self) -> Box<dyn OutcomePipeline> {
270 Box::new(RecordCall {
271 counter: Arc::clone(&self.counter),
272 })
273 }
274 fn run<'a>(
275 &'a mut self,
276 exchange: Exchange,
277 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
278 let c = Arc::clone(&self.counter);
279 Box::pin(async move {
280 c.fetch_add(1, Ordering::SeqCst);
281 PipelineOutcome::Completed(exchange)
282 })
283 }
284 }
285
286 fn seg_record(counter: Arc<AtomicU32>) -> camel_api::OutcomeSegment {
287 camel_api::OutcomeSegment::new(Box::new(RecordCall { counter }))
288 }
289
290 #[tokio::test]
293 async fn stop_inside_try_skips_catch_and_finally() {
294 let catch_call = Arc::new(AtomicU32::new(0));
295 let finally_call = Arc::new(AtomicU32::new(0));
296
297 let mut seg = DoTrySegment {
298 try_body: seg_stop_with(|ex| {
299 ex.set_property("mutated", camel_api::Value::Bool(true));
300 }),
301 catches: vec![CatchClauseSegment {
302 matcher: CatchMatcher::ByVariant(vec!["*".into()]),
303 on_when: None,
304 body: seg_record(catch_call.clone()),
305 disposition: ExceptionDisposition::Handled,
306 }],
307 finally: Some(FinallyClauseSegment {
308 on_when: None,
309 body: seg_record(finally_call.clone()),
310 }),
311 };
312
313 let result = seg.run(Exchange::default()).await;
314 match result {
315 PipelineOutcome::Stopped(ex) => {
316 assert_eq!(
317 ex.properties.get("mutated"),
318 Some(&camel_api::Value::Bool(true)),
319 "try body mutation must be preserved in Stopped exchange"
320 );
321 }
322 other => panic!("expected Stopped, got {:?}", other),
323 }
324 assert_eq!(
325 catch_call.load(Ordering::SeqCst),
326 0,
327 "catch must NOT run when try stops"
328 );
329 assert_eq!(
330 finally_call.load(Ordering::SeqCst),
331 0,
332 "finally must NOT run when try stops"
333 );
334 }
335
336 #[tokio::test]
337 async fn stop_inside_catch_skips_finally() {
338 let finally_call = Arc::new(AtomicU32::new(0));
339
340 let mut seg = DoTrySegment {
341 try_body: seg_fail(CamelError::ProcessorError("boom".into())),
342 catches: vec![CatchClauseSegment {
343 matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
344 on_when: None,
345 body: seg_stop_with(|ex| {
346 ex.set_property("catch_mutated", camel_api::Value::Bool(true));
347 }),
348 disposition: ExceptionDisposition::Handled,
349 }],
350 finally: Some(FinallyClauseSegment {
351 on_when: None,
352 body: seg_record(finally_call.clone()),
353 }),
354 };
355
356 let result = seg.run(Exchange::default()).await;
357 match result {
358 PipelineOutcome::Stopped(ex) => {
359 assert_eq!(
360 ex.properties.get("catch_mutated"),
361 Some(&camel_api::Value::Bool(true)),
362 "catch body mutation must be preserved in Stopped exchange"
363 );
364 }
365 other => panic!("expected Stopped, got {:?}", other),
366 }
367 assert_eq!(
368 finally_call.load(Ordering::SeqCst),
369 0,
370 "finally must NOT run when catch stops"
371 );
372 }
373
374 #[tokio::test]
375 async fn stop_inside_finally_stops_outer_route() {
376 let mut seg = DoTrySegment {
377 try_body: seg_complete(),
378 catches: vec![],
379 finally: Some(FinallyClauseSegment {
380 on_when: None,
381 body: seg_stop_with(|ex| {
382 ex.set_property("finally_mutated", camel_api::Value::Bool(true));
383 }),
384 }),
385 };
386
387 let result = seg.run(Exchange::default()).await;
388 match result {
389 PipelineOutcome::Stopped(ex) => {
390 assert_eq!(
391 ex.properties.get("finally_mutated"),
392 Some(&camel_api::Value::Bool(true)),
393 "finally body mutation must be preserved in Stopped exchange"
394 );
395 }
396 other => panic!("expected Stopped, got {:?}", other),
397 }
398 }
399
400 #[tokio::test]
401 async fn catch_on_when_false_falls_through_to_next_catch() {
402 let first_call = Arc::new(AtomicU32::new(0));
403 let second_call = Arc::new(AtomicU32::new(0));
404
405 let mut seg = DoTrySegment {
406 try_body: seg_fail(CamelError::Io("disk err".into())),
407 catches: vec![
408 CatchClauseSegment {
409 matcher: CatchMatcher::ByVariant(vec!["Io".into()]),
410 on_when: Some(Arc::new(|_ex| false)),
411 body: seg_record(first_call.clone()),
412 disposition: ExceptionDisposition::Handled,
413 },
414 CatchClauseSegment {
415 matcher: CatchMatcher::ByVariant(vec!["*".into()]),
416 on_when: None,
417 body: seg_record(second_call.clone()),
418 disposition: ExceptionDisposition::Handled,
419 },
420 ],
421 finally: None,
422 };
423
424 let result = seg.run(Exchange::default()).await;
425 assert!(
426 matches!(result, PipelineOutcome::Completed(_)),
427 "expected Completed after second catch"
428 );
429 assert_eq!(
430 first_call.load(Ordering::SeqCst),
431 0,
432 "first catch must NOT fire (on_when=false)"
433 );
434 assert_eq!(
435 second_call.load(Ordering::SeqCst),
436 1,
437 "second catch must fire"
438 );
439 }
440
441 #[tokio::test]
442 async fn finally_on_when_false_skips_finally_entirely() {
443 let finally_call = Arc::new(AtomicU32::new(0));
444 let mut ex = Exchange::default();
445 ex.set_property("try_set", camel_api::Value::Bool(true));
446
447 let mut seg = DoTrySegment {
448 try_body: seg_complete(),
449 catches: vec![],
450 finally: Some(FinallyClauseSegment {
451 on_when: Some(Arc::new(|_ex| false)),
452 body: seg_record(finally_call.clone()),
453 }),
454 };
455
456 let result = seg.run(ex).await;
457 match result {
458 PipelineOutcome::Completed(ex) => {
459 assert_eq!(
460 ex.properties.get("try_set"),
461 Some(&camel_api::Value::Bool(true)),
462 "exchange state from try must be preserved"
463 );
464 }
465 other => panic!("expected Completed, got {:?}", other),
466 }
467 assert_eq!(
468 finally_call.load(Ordering::SeqCst),
469 0,
470 "finally must NOT run when on_when=false"
471 );
472 }
473}