use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use camel_api::error_handler::{
BoundaryKind, ErrorHandlerConfig, PolicyId, RetryOutcome, StepDisposition,
};
use camel_api::{
BoxProcessor, BoxProcessorExt, CamelError, CircuitBreakerConfig, Exchange, Message,
};
use camel_core::route::{CompiledStep, RouteChannelService, compose_pipeline_with_handler};
use camel_processor::error_handler::{DefaultRouteErrorHandler, RouteErrorHandler};
use camel_processor::{CircuitBreakerDecision, CircuitBreakerGate};
use tower::ServiceExt;
#[tokio::test]
async fn continued_e2e_step3_executes_after_step2_failure() {
let step1 = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
let step2 = BoxProcessor::from_fn(|_ex| {
Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
});
let step3_hit = Arc::new(AtomicBool::new(false));
let hit = step3_hit.clone();
let step3 = BoxProcessor::from_fn(move |ex| {
let hit = hit.clone();
Box::pin(async move {
hit.store(true, Ordering::SeqCst);
Ok(ex)
})
});
let config = ErrorHandlerConfig::log_only()
.on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
.continued(true)
.build();
let policies: Vec<_> = config.policies.into_iter().map(|p| (p, None)).collect();
let handler: Arc<dyn RouteErrorHandler> =
Arc::new(DefaultRouteErrorHandler::new(None, policies));
let pipeline = compose_pipeline_with_handler(
vec![
CompiledStep::Process {
processor: step1,
body_contract: None,
lifecycle: None,
},
CompiledStep::Process {
processor: step2,
body_contract: None,
lifecycle: None,
},
CompiledStep::Process {
processor: step3,
body_contract: None,
lifecycle: None,
},
],
Some(handler),
);
let exchange = Exchange::new(Message::new("test continued_e2e"));
let result = pipeline.oneshot(exchange).await;
assert!(
result.is_ok(),
"pipeline should return Ok with continued=true, got: {:?}",
result
);
assert!(
step3_hit.load(Ordering::SeqCst),
"step 3 should have executed after continued=true cleared the error"
);
}
/// Minimal hand-written [`RouteErrorHandler`] for tests.
///
/// It maps every error to `PolicyId(0)`, reports retries as exhausted without
/// touching the step, and resolves step failures by clearing the exchange's
/// error and returning `StepDisposition::Continued`.
struct ContinuedHandler;

#[async_trait::async_trait]
impl RouteErrorHandler for ContinuedHandler {
    /// Returns `PolicyId(0)` regardless of the error passed in.
    fn match_policy(&self, _error: &CamelError) -> Option<PolicyId> {
        Some(PolicyId(0))
    }

    /// Hands the original exchange and error straight back as `Exhausted`,
    /// tagged with `PolicyId(0)`; the retryable step is never used.
    async fn retry_step(
        &self,
        _policy: Option<PolicyId>,
        _step: &mut dyn camel_api::error_handler::RetryableStep,
        original: Exchange,
        error: CamelError,
    ) -> RetryOutcome {
        let policy = Some(PolicyId(0));
        RetryOutcome::Exhausted {
            exchange: original,
            error,
            policy,
        }
    }

    /// Clears the error on the exchange and returns it as `Continued`.
    async fn handle_step(
        &self,
        _policy: Option<PolicyId>,
        ex: Exchange,
        _error: CamelError,
    ) -> Result<StepDisposition, CamelError> {
        let mut cleared = ex;
        cleared.clear_error();
        Ok(StepDisposition::Continued(cleared))
    }

    /// Returns the exchange unchanged, discarding the error.
    async fn handle_boundary(
        &self,
        _kind: BoundaryKind,
        ex: Exchange,
        _error: CamelError,
    ) -> Result<Exchange, CamelError> {
        Ok(ex)
    }
}
#[tokio::test]
async fn continued_e2e_route_channel_service_with_cb_gate() {
let step1 = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
let step2 = BoxProcessor::from_fn(|_ex| {
Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
});
let step3_hit = Arc::new(AtomicBool::new(false));
let hit = step3_hit.clone();
let step3 = BoxProcessor::from_fn(move |ex| {
let hit = hit.clone();
Box::pin(async move {
hit.store(true, Ordering::SeqCst);
Ok(ex)
})
});
let handler: Arc<dyn RouteErrorHandler> = Arc::new(ContinuedHandler);
let pipeline = compose_pipeline_with_handler(
vec![
CompiledStep::Process {
processor: step1,
body_contract: None,
lifecycle: None,
},
CompiledStep::Process {
processor: step2,
body_contract: None,
lifecycle: None,
},
CompiledStep::Process {
processor: step3,
body_contract: None,
lifecycle: None,
},
],
Some(handler.clone()),
);
let cb_gate = CircuitBreakerGate::new(CircuitBreakerConfig {
failure_threshold: 1,
open_duration: Duration::from_secs(60),
success_threshold: 1,
fallback: None,
});
let cb_gate_clone = cb_gate.clone();
let channel = RouteChannelService::new(handler.clone(), None, Some(cb_gate), pipeline, false);
let service = BoxProcessor::new(channel);
let exchange = Exchange::new(Message::new("test route_channel_e2e"));
let result = service.oneshot(exchange).await;
assert!(
result.is_ok(),
"pipeline should return Ok with continued=true, got: {:?}",
result
);
assert!(
step3_hit.load(Ordering::SeqCst),
"step 3 should have executed after continued=true cleared the error"
);
match cb_gate_clone.before_call() {
CircuitBreakerDecision::Allow => { }
_ => panic!(
"CB gate should be Allow (after_result received Ok), but got a different decision"
),
}
}