1use camel_api::error_handler::ExceptionDisposition;
2use camel_api::exchange::PROPERTY_EXCEPTION_HANDLED;
3use camel_api::{BoxProcessor, CamelError, Exchange, FilterPredicate};
4use tower::Service;
5use tower::ServiceExt;
6
7#[derive(Clone)]
9pub enum CatchMatcher {
10 ByVariant(Vec<String>),
13 Predicate(FilterPredicate),
15}
16
17impl CatchMatcher {
18 pub fn matches(&self, err: &CamelError, ex: &Exchange) -> bool {
20 match self {
21 CatchMatcher::ByVariant(names) => {
22 if names.iter().any(|n| n == "*") {
23 return true;
24 }
25 names.iter().any(|n| n == err.variant_name())
26 }
27 CatchMatcher::Predicate(p) => p(ex),
28 }
29 }
30}
31
32#[derive(Clone)]
34pub struct CatchClause {
35 pub matcher: CatchMatcher,
37 pub on_when: Option<FilterPredicate>,
39 pub steps: Vec<BoxProcessor>,
41 pub disposition: ExceptionDisposition,
44}
45
46#[derive(Clone)]
48pub struct DoTryService {
49 pub try_steps: Vec<BoxProcessor>,
51 pub catch_clauses: Vec<CatchClause>,
53 pub finally_steps: Vec<BoxProcessor>,
55 pub finally_on_when: Option<FilterPredicate>,
57}
58
59impl DoTryService {
60 pub fn new(try_steps: Vec<BoxProcessor>) -> Self {
62 Self {
63 try_steps,
64 catch_clauses: Vec::new(),
65 finally_steps: Vec::new(),
66 finally_on_when: None,
67 }
68 }
69
70 pub fn with_catch_and_finally(
73 try_steps: Vec<BoxProcessor>,
74 catch_clauses: Vec<CatchClause>,
75 finally_steps: Vec<BoxProcessor>,
76 finally_on_when: Option<FilterPredicate>,
77 ) -> Self {
78 Self {
79 try_steps,
80 catch_clauses,
81 finally_steps,
82 finally_on_when,
83 }
84 }
85}
86
87async fn run_pipeline(
90 steps: Vec<BoxProcessor>,
91 mut ex: Exchange,
92) -> Result<Exchange, (Exchange, CamelError)> {
93 for mut svc in steps {
94 match svc.ready().await {
95 Ok(ready) => {
96 let snapshot = ex.clone();
97 match ready.call(ex).await {
98 Ok(new_ex) => ex = new_ex,
99 Err(err) => return Err((snapshot, err)),
100 }
101 }
102 Err(err) => return Err((ex, err)),
103 }
104 }
105 Ok(ex)
106}
107
108async fn run_finally(
113 finally_steps: Vec<BoxProcessor>,
114 finally_on_when: Option<FilterPredicate>,
115 ex: Exchange,
116 previous_err: Option<CamelError>,
117) -> Result<Exchange, CamelError> {
118 if finally_steps.is_empty() {
119 return Ok(ex);
120 }
121 if let Some(on_when) = &finally_on_when
122 && !on_when(&ex)
123 {
124 return Ok(ex);
125 }
126 match run_pipeline(finally_steps, ex).await {
127 Ok(ex) => Ok(ex),
128 Err((_, finally_err)) => match previous_err {
129 Some(prev) => {
130 tracing::warn!(
131 finally_error = %finally_err,
132 previous_error = %prev,
133 "doFinally threw; restoring previous exception (Camel parity)"
134 );
135 Err(prev)
136 }
137 None => {
138 tracing::warn!(error = %finally_err, "doFinally threw");
139 Err(finally_err)
140 }
141 },
142 }
143}
144
145impl tower::Service<Exchange> for DoTryService {
146 type Response = Exchange;
147 type Error = CamelError;
148 type Future = std::pin::Pin<
149 Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
150 >;
151
152 fn poll_ready(
153 &mut self,
154 _cx: &mut std::task::Context<'_>,
155 ) -> std::task::Poll<Result<(), Self::Error>> {
156 std::task::Poll::Ready(Ok(()))
157 }
158
159 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
160 exchange.properties.remove(PROPERTY_EXCEPTION_HANDLED);
163
164 let try_steps = self.try_steps.clone();
165 let catch_clauses = self.catch_clauses.clone();
166 let finally_steps = self.finally_steps.clone();
167 let finally_on_when = self.finally_on_when.clone();
168
169 Box::pin(async move {
170 let try_result = run_pipeline(try_steps, exchange).await;
171 match try_result {
172 Ok(ex) => run_finally(finally_steps, finally_on_when, ex, None).await,
173 Err((failed_ex, original_err)) => {
174 let mut ex = failed_ex;
175 ex.set_error(original_err.clone());
176
177 for clause in catch_clauses {
178 let CatchClause {
179 matcher,
180 on_when,
181 steps,
182 disposition,
183 } = clause;
184 if !matcher.matches(&original_err, &ex) {
185 continue;
186 }
187 if let Some(ref on_when) = on_when
188 && !on_when(&ex)
189 {
190 continue;
191 }
192
193 let catch_result = run_pipeline(steps, ex.clone()).await;
194
195 return match catch_result {
196 Ok(ok_ex) => {
197 let prev = match disposition {
209 ExceptionDisposition::Handled => None,
210 ExceptionDisposition::Propagate => Some(original_err.clone()),
211 ExceptionDisposition::Continued => {
212 tracing::warn!(
213 "ExceptionDisposition::Continued reached doTry runtime; \
214 treating as Propagate. Should have been rejected at parse time."
215 );
216 Some(original_err.clone())
217 }
218 };
219 let mut ex = run_finally(
220 finally_steps.clone(),
221 finally_on_when.clone(),
222 ok_ex,
223 prev,
224 )
225 .await?;
226 if matches!(disposition, ExceptionDisposition::Handled) {
230 ex.handle_error();
231 }
232 match disposition {
233 ExceptionDisposition::Handled => Ok(ex),
234 _ => Err(original_err),
235 }
236 }
237 Err((catch_ex, catch_err)) => {
238 let _ex = run_finally(
241 finally_steps.clone(),
242 finally_on_when.clone(),
243 catch_ex,
244 Some(catch_err.clone()),
245 )
246 .await?;
247 Err(catch_err)
248 }
249 };
250 }
251
252 let _ex = run_finally(
254 finally_steps,
255 finally_on_when,
256 ex,
257 Some(original_err.clone()),
258 )
259 .await?;
260 Err(original_err)
261 }
262 }
263 })
264 }
265}
266
267#[cfg(test)]
268mod tests {
269 use super::*;
270 use camel_api::{BoxProcessor, BoxProcessorExt};
271 use std::sync::Arc;
272 use std::sync::atomic::{AtomicU32, Ordering};
273
274 fn passthrough() -> BoxProcessor {
275 BoxProcessor::from_fn(move |ex| Box::pin(async move { Ok(ex) }))
276 }
277
278 fn record_call(flag: Arc<AtomicU32>) -> BoxProcessor {
279 BoxProcessor::from_fn(move |ex| {
280 let f = flag.clone();
281 Box::pin(async move {
282 f.fetch_add(1, Ordering::SeqCst);
283 Ok(ex)
284 })
285 })
286 }
287
288 fn always_fail(err: CamelError) -> BoxProcessor {
289 BoxProcessor::from_fn(move |_ex| {
290 let e = err.clone();
291 Box::pin(async move { Err(e) })
292 })
293 }
294
295 #[tokio::test]
296 async fn happy_path_try_succeeds_finally_runs() {
297 let finally_flag = Arc::new(AtomicU32::new(0));
298 let mut svc = DoTryService::new(vec![passthrough()]);
299 svc.finally_steps = vec![record_call(finally_flag.clone())];
300
301 let mut boxed = BoxProcessor::new(svc);
302 let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
303 assert!(result.is_ok());
304 assert_eq!(finally_flag.load(Ordering::SeqCst), 1);
305 }
306
307 #[tokio::test]
308 async fn catch_by_variant_handled_returns_ok() {
309 let try_step = always_fail(CamelError::ProcessorError("boom".into()));
310 let mut svc = DoTryService::new(vec![try_step]);
311 svc.catch_clauses.push(CatchClause {
312 matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
313 on_when: None,
314 steps: vec![passthrough()],
315 disposition: ExceptionDisposition::Handled,
316 });
317
318 let mut boxed = BoxProcessor::new(svc);
319 let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
320 assert!(result.is_ok(), "Handled must return Ok");
321 let ex = result.unwrap();
322 assert_eq!(
323 ex.properties.get(PROPERTY_EXCEPTION_HANDLED),
324 Some(&camel_api::Value::Bool(true)),
325 "CamelExceptionHandled must be set via handle_error()"
326 );
327 }
328
329 #[tokio::test]
330 async fn catch_by_variant_propagate_runs_side_effects_and_rethrows() {
331 let original = CamelError::ProcessorError("boom".into());
332 let try_step = always_fail(original.clone());
333 let side_effect = Arc::new(AtomicU32::new(0));
334 let catch_step = record_call(side_effect.clone());
335 let mut svc = DoTryService::new(vec![try_step]);
336 svc.catch_clauses.push(CatchClause {
337 matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
338 on_when: None,
339 steps: vec![catch_step],
340 disposition: ExceptionDisposition::Propagate,
341 });
342
343 let mut boxed = BoxProcessor::new(svc);
344 let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
345 assert!(result.is_err(), "Propagate must rethrow original");
346 assert!(matches!(result.unwrap_err(), CamelError::ProcessorError(_)));
347 assert_eq!(
348 side_effect.load(Ordering::SeqCst),
349 1,
350 "catch branch must have run for side-effects"
351 );
352 }
353
354 #[tokio::test]
355 async fn catch_by_predicate_matches_via_exception_kind() {
356 let try_step = always_fail(CamelError::Io("disk full".into()));
357 let predicate: FilterPredicate = Arc::new(|ex: &Exchange| {
358 ex.properties
359 .get(camel_api::exchange::PROPERTY_EXCEPTION_KIND)
360 .map(|v| matches!(v, camel_api::Value::String(s) if s == "io"))
361 .unwrap_or(false)
362 });
363 let mut svc = DoTryService::new(vec![try_step]);
364 svc.catch_clauses.push(CatchClause {
365 matcher: CatchMatcher::Predicate(predicate),
366 on_when: None,
367 steps: vec![passthrough()],
368 disposition: ExceptionDisposition::Handled,
369 });
370
371 let mut boxed = BoxProcessor::new(svc);
372 let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
373 assert!(
374 result.is_ok(),
375 "Predicate matcher must catch the error and Handled must return Ok"
376 );
377 }
378
379 #[tokio::test]
380 async fn on_when_filters_clause_and_next_evaluated() {
381 let try_step = always_fail(CamelError::ProcessorError("boom".into()));
382 let first_call = Arc::new(AtomicU32::new(0));
383 let second_call = Arc::new(AtomicU32::new(0));
384
385 let mut svc = DoTryService::new(vec![try_step]);
386 svc.catch_clauses.push(CatchClause {
387 matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
388 on_when: Some(Arc::new(|_ex| false)),
389 steps: vec![record_call(first_call.clone())],
390 disposition: ExceptionDisposition::Handled,
391 });
392 svc.catch_clauses.push(CatchClause {
393 matcher: CatchMatcher::ByVariant(vec!["*".into()]),
394 on_when: None,
395 steps: vec![record_call(second_call.clone())],
396 disposition: ExceptionDisposition::Handled,
397 });
398
399 let mut boxed = BoxProcessor::new(svc);
400 let _ = boxed.ready().await.unwrap().call(Exchange::default()).await;
401 assert_eq!(first_call.load(Ordering::SeqCst), 0);
402 assert_eq!(second_call.load(Ordering::SeqCst), 1);
403 }
404
405 #[tokio::test]
406 async fn first_match_wins_subsequent_clauses_not_evaluated() {
407 let try_step = always_fail(CamelError::Io("err".into()));
408 let first_call = Arc::new(AtomicU32::new(0));
409 let second_call = Arc::new(AtomicU32::new(0));
410
411 let mut svc = DoTryService::new(vec![try_step]);
412 svc.catch_clauses.push(CatchClause {
413 matcher: CatchMatcher::ByVariant(vec!["Io".into()]),
414 on_when: None,
415 steps: vec![record_call(first_call.clone())],
416 disposition: ExceptionDisposition::Handled,
417 });
418 svc.catch_clauses.push(CatchClause {
419 matcher: CatchMatcher::ByVariant(vec!["*".into()]),
420 on_when: None,
421 steps: vec![record_call(second_call.clone())],
422 disposition: ExceptionDisposition::Handled,
423 });
424
425 let mut boxed = BoxProcessor::new(svc);
426 let _ = boxed.ready().await.unwrap().call(Exchange::default()).await;
427 assert_eq!(first_call.load(Ordering::SeqCst), 1);
428 assert_eq!(second_call.load(Ordering::SeqCst), 0);
429 }
430
431 #[tokio::test]
432 async fn no_clause_matches_propagates_original() {
433 let try_step = always_fail(CamelError::CircuitOpen("cb".into()));
434 let mut svc = DoTryService::new(vec![try_step]);
435 svc.catch_clauses.push(CatchClause {
436 matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
437 on_when: None,
438 steps: vec![passthrough()],
439 disposition: ExceptionDisposition::Handled,
440 });
441
442 let mut boxed = BoxProcessor::new(svc);
443 let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
444 assert!(result.is_err());
445 assert!(matches!(result.unwrap_err(), CamelError::CircuitOpen(_)));
446 }
447
448 #[tokio::test]
449 async fn catch_branch_throws_new_error_wins() {
450 let try_step = always_fail(CamelError::ProcessorError("orig".into()));
451 let catch_step = always_fail(CamelError::Io("catch-fail".into()));
452 let mut svc = DoTryService::new(vec![try_step]);
453 svc.catch_clauses.push(CatchClause {
454 matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
455 on_when: None,
456 steps: vec![catch_step],
457 disposition: ExceptionDisposition::Handled,
458 });
459
460 let mut boxed = BoxProcessor::new(svc);
461 let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
462 assert!(result.is_err());
463 assert!(matches!(result.unwrap_err(), CamelError::Io(_)));
464 }
465
466 #[tokio::test]
467 async fn finally_throws_with_no_previous_error_propagates_finally_error() {
468 let finally_step = always_fail(CamelError::Config("fin".into()));
469 let mut svc = DoTryService::new(vec![passthrough()]);
470 svc.finally_steps = vec![finally_step];
471
472 let mut boxed = BoxProcessor::new(svc);
473 let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
474 assert!(result.is_err());
475 assert!(matches!(result.unwrap_err(), CamelError::Config(_)));
476 }
477
478 #[tokio::test]
479 async fn finally_throws_with_previous_error_restores_previous() {
480 let try_step = always_fail(CamelError::ProcessorError("orig".into()));
481 let finally_step = always_fail(CamelError::Config("fin".into()));
482 let mut svc = DoTryService::new(vec![try_step]);
483 svc.finally_steps = vec![finally_step];
484
485 let mut boxed = BoxProcessor::new(svc);
486 let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
487 assert!(result.is_err());
488 assert!(
489 matches!(result.unwrap_err(), CamelError::ProcessorError(_)),
490 "previous error must be restored when finally throws (Camel parity)"
491 );
492 }
493
494 #[tokio::test]
495 async fn finally_on_when_false_skips_finally() {
496 let finally_call = Arc::new(AtomicU32::new(0));
497 let mut svc = DoTryService::new(vec![passthrough()]);
498 svc.finally_steps = vec![record_call(finally_call.clone())];
499 svc.finally_on_when = Some(Arc::new(|_ex| false));
500
501 let mut boxed = BoxProcessor::new(svc);
502 let _ = boxed.ready().await.unwrap().call(Exchange::default()).await;
503 assert_eq!(finally_call.load(Ordering::SeqCst), 0);
504 }
505
506 #[tokio::test]
507 async fn stale_handled_marker_cleared_on_entry() {
508 let mut ex = Exchange::default();
509 ex.set_property(PROPERTY_EXCEPTION_HANDLED, camel_api::Value::Bool(true));
510 let svc = DoTryService::new(vec![passthrough()]);
511 let mut boxed = BoxProcessor::new(svc);
512 let result = boxed.ready().await.unwrap().call(ex).await;
513 let ex = result.unwrap();
514 assert!(
515 !ex.properties.contains_key(PROPERTY_EXCEPTION_HANDLED),
516 "stale CamelExceptionHandled must be cleared on entry"
517 );
518 }
519
520 #[tokio::test]
521 async fn nested_do_try_inner_catch_does_not_leak_to_outer() {
522 let inner = {
523 let try_step = always_fail(CamelError::Io("inner".into()));
524 let mut d = DoTryService::new(vec![try_step]);
525 d.catch_clauses.push(CatchClause {
526 matcher: CatchMatcher::ByVariant(vec!["Io".into()]),
527 on_when: None,
528 steps: vec![passthrough()],
529 disposition: ExceptionDisposition::Handled,
530 });
531 BoxProcessor::new(d)
532 };
533 let mut outer = DoTryService::new(vec![inner]);
534 outer.catch_clauses.push(CatchClause {
535 matcher: CatchMatcher::ByVariant(vec!["Io".into()]),
536 on_when: None,
537 steps: vec![passthrough()],
538 disposition: ExceptionDisposition::Handled,
539 });
540
541 let mut boxed = BoxProcessor::new(outer);
542 let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
543 assert!(
544 result.is_ok(),
545 "outer must see Ok because inner handled its own error"
546 );
547 }
548
549 #[tokio::test]
550 async fn catch_all_only_fires_when_no_specific_clause_matches() {
551 let try_step = always_fail(CamelError::Io("err".into()));
552 let processor_call = Arc::new(AtomicU32::new(0));
553 let catch_all_call = Arc::new(AtomicU32::new(0));
554
555 let mut svc = DoTryService::new(vec![try_step]);
556 svc.catch_clauses.push(CatchClause {
558 matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
559 on_when: None,
560 steps: vec![record_call(processor_call.clone())],
561 disposition: ExceptionDisposition::Handled,
562 });
563 svc.catch_clauses.push(CatchClause {
565 matcher: CatchMatcher::ByVariant(vec!["*".into()]),
566 on_when: None,
567 steps: vec![record_call(catch_all_call.clone())],
568 disposition: ExceptionDisposition::Handled,
569 });
570
571 let mut boxed = BoxProcessor::new(svc);
572 let _ = boxed.ready().await.unwrap().call(Exchange::default()).await;
573 assert_eq!(
574 processor_call.load(Ordering::SeqCst),
575 0,
576 "specific ProcessorError clause must not fire on Io error"
577 );
578 assert_eq!(
579 catch_all_call.load(Ordering::SeqCst),
580 1,
581 "catch-all clause must fire when no specific clause matches"
582 );
583 }
584
585 #[tokio::test]
586 async fn catch_throws_with_finally_runs_finally_and_propagates_catch_err() {
587 let try_step = always_fail(CamelError::ProcessorError("orig".into()));
588 let catch_step = always_fail(CamelError::Io("catch-fail".into()));
589 let finally_flag = Arc::new(AtomicU32::new(0));
590 let finally_step = record_call(finally_flag.clone());
591
592 let mut svc = DoTryService::new(vec![try_step]);
593 svc.catch_clauses.push(CatchClause {
594 matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
595 on_when: None,
596 steps: vec![catch_step],
597 disposition: ExceptionDisposition::Handled,
598 });
599 svc.finally_steps = vec![finally_step];
600
601 let mut boxed = BoxProcessor::new(svc);
602 let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
603
604 assert!(result.is_err());
605 assert!(
606 matches!(result.unwrap_err(), CamelError::Io(_)),
607 "catch_err must propagate (not original ProcessorError)"
608 );
609 assert_eq!(
610 finally_flag.load(Ordering::SeqCst),
611 1,
612 "doFinally must run even when catch throws"
613 );
614 }
615
616 #[tokio::test]
617 async fn catch_throws_and_finally_throws_restores_catch_err() {
618 let try_step = always_fail(CamelError::ProcessorError("orig".into()));
619 let catch_step = always_fail(CamelError::Io("catch-fail".into()));
620 let finally_step = always_fail(CamelError::Config("fin-fail".into()));
621
622 let mut svc = DoTryService::new(vec![try_step]);
623 svc.catch_clauses.push(CatchClause {
624 matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
625 on_when: None,
626 steps: vec![catch_step],
627 disposition: ExceptionDisposition::Handled,
628 });
629 svc.finally_steps = vec![finally_step];
630
631 let mut boxed = BoxProcessor::new(svc);
632 let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
633
634 assert!(result.is_err());
635 assert!(
636 matches!(result.unwrap_err(), CamelError::Io(_)),
637 "catch_err (Io) must be restored over finally_err (Config) per Camel parity"
638 );
639 }
640
641 #[tokio::test]
642 async fn finally_on_when_false_with_previous_error_still_propagates_original() {
643 let try_step = always_fail(CamelError::ProcessorError("orig".into()));
644 let finally_flag = Arc::new(AtomicU32::new(0));
645 let finally_step = record_call(finally_flag.clone());
646
647 let mut svc = DoTryService::new(vec![try_step]);
648 svc.finally_steps = vec![finally_step];
650 svc.finally_on_when = Some(Arc::new(|_ex| false));
651
652 let mut boxed = BoxProcessor::new(svc);
653 let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
654
655 assert!(result.is_err());
656 assert!(
657 matches!(result.unwrap_err(), CamelError::ProcessorError(_)),
658 "original error must propagate even when finally_on_when skips finally"
659 );
660 assert_eq!(
661 finally_flag.load(Ordering::SeqCst),
662 0,
663 "doFinally must NOT run when on_when returns false"
664 );
665 }
666}