use camel_api::error_handler::ExceptionDisposition;
use camel_api::exchange::PROPERTY_EXCEPTION_HANDLED;
use camel_api::{BoxProcessor, CamelError, Exchange, FilterPredicate};
use tower::Service;
use tower::ServiceExt;
#[derive(Clone)]
pub enum CatchMatcher {
ByVariant(Vec<String>),
Predicate(FilterPredicate),
}
impl CatchMatcher {
pub fn matches(&self, err: &CamelError, ex: &Exchange) -> bool {
match self {
CatchMatcher::ByVariant(names) => {
if names.iter().any(|n| n == "*") {
return true;
}
names.iter().any(|n| n == err.variant_name())
}
CatchMatcher::Predicate(p) => p(ex),
}
}
}
#[derive(Clone)]
pub struct CatchClause {
pub matcher: CatchMatcher,
pub on_when: Option<FilterPredicate>,
pub steps: Vec<BoxProcessor>,
pub disposition: ExceptionDisposition,
}
#[derive(Clone)]
pub struct DoTryService {
pub try_steps: Vec<BoxProcessor>,
pub catch_clauses: Vec<CatchClause>,
pub finally_steps: Vec<BoxProcessor>,
pub finally_on_when: Option<FilterPredicate>,
}
impl DoTryService {
pub fn new(try_steps: Vec<BoxProcessor>) -> Self {
Self {
try_steps,
catch_clauses: Vec::new(),
finally_steps: Vec::new(),
finally_on_when: None,
}
}
pub fn with_catch_and_finally(
try_steps: Vec<BoxProcessor>,
catch_clauses: Vec<CatchClause>,
finally_steps: Vec<BoxProcessor>,
finally_on_when: Option<FilterPredicate>,
) -> Self {
Self {
try_steps,
catch_clauses,
finally_steps,
finally_on_when,
}
}
}
async fn run_pipeline(
steps: Vec<BoxProcessor>,
mut ex: Exchange,
) -> Result<Exchange, (Exchange, CamelError)> {
for mut svc in steps {
match svc.ready().await {
Ok(ready) => {
let snapshot = ex.clone();
match ready.call(ex).await {
Ok(new_ex) => ex = new_ex,
Err(err) => return Err((snapshot, err)),
}
}
Err(err) => return Err((ex, err)),
}
}
Ok(ex)
}
async fn run_finally(
finally_steps: Vec<BoxProcessor>,
finally_on_when: Option<FilterPredicate>,
ex: Exchange,
previous_err: Option<CamelError>,
) -> Result<Exchange, CamelError> {
if finally_steps.is_empty() {
return Ok(ex);
}
if let Some(on_when) = &finally_on_when
&& !on_when(&ex)
{
return Ok(ex);
}
match run_pipeline(finally_steps, ex).await {
Ok(ex) => Ok(ex),
Err((_, finally_err)) => match previous_err {
Some(prev) => {
tracing::warn!(
finally_error = %finally_err,
previous_error = %prev,
"doFinally threw; restoring previous exception (Camel parity)"
);
Err(prev)
}
None => {
tracing::warn!(error = %finally_err, "doFinally threw");
Err(finally_err)
}
},
}
}
impl tower::Service<Exchange> for DoTryService {
type Response = Exchange;
type Error = CamelError;
type Future = std::pin::Pin<
Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
>;
fn poll_ready(
&mut self,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, mut exchange: Exchange) -> Self::Future {
exchange.properties.remove(PROPERTY_EXCEPTION_HANDLED);
let try_steps = self.try_steps.clone();
let catch_clauses = self.catch_clauses.clone();
let finally_steps = self.finally_steps.clone();
let finally_on_when = self.finally_on_when.clone();
Box::pin(async move {
let try_result = run_pipeline(try_steps, exchange).await;
match try_result {
Ok(ex) => run_finally(finally_steps, finally_on_when, ex, None).await,
Err((failed_ex, original_err)) => {
let mut ex = failed_ex;
ex.set_error(original_err.clone());
for clause in catch_clauses {
let CatchClause {
matcher,
on_when,
steps,
disposition,
} = clause;
if !matcher.matches(&original_err, &ex) {
continue;
}
if let Some(ref on_when) = on_when
&& !on_when(&ex)
{
continue;
}
let catch_result = run_pipeline(steps, ex.clone()).await;
return match catch_result {
Ok(ok_ex) => {
let prev = match disposition {
ExceptionDisposition::Handled => None,
ExceptionDisposition::Propagate => Some(original_err.clone()),
ExceptionDisposition::Continued => {
tracing::warn!(
"ExceptionDisposition::Continued reached doTry runtime; \
treating as Propagate. Should have been rejected at parse time."
);
Some(original_err.clone())
}
};
let mut ex = run_finally(
finally_steps.clone(),
finally_on_when.clone(),
ok_ex,
prev,
)
.await?;
if matches!(disposition, ExceptionDisposition::Handled) {
ex.handle_error();
}
match disposition {
ExceptionDisposition::Handled => Ok(ex),
_ => Err(original_err),
}
}
Err((catch_ex, catch_err)) => {
let _ex = run_finally(
finally_steps.clone(),
finally_on_when.clone(),
catch_ex,
Some(catch_err.clone()),
)
.await?;
Err(catch_err)
}
};
}
let _ex = run_finally(
finally_steps,
finally_on_when,
ex,
Some(original_err.clone()),
)
.await?;
Err(original_err)
}
}
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use camel_api::{BoxProcessor, BoxProcessorExt};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
fn passthrough() -> BoxProcessor {
BoxProcessor::from_fn(move |ex| Box::pin(async move { Ok(ex) }))
}
fn record_call(flag: Arc<AtomicU32>) -> BoxProcessor {
BoxProcessor::from_fn(move |ex| {
let f = flag.clone();
Box::pin(async move {
f.fetch_add(1, Ordering::SeqCst);
Ok(ex)
})
})
}
fn always_fail(err: CamelError) -> BoxProcessor {
BoxProcessor::from_fn(move |_ex| {
let e = err.clone();
Box::pin(async move { Err(e) })
})
}
#[tokio::test]
async fn happy_path_try_succeeds_finally_runs() {
let finally_flag = Arc::new(AtomicU32::new(0));
let mut svc = DoTryService::new(vec![passthrough()]);
svc.finally_steps = vec![record_call(finally_flag.clone())];
let mut boxed = BoxProcessor::new(svc);
let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert!(result.is_ok());
assert_eq!(finally_flag.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn catch_by_variant_handled_returns_ok() {
let try_step = always_fail(CamelError::ProcessorError("boom".into()));
let mut svc = DoTryService::new(vec![try_step]);
svc.catch_clauses.push(CatchClause {
matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
on_when: None,
steps: vec![passthrough()],
disposition: ExceptionDisposition::Handled,
});
let mut boxed = BoxProcessor::new(svc);
let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert!(result.is_ok(), "Handled must return Ok");
let ex = result.unwrap();
assert_eq!(
ex.properties.get(PROPERTY_EXCEPTION_HANDLED),
Some(&camel_api::Value::Bool(true)),
"CamelExceptionHandled must be set via handle_error()"
);
}
#[tokio::test]
async fn catch_by_variant_propagate_runs_side_effects_and_rethrows() {
let original = CamelError::ProcessorError("boom".into());
let try_step = always_fail(original.clone());
let side_effect = Arc::new(AtomicU32::new(0));
let catch_step = record_call(side_effect.clone());
let mut svc = DoTryService::new(vec![try_step]);
svc.catch_clauses.push(CatchClause {
matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
on_when: None,
steps: vec![catch_step],
disposition: ExceptionDisposition::Propagate,
});
let mut boxed = BoxProcessor::new(svc);
let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert!(result.is_err(), "Propagate must rethrow original");
assert!(matches!(result.unwrap_err(), CamelError::ProcessorError(_)));
assert_eq!(
side_effect.load(Ordering::SeqCst),
1,
"catch branch must have run for side-effects"
);
}
#[tokio::test]
async fn catch_by_predicate_matches_via_exception_kind() {
let try_step = always_fail(CamelError::Io("disk full".into()));
let predicate: FilterPredicate = Arc::new(|ex: &Exchange| {
ex.properties
.get(camel_api::exchange::PROPERTY_EXCEPTION_KIND)
.map(|v| matches!(v, camel_api::Value::String(s) if s == "io"))
.unwrap_or(false)
});
let mut svc = DoTryService::new(vec![try_step]);
svc.catch_clauses.push(CatchClause {
matcher: CatchMatcher::Predicate(predicate),
on_when: None,
steps: vec![passthrough()],
disposition: ExceptionDisposition::Handled,
});
let mut boxed = BoxProcessor::new(svc);
let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert!(
result.is_ok(),
"Predicate matcher must catch the error and Handled must return Ok"
);
}
#[tokio::test]
async fn on_when_filters_clause_and_next_evaluated() {
let try_step = always_fail(CamelError::ProcessorError("boom".into()));
let first_call = Arc::new(AtomicU32::new(0));
let second_call = Arc::new(AtomicU32::new(0));
let mut svc = DoTryService::new(vec![try_step]);
svc.catch_clauses.push(CatchClause {
matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
on_when: Some(Arc::new(|_ex| false)),
steps: vec![record_call(first_call.clone())],
disposition: ExceptionDisposition::Handled,
});
svc.catch_clauses.push(CatchClause {
matcher: CatchMatcher::ByVariant(vec!["*".into()]),
on_when: None,
steps: vec![record_call(second_call.clone())],
disposition: ExceptionDisposition::Handled,
});
let mut boxed = BoxProcessor::new(svc);
let _ = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert_eq!(first_call.load(Ordering::SeqCst), 0);
assert_eq!(second_call.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn first_match_wins_subsequent_clauses_not_evaluated() {
let try_step = always_fail(CamelError::Io("err".into()));
let first_call = Arc::new(AtomicU32::new(0));
let second_call = Arc::new(AtomicU32::new(0));
let mut svc = DoTryService::new(vec![try_step]);
svc.catch_clauses.push(CatchClause {
matcher: CatchMatcher::ByVariant(vec!["Io".into()]),
on_when: None,
steps: vec![record_call(first_call.clone())],
disposition: ExceptionDisposition::Handled,
});
svc.catch_clauses.push(CatchClause {
matcher: CatchMatcher::ByVariant(vec!["*".into()]),
on_when: None,
steps: vec![record_call(second_call.clone())],
disposition: ExceptionDisposition::Handled,
});
let mut boxed = BoxProcessor::new(svc);
let _ = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert_eq!(first_call.load(Ordering::SeqCst), 1);
assert_eq!(second_call.load(Ordering::SeqCst), 0);
}
#[tokio::test]
async fn no_clause_matches_propagates_original() {
let try_step = always_fail(CamelError::CircuitOpen("cb".into()));
let mut svc = DoTryService::new(vec![try_step]);
svc.catch_clauses.push(CatchClause {
matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
on_when: None,
steps: vec![passthrough()],
disposition: ExceptionDisposition::Handled,
});
let mut boxed = BoxProcessor::new(svc);
let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), CamelError::CircuitOpen(_)));
}
#[tokio::test]
async fn catch_branch_throws_new_error_wins() {
let try_step = always_fail(CamelError::ProcessorError("orig".into()));
let catch_step = always_fail(CamelError::Io("catch-fail".into()));
let mut svc = DoTryService::new(vec![try_step]);
svc.catch_clauses.push(CatchClause {
matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
on_when: None,
steps: vec![catch_step],
disposition: ExceptionDisposition::Handled,
});
let mut boxed = BoxProcessor::new(svc);
let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), CamelError::Io(_)));
}
#[tokio::test]
async fn finally_throws_with_no_previous_error_propagates_finally_error() {
let finally_step = always_fail(CamelError::Config("fin".into()));
let mut svc = DoTryService::new(vec![passthrough()]);
svc.finally_steps = vec![finally_step];
let mut boxed = BoxProcessor::new(svc);
let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), CamelError::Config(_)));
}
#[tokio::test]
async fn finally_throws_with_previous_error_restores_previous() {
let try_step = always_fail(CamelError::ProcessorError("orig".into()));
let finally_step = always_fail(CamelError::Config("fin".into()));
let mut svc = DoTryService::new(vec![try_step]);
svc.finally_steps = vec![finally_step];
let mut boxed = BoxProcessor::new(svc);
let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert!(result.is_err());
assert!(
matches!(result.unwrap_err(), CamelError::ProcessorError(_)),
"previous error must be restored when finally throws (Camel parity)"
);
}
#[tokio::test]
async fn finally_on_when_false_skips_finally() {
let finally_call = Arc::new(AtomicU32::new(0));
let mut svc = DoTryService::new(vec![passthrough()]);
svc.finally_steps = vec![record_call(finally_call.clone())];
svc.finally_on_when = Some(Arc::new(|_ex| false));
let mut boxed = BoxProcessor::new(svc);
let _ = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert_eq!(finally_call.load(Ordering::SeqCst), 0);
}
#[tokio::test]
async fn stale_handled_marker_cleared_on_entry() {
let mut ex = Exchange::default();
ex.set_property(PROPERTY_EXCEPTION_HANDLED, camel_api::Value::Bool(true));
let svc = DoTryService::new(vec![passthrough()]);
let mut boxed = BoxProcessor::new(svc);
let result = boxed.ready().await.unwrap().call(ex).await;
let ex = result.unwrap();
assert!(
!ex.properties.contains_key(PROPERTY_EXCEPTION_HANDLED),
"stale CamelExceptionHandled must be cleared on entry"
);
}
#[tokio::test]
async fn nested_do_try_inner_catch_does_not_leak_to_outer() {
let inner = {
let try_step = always_fail(CamelError::Io("inner".into()));
let mut d = DoTryService::new(vec![try_step]);
d.catch_clauses.push(CatchClause {
matcher: CatchMatcher::ByVariant(vec!["Io".into()]),
on_when: None,
steps: vec![passthrough()],
disposition: ExceptionDisposition::Handled,
});
BoxProcessor::new(d)
};
let mut outer = DoTryService::new(vec![inner]);
outer.catch_clauses.push(CatchClause {
matcher: CatchMatcher::ByVariant(vec!["Io".into()]),
on_when: None,
steps: vec![passthrough()],
disposition: ExceptionDisposition::Handled,
});
let mut boxed = BoxProcessor::new(outer);
let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert!(
result.is_ok(),
"outer must see Ok because inner handled its own error"
);
}
#[tokio::test]
async fn catch_all_only_fires_when_no_specific_clause_matches() {
let try_step = always_fail(CamelError::Io("err".into()));
let processor_call = Arc::new(AtomicU32::new(0));
let catch_all_call = Arc::new(AtomicU32::new(0));
let mut svc = DoTryService::new(vec![try_step]);
svc.catch_clauses.push(CatchClause {
matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
on_when: None,
steps: vec![record_call(processor_call.clone())],
disposition: ExceptionDisposition::Handled,
});
svc.catch_clauses.push(CatchClause {
matcher: CatchMatcher::ByVariant(vec!["*".into()]),
on_when: None,
steps: vec![record_call(catch_all_call.clone())],
disposition: ExceptionDisposition::Handled,
});
let mut boxed = BoxProcessor::new(svc);
let _ = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert_eq!(
processor_call.load(Ordering::SeqCst),
0,
"specific ProcessorError clause must not fire on Io error"
);
assert_eq!(
catch_all_call.load(Ordering::SeqCst),
1,
"catch-all clause must fire when no specific clause matches"
);
}
#[tokio::test]
async fn catch_throws_with_finally_runs_finally_and_propagates_catch_err() {
let try_step = always_fail(CamelError::ProcessorError("orig".into()));
let catch_step = always_fail(CamelError::Io("catch-fail".into()));
let finally_flag = Arc::new(AtomicU32::new(0));
let finally_step = record_call(finally_flag.clone());
let mut svc = DoTryService::new(vec![try_step]);
svc.catch_clauses.push(CatchClause {
matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
on_when: None,
steps: vec![catch_step],
disposition: ExceptionDisposition::Handled,
});
svc.finally_steps = vec![finally_step];
let mut boxed = BoxProcessor::new(svc);
let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert!(result.is_err());
assert!(
matches!(result.unwrap_err(), CamelError::Io(_)),
"catch_err must propagate (not original ProcessorError)"
);
assert_eq!(
finally_flag.load(Ordering::SeqCst),
1,
"doFinally must run even when catch throws"
);
}
#[tokio::test]
async fn catch_throws_and_finally_throws_restores_catch_err() {
let try_step = always_fail(CamelError::ProcessorError("orig".into()));
let catch_step = always_fail(CamelError::Io("catch-fail".into()));
let finally_step = always_fail(CamelError::Config("fin-fail".into()));
let mut svc = DoTryService::new(vec![try_step]);
svc.catch_clauses.push(CatchClause {
matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
on_when: None,
steps: vec![catch_step],
disposition: ExceptionDisposition::Handled,
});
svc.finally_steps = vec![finally_step];
let mut boxed = BoxProcessor::new(svc);
let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert!(result.is_err());
assert!(
matches!(result.unwrap_err(), CamelError::Io(_)),
"catch_err (Io) must be restored over finally_err (Config) per Camel parity"
);
}
#[tokio::test]
async fn finally_on_when_false_with_previous_error_still_propagates_original() {
let try_step = always_fail(CamelError::ProcessorError("orig".into()));
let finally_flag = Arc::new(AtomicU32::new(0));
let finally_step = record_call(finally_flag.clone());
let mut svc = DoTryService::new(vec![try_step]);
svc.finally_steps = vec![finally_step];
svc.finally_on_when = Some(Arc::new(|_ex| false));
let mut boxed = BoxProcessor::new(svc);
let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
assert!(result.is_err());
assert!(
matches!(result.unwrap_err(), CamelError::ProcessorError(_)),
"original error must propagate even when finally_on_when skips finally"
);
assert_eq!(
finally_flag.load(Ordering::SeqCst),
0,
"doFinally must NOT run when on_when returns false"
);
}
}