1use camel_api::error_handler::ExceptionDisposition;
6use camel_api::exchange::PROPERTY_EXCEPTION_HANDLED;
7use camel_api::{BoxProcessor, CamelError, Exchange, FilterPredicate};
8use tower::Service;
9use tower::ServiceExt;
10
/// Strategy a [`CatchClause`] uses to decide whether it applies to a failure.
#[derive(Clone)]
pub enum CatchMatcher {
    /// Matches when one of the listed names equals the error's `variant_name()`,
    /// or when the list contains the wildcard `"*"`.
    ByVariant(Vec<String>),
    /// Matches when the predicate returns `true` for the exchange being inspected.
    Predicate(FilterPredicate),
}
20
21impl CatchMatcher {
22 pub fn matches(&self, err: &CamelError, ex: &Exchange) -> bool {
24 match self {
25 CatchMatcher::ByVariant(names) => {
26 if names.iter().any(|n| n == "*") {
27 return true;
28 }
29 names.iter().any(|n| n == err.variant_name())
30 }
31 CatchMatcher::Predicate(p) => p(ex),
32 }
33 }
34}
35
/// One catch branch of a [`DoTryService`].
#[derive(Clone)]
pub struct CatchClause {
    /// Decides whether this clause applies to the error raised by the try steps.
    pub matcher: CatchMatcher,
    /// Optional extra guard evaluated on the failed exchange; when present and
    /// it returns `false` the clause is skipped even though `matcher` accepted.
    pub on_when: Option<FilterPredicate>,
    /// Processors run in order once this clause has been selected.
    pub steps: Vec<BoxProcessor>,
    /// Outcome after `steps` succeed: `Handled` yields `Ok` with the exchange
    /// marked through `handle_error()`; any other disposition re-raises the
    /// original error.
    pub disposition: ExceptionDisposition,
}
49
/// Try / catch / finally block exposed as a `tower::Service` over [`Exchange`].
#[derive(Clone)]
pub struct DoTryService {
    /// Processors executed in order; the first failure stops the sequence and
    /// starts catch-clause evaluation.
    pub try_steps: Vec<BoxProcessor>,
    /// Catch clauses checked in declaration order; only the first clause whose
    /// matcher and optional guard both accept is run.
    pub catch_clauses: Vec<CatchClause>,
    /// Processors run after the try steps (and any selected catch clause),
    /// whether they succeeded or failed. An empty list means no finally stage.
    pub finally_steps: Vec<BoxProcessor>,
    /// Optional guard for the finally stage; when it returns `false` the
    /// finally steps are skipped.
    pub finally_on_when: Option<FilterPredicate>,
}
62
63impl DoTryService {
64 pub fn new(try_steps: Vec<BoxProcessor>) -> Self {
66 Self {
67 try_steps,
68 catch_clauses: Vec::new(),
69 finally_steps: Vec::new(),
70 finally_on_when: None,
71 }
72 }
73
74 pub fn with_catch_and_finally(
77 try_steps: Vec<BoxProcessor>,
78 catch_clauses: Vec<CatchClause>,
79 finally_steps: Vec<BoxProcessor>,
80 finally_on_when: Option<FilterPredicate>,
81 ) -> Self {
82 Self {
83 try_steps,
84 catch_clauses,
85 finally_steps,
86 finally_on_when,
87 }
88 }
89}
90
91async fn run_pipeline(
94 steps: Vec<BoxProcessor>,
95 mut ex: Exchange,
96) -> Result<Exchange, (Exchange, CamelError)> {
97 for mut svc in steps {
98 match svc.ready().await {
99 Ok(ready) => {
100 let snapshot = ex.clone();
101 match ready.call(ex).await {
102 Ok(new_ex) => ex = new_ex,
103 Err(err) => return Err((snapshot, err)),
104 }
105 }
106 Err(err) => return Err((ex, err)),
107 }
108 }
109 Ok(ex)
110}
111
112async fn run_finally(
117 finally_steps: Vec<BoxProcessor>,
118 finally_on_when: Option<FilterPredicate>,
119 ex: Exchange,
120 previous_err: Option<CamelError>,
121) -> Result<Exchange, CamelError> {
122 if finally_steps.is_empty() {
123 return Ok(ex);
124 }
125 if let Some(on_when) = &finally_on_when
126 && !on_when(&ex)
127 {
128 return Ok(ex);
129 }
130 match run_pipeline(finally_steps, ex).await {
131 Ok(ex) => Ok(ex),
132 Err((_, finally_err)) => match previous_err {
133 Some(prev) => {
134 tracing::warn!(
135 finally_error = %finally_err,
136 previous_error = %prev,
137 "doFinally threw; restoring previous exception (Camel parity)"
138 );
139 Err(prev)
140 }
141 None => {
142 tracing::warn!(error = %finally_err, "doFinally threw");
143 Err(finally_err)
144 }
145 },
146 }
147}
148
149impl tower::Service<Exchange> for DoTryService {
150 type Response = Exchange;
151 type Error = CamelError;
152 type Future = std::pin::Pin<
153 Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
154 >;
155
156 fn poll_ready(
157 &mut self,
158 _cx: &mut std::task::Context<'_>,
159 ) -> std::task::Poll<Result<(), Self::Error>> {
160 std::task::Poll::Ready(Ok(()))
161 }
162
163 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
164 exchange.properties.remove(PROPERTY_EXCEPTION_HANDLED);
167
168 let try_steps = self.try_steps.clone();
169 let catch_clauses = self.catch_clauses.clone();
170 let finally_steps = self.finally_steps.clone();
171 let finally_on_when = self.finally_on_when.clone();
172
173 Box::pin(async move {
174 let try_result = run_pipeline(try_steps, exchange).await;
175 match try_result {
176 Ok(ex) => run_finally(finally_steps, finally_on_when, ex, None).await,
177 Err((failed_ex, original_err)) => {
178 let mut ex = failed_ex;
179 ex.set_error(original_err.clone());
180
181 for clause in catch_clauses {
182 let CatchClause {
183 matcher,
184 on_when,
185 steps,
186 disposition,
187 } = clause;
188 if !matcher.matches(&original_err, &ex) {
189 continue;
190 }
191 if let Some(ref on_when) = on_when
192 && !on_when(&ex)
193 {
194 continue;
195 }
196
197 let catch_result = run_pipeline(steps, ex.clone()).await;
198
199 return match catch_result {
200 Ok(ok_ex) => {
201 let prev = match disposition {
213 ExceptionDisposition::Handled => None,
214 ExceptionDisposition::Propagate => Some(original_err.clone()),
215 ExceptionDisposition::Continued => {
216 tracing::warn!(
217 "ExceptionDisposition::Continued reached doTry runtime; \
218 treating as Propagate. Should have been rejected at parse time."
219 );
220 Some(original_err.clone())
221 }
222 };
223 let mut ex = run_finally(
224 finally_steps.clone(),
225 finally_on_when.clone(),
226 ok_ex,
227 prev,
228 )
229 .await?;
230 if matches!(disposition, ExceptionDisposition::Handled) {
234 ex.handle_error();
235 }
236 match disposition {
237 ExceptionDisposition::Handled => Ok(ex),
238 _ => Err(original_err),
239 }
240 }
241 Err((catch_ex, catch_err)) => {
242 let _ex = run_finally(
245 finally_steps.clone(),
246 finally_on_when.clone(),
247 catch_ex,
248 Some(catch_err.clone()),
249 )
250 .await?;
251 Err(catch_err)
252 }
253 };
254 }
255
256 let _ex = run_finally(
258 finally_steps,
259 finally_on_when,
260 ex,
261 Some(original_err.clone()),
262 )
263 .await?;
264 Err(original_err)
265 }
266 }
267 })
268 }
269}
270
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{BoxProcessor, BoxProcessorExt};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    // ---- processor fixtures -------------------------------------------------

    /// Processor that hands the exchange back untouched.
    fn passthrough() -> BoxProcessor {
        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
    }

    /// Processor that bumps `flag` on every invocation and succeeds.
    fn record_call(flag: Arc<AtomicU32>) -> BoxProcessor {
        BoxProcessor::from_fn(move |ex| {
            let counter = Arc::clone(&flag);
            Box::pin(async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(ex)
            })
        })
    }

    /// Processor that always fails with a copy of `err`.
    fn always_fail(err: CamelError) -> BoxProcessor {
        BoxProcessor::from_fn(move |_ex| {
            let failure = err.clone();
            Box::pin(async move { Err(failure) })
        })
    }

    // ---- service / clause builders -----------------------------------------

    /// Fresh zeroed invocation counter.
    fn counter() -> Arc<AtomicU32> {
        Arc::new(AtomicU32::new(0))
    }

    /// Catch clause matching a single variant name, without a guard.
    fn clause(
        variant: &str,
        steps: Vec<BoxProcessor>,
        disposition: ExceptionDisposition,
    ) -> CatchClause {
        CatchClause {
            matcher: CatchMatcher::ByVariant(vec![variant.into()]),
            on_when: None,
            steps,
            disposition,
        }
    }

    /// Same as [`clause`] with the `Handled` disposition.
    fn handled(variant: &str, steps: Vec<BoxProcessor>) -> CatchClause {
        clause(variant, steps, ExceptionDisposition::Handled)
    }

    /// Service with the given stages and no finally guard.
    fn service(
        try_steps: Vec<BoxProcessor>,
        catch_clauses: Vec<CatchClause>,
        finally_steps: Vec<BoxProcessor>,
    ) -> DoTryService {
        DoTryService::with_catch_and_finally(try_steps, catch_clauses, finally_steps, None)
    }

    /// Boxes `svc`, waits for readiness and pushes `ex` through it once.
    async fn invoke(svc: DoTryService, ex: Exchange) -> Result<Exchange, CamelError> {
        let mut boxed = BoxProcessor::new(svc);
        boxed.ready().await.unwrap().call(ex).await
    }

    // ---- tests --------------------------------------------------------------

    #[tokio::test]
    async fn happy_path_try_succeeds_finally_runs() {
        let finally_runs = counter();
        let svc = service(
            vec![passthrough()],
            Vec::new(),
            vec![record_call(finally_runs.clone())],
        );

        let outcome = invoke(svc, Exchange::default()).await;

        assert!(outcome.is_ok());
        assert_eq!(finally_runs.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn catch_by_variant_handled_returns_ok() {
        let svc = service(
            vec![always_fail(CamelError::ProcessorError("boom".into()))],
            vec![handled("ProcessorError", vec![passthrough()])],
            Vec::new(),
        );

        let outcome = invoke(svc, Exchange::default()).await;

        assert!(outcome.is_ok(), "Handled must return Ok");
        let ex = outcome.unwrap();
        assert_eq!(
            ex.properties.get(PROPERTY_EXCEPTION_HANDLED),
            Some(&camel_api::Value::Bool(true)),
            "CamelExceptionHandled must be set via handle_error()"
        );
    }

    #[tokio::test]
    async fn catch_by_variant_propagate_runs_side_effects_and_rethrows() {
        let side_effects = counter();
        let svc = service(
            vec![always_fail(CamelError::ProcessorError("boom".into()))],
            vec![clause(
                "ProcessorError",
                vec![record_call(side_effects.clone())],
                ExceptionDisposition::Propagate,
            )],
            Vec::new(),
        );

        let outcome = invoke(svc, Exchange::default()).await;

        assert!(outcome.is_err(), "Propagate must rethrow original");
        assert!(matches!(outcome.unwrap_err(), CamelError::ProcessorError(_)));
        assert_eq!(
            side_effects.load(Ordering::SeqCst),
            1,
            "catch branch must have run for side-effects"
        );
    }

    #[tokio::test]
    async fn catch_by_predicate_matches_via_exception_kind() {
        let is_io: FilterPredicate = Arc::new(|ex: &Exchange| {
            matches!(
                ex.properties
                    .get(camel_api::exchange::PROPERTY_EXCEPTION_KIND),
                Some(camel_api::Value::String(kind)) if kind == "io"
            )
        });
        let svc = service(
            vec![always_fail(CamelError::Io("disk full".into()))],
            vec![CatchClause {
                matcher: CatchMatcher::Predicate(is_io),
                on_when: None,
                steps: vec![passthrough()],
                disposition: ExceptionDisposition::Handled,
            }],
            Vec::new(),
        );

        let outcome = invoke(svc, Exchange::default()).await;

        assert!(
            outcome.is_ok(),
            "Predicate matcher must catch the error and Handled must return Ok"
        );
    }

    #[tokio::test]
    async fn on_when_filters_clause_and_next_evaluated() {
        let guarded_runs = counter();
        let fallback_runs = counter();

        // Matches by variant but is vetoed by its guard.
        let guarded = CatchClause {
            on_when: Some(Arc::new(|_ex| false)),
            ..handled("ProcessorError", vec![record_call(guarded_runs.clone())])
        };
        let svc = service(
            vec![always_fail(CamelError::ProcessorError("boom".into()))],
            vec![
                guarded,
                handled("*", vec![record_call(fallback_runs.clone())]),
            ],
            Vec::new(),
        );

        let _ = invoke(svc, Exchange::default()).await;

        assert_eq!(guarded_runs.load(Ordering::SeqCst), 0);
        assert_eq!(fallback_runs.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn first_match_wins_subsequent_clauses_not_evaluated() {
        let specific_runs = counter();
        let wildcard_runs = counter();

        let svc = service(
            vec![always_fail(CamelError::Io("err".into()))],
            vec![
                handled("Io", vec![record_call(specific_runs.clone())]),
                handled("*", vec![record_call(wildcard_runs.clone())]),
            ],
            Vec::new(),
        );

        let _ = invoke(svc, Exchange::default()).await;

        assert_eq!(specific_runs.load(Ordering::SeqCst), 1);
        assert_eq!(wildcard_runs.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn no_clause_matches_propagates_original() {
        let svc = service(
            vec![always_fail(CamelError::CircuitOpen("cb".into()))],
            vec![handled("ProcessorError", vec![passthrough()])],
            Vec::new(),
        );

        let outcome = invoke(svc, Exchange::default()).await;

        assert!(outcome.is_err());
        assert!(matches!(outcome.unwrap_err(), CamelError::CircuitOpen(_)));
    }

    #[tokio::test]
    async fn catch_branch_throws_new_error_wins() {
        let svc = service(
            vec![always_fail(CamelError::ProcessorError("orig".into()))],
            vec![handled(
                "ProcessorError",
                vec![always_fail(CamelError::Io("catch-fail".into()))],
            )],
            Vec::new(),
        );

        let outcome = invoke(svc, Exchange::default()).await;

        assert!(outcome.is_err());
        assert!(matches!(outcome.unwrap_err(), CamelError::Io(_)));
    }

    #[tokio::test]
    async fn finally_throws_with_no_previous_error_propagates_finally_error() {
        let svc = service(
            vec![passthrough()],
            Vec::new(),
            vec![always_fail(CamelError::Config("fin".into()))],
        );

        let outcome = invoke(svc, Exchange::default()).await;

        assert!(outcome.is_err());
        assert!(matches!(outcome.unwrap_err(), CamelError::Config(_)));
    }

    #[tokio::test]
    async fn finally_throws_with_previous_error_restores_previous() {
        let svc = service(
            vec![always_fail(CamelError::ProcessorError("orig".into()))],
            Vec::new(),
            vec![always_fail(CamelError::Config("fin".into()))],
        );

        let outcome = invoke(svc, Exchange::default()).await;

        assert!(outcome.is_err());
        assert!(
            matches!(outcome.unwrap_err(), CamelError::ProcessorError(_)),
            "previous error must be restored when finally throws (Camel parity)"
        );
    }

    #[tokio::test]
    async fn finally_on_when_false_skips_finally() {
        let finally_runs = counter();
        let mut svc = service(
            vec![passthrough()],
            Vec::new(),
            vec![record_call(finally_runs.clone())],
        );
        svc.finally_on_when = Some(Arc::new(|_ex| false));

        let _ = invoke(svc, Exchange::default()).await;

        assert_eq!(finally_runs.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn stale_handled_marker_cleared_on_entry() {
        let mut incoming = Exchange::default();
        incoming.set_property(PROPERTY_EXCEPTION_HANDLED, camel_api::Value::Bool(true));
        let svc = service(vec![passthrough()], Vec::new(), Vec::new());

        let ex = invoke(svc, incoming).await.unwrap();

        assert!(
            !ex.properties.contains_key(PROPERTY_EXCEPTION_HANDLED),
            "stale CamelExceptionHandled must be cleared on entry"
        );
    }

    #[tokio::test]
    async fn nested_do_try_inner_catch_does_not_leak_to_outer() {
        let inner = BoxProcessor::new(service(
            vec![always_fail(CamelError::Io("inner".into()))],
            vec![handled("Io", vec![passthrough()])],
            Vec::new(),
        ));
        let outer = service(
            vec![inner],
            vec![handled("Io", vec![passthrough()])],
            Vec::new(),
        );

        let outcome = invoke(outer, Exchange::default()).await;

        assert!(
            outcome.is_ok(),
            "outer must see Ok because inner handled its own error"
        );
    }

    #[tokio::test]
    async fn catch_all_only_fires_when_no_specific_clause_matches() {
        let processor_runs = counter();
        let catch_all_runs = counter();

        let svc = service(
            vec![always_fail(CamelError::Io("err".into()))],
            vec![
                handled("ProcessorError", vec![record_call(processor_runs.clone())]),
                handled("*", vec![record_call(catch_all_runs.clone())]),
            ],
            Vec::new(),
        );

        let _ = invoke(svc, Exchange::default()).await;

        assert_eq!(
            processor_runs.load(Ordering::SeqCst),
            0,
            "specific ProcessorError clause must not fire on Io error"
        );
        assert_eq!(
            catch_all_runs.load(Ordering::SeqCst),
            1,
            "catch-all clause must fire when no specific clause matches"
        );
    }

    #[tokio::test]
    async fn catch_throws_with_finally_runs_finally_and_propagates_catch_err() {
        let finally_runs = counter();
        let svc = service(
            vec![always_fail(CamelError::ProcessorError("orig".into()))],
            vec![handled(
                "ProcessorError",
                vec![always_fail(CamelError::Io("catch-fail".into()))],
            )],
            vec![record_call(finally_runs.clone())],
        );

        let outcome = invoke(svc, Exchange::default()).await;

        assert!(outcome.is_err());
        assert!(
            matches!(outcome.unwrap_err(), CamelError::Io(_)),
            "catch_err must propagate (not original ProcessorError)"
        );
        assert_eq!(
            finally_runs.load(Ordering::SeqCst),
            1,
            "doFinally must run even when catch throws"
        );
    }

    #[tokio::test]
    async fn catch_throws_and_finally_throws_restores_catch_err() {
        let svc = service(
            vec![always_fail(CamelError::ProcessorError("orig".into()))],
            vec![handled(
                "ProcessorError",
                vec![always_fail(CamelError::Io("catch-fail".into()))],
            )],
            vec![always_fail(CamelError::Config("fin-fail".into()))],
        );

        let outcome = invoke(svc, Exchange::default()).await;

        assert!(outcome.is_err());
        assert!(
            matches!(outcome.unwrap_err(), CamelError::Io(_)),
            "catch_err (Io) must be restored over finally_err (Config) per Camel parity"
        );
    }

    #[tokio::test]
    async fn finally_on_when_false_with_previous_error_still_propagates_original() {
        let finally_runs = counter();
        let mut svc = service(
            vec![always_fail(CamelError::ProcessorError("orig".into()))],
            Vec::new(),
            vec![record_call(finally_runs.clone())],
        );
        svc.finally_on_when = Some(Arc::new(|_ex| false));

        let outcome = invoke(svc, Exchange::default()).await;

        assert!(outcome.is_err());
        assert!(
            matches!(outcome.unwrap_err(), CamelError::ProcessorError(_)),
            "original error must propagate even when finally_on_when skips finally"
        );
        assert_eq!(
            finally_runs.load(Ordering::SeqCst),
            0,
            "doFinally must NOT run when on_when returns false"
        );
    }
}