use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tower::Service;
use camel_api::metrics::MetricsCollector;
use camel_api::{
BoxProcessor, CamelError, Exchange, IdentityProcessor, Message, ORIGINAL_MESSAGE_EXTENSION,
PipelineOutcome,
};
use camel_api::error_handler::{BoundaryKind, RetryOutcome, StepDisposition};
use camel_processor::{
CircuitBreakerDecision, CircuitBreakerGate, RouteErrorHandler, invoke_processor,
};
use tracing::Instrument;
use crate::lifecycle::adapters::body_coercing::wrap_if_needed;
use crate::lifecycle::adapters::step_compilers::CompiledStep;
use crate::shared::observability::adapters::TracingProcessor;
use crate::shared::observability::domain::DetailLevel;
pub(crate) use super::outcome_composition::{
BodyCoercingSegment, BoxProcessorSegment, StopSegment, compose_outcome_segment,
};
pub fn compose_pipeline(processors: Vec<CompiledStep>) -> BoxProcessor {
if processors.is_empty() {
return BoxProcessor::new(IdentityProcessor);
}
BoxProcessor::new(SequentialPipeline {
steps: processors,
handler: None,
})
}
pub fn compose_pipeline_with_handler(
processors: Vec<CompiledStep>,
handler: Option<Arc<dyn RouteErrorHandler>>,
) -> BoxProcessor {
if processors.is_empty() {
return BoxProcessor::new(IdentityProcessor);
}
BoxProcessor::new(SequentialPipeline {
steps: processors,
handler,
})
}
pub fn compose_traced_pipeline(
processors: Vec<CompiledStep>,
route_id: &str,
trace_enabled: bool,
detail_level: DetailLevel,
metrics: Option<Arc<dyn MetricsCollector>>,
handler: Option<Arc<dyn RouteErrorHandler>>,
) -> BoxProcessor {
if !trace_enabled {
return compose_pipeline_with_handler(processors, handler);
}
if processors.is_empty() {
return BoxProcessor::new(IdentityProcessor);
}
let wrapped: Vec<CompiledStep> = processors
.into_iter()
.enumerate()
.map(|(idx, step)| {
let (p, c, lc) = match step {
CompiledStep::Process {
processor,
body_contract,
lifecycle,
} => (processor, body_contract, lifecycle),
CompiledStep::Stop => return CompiledStep::Stop,
CompiledStep::Segment { .. } => return step,
};
let traced = BoxProcessor::new(TracingProcessor::new(
p,
route_id.to_string(),
idx,
detail_level.clone(),
metrics.clone(),
));
CompiledStep::Process {
processor: traced,
body_contract: c,
lifecycle: lc,
}
})
.collect();
BoxProcessor::new(TracedPipeline {
steps: wrapped,
handler,
})
}
pub fn compose_pipeline_with_contracts(
processors: Vec<CompiledStep>,
handler: Option<Arc<dyn RouteErrorHandler>>,
) -> BoxProcessor {
let wrapped: Vec<CompiledStep> = processors
.into_iter()
.map(|step| match step {
CompiledStep::Process {
processor,
body_contract,
lifecycle,
} => {
let coerced = wrap_if_needed(processor, body_contract);
CompiledStep::Process {
processor: coerced,
body_contract: None,
lifecycle,
}
}
CompiledStep::Stop => CompiledStep::Stop,
CompiledStep::Segment { .. } => step,
})
.collect();
compose_pipeline_with_handler(wrapped, handler)
}
pub(crate) fn compose_traced_pipeline_with_contracts(
processors: Vec<CompiledStep>,
route_id: &str,
trace_enabled: bool,
detail_level: DetailLevel,
metrics: Option<Arc<dyn MetricsCollector>>,
handler: Option<Arc<dyn RouteErrorHandler>>,
) -> BoxProcessor {
if !trace_enabled {
return compose_pipeline_with_contracts(processors, handler);
}
if processors.is_empty() {
return BoxProcessor::new(IdentityProcessor);
}
let wrapped: Vec<CompiledStep> = processors
.into_iter()
.enumerate()
.map(|(idx, step)| match step {
CompiledStep::Process {
processor,
body_contract,
lifecycle,
} => {
let coerced = wrap_if_needed(processor, body_contract);
let traced = BoxProcessor::new(TracingProcessor::new(
coerced,
route_id.to_string(),
idx,
detail_level.clone(),
metrics.clone(),
));
CompiledStep::Process {
processor: traced,
body_contract: None,
lifecycle,
}
}
CompiledStep::Stop => CompiledStep::Stop,
CompiledStep::Segment { .. } => step,
})
.collect();
BoxProcessor::new(TracedPipeline {
steps: wrapped,
handler,
})
}
#[derive(Clone)]
struct SequentialPipeline {
steps: Vec<CompiledStep>,
handler: Option<Arc<dyn RouteErrorHandler>>,
}
impl Service<Exchange> for SequentialPipeline {
type Response = Exchange;
type Error = CamelError;
type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.steps.first() {
Some(CompiledStep::Process { processor, .. }) => {
let mut proc = processor.clone();
match proc.poll_ready(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(_)) if self.handler.is_some() => Poll::Ready(Ok(())),
Poll::Ready(other) => Poll::Ready(other),
}
}
Some(CompiledStep::Stop) => Poll::Ready(Ok(())),
Some(CompiledStep::Segment { .. }) => Poll::Ready(Ok(())),
None => Poll::Ready(Ok(())),
}
}
fn call(&mut self, exchange: Exchange) -> Self::Future {
let steps = self.steps.clone();
let handler = self.handler.clone();
Box::pin(async move {
let outcome = run_steps(steps, exchange, handler, false).await;
outcome.into_tower_result()
})
}
}
#[derive(Clone)]
struct TracedPipeline {
steps: Vec<CompiledStep>,
handler: Option<Arc<dyn RouteErrorHandler>>,
}
impl Service<Exchange> for TracedPipeline {
type Response = Exchange;
type Error = CamelError;
type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.steps.first() {
Some(CompiledStep::Process { processor, .. }) => {
let mut proc = processor.clone();
match proc.poll_ready(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(_)) if self.handler.is_some() => Poll::Ready(Ok(())),
Poll::Ready(other) => Poll::Ready(other),
}
}
Some(CompiledStep::Stop) => Poll::Ready(Ok(())),
Some(CompiledStep::Segment { .. }) => Poll::Ready(Ok(())),
None => Poll::Ready(Ok(())),
}
}
fn call(&mut self, exchange: Exchange) -> Self::Future {
let steps = self.steps.clone();
let handler = self.handler.clone();
Box::pin(async move {
let outcome = run_steps(steps, exchange, handler, true).await;
outcome.into_tower_result()
})
}
}
pub async fn run_steps(
steps: Vec<CompiledStep>,
exchange: Exchange,
handler: Option<Arc<dyn RouteErrorHandler>>,
trace: bool,
) -> PipelineOutcome {
use camel_api::error_handler::RetryableStep;
let mut ex = exchange;
for (i, step) in steps.into_iter().enumerate() {
let (mut retryable, _body_contract): (Box<dyn RetryableStep>, _) = match step {
CompiledStep::Stop => return PipelineOutcome::Stopped(ex),
CompiledStep::Process {
processor,
body_contract,
..
} => {
let boxed: Box<dyn RetryableStep> = Box::new(processor);
(boxed, body_contract)
}
CompiledStep::Segment {
segment,
body_contract,
..
} => {
let boxed: Box<dyn RetryableStep> = Box::new(segment);
(boxed, body_contract)
}
};
let original = ex.clone();
let outcome = if trace {
invoke_with_span(&mut retryable, ex, i).await
} else {
retryable.invoke(ex).await
};
match outcome {
PipelineOutcome::Completed(next) => {
if camel_api::is_camel_stop(&next) {
return PipelineOutcome::Stopped(next);
}
ex = next;
}
PipelineOutcome::Stopped(stopped_ex) => {
return PipelineOutcome::Stopped(stopped_ex);
}
PipelineOutcome::Failed(err) => {
let Some(handler) = handler.as_ref() else {
return PipelineOutcome::Failed(err);
};
let policy = handler.match_policy(&err);
match handler
.retry_step(policy, retryable.as_mut(), original, err)
.await
{
RetryOutcome::Recovered(exchange) => {
ex = exchange;
}
RetryOutcome::Stopped(stopped_ex) => {
return PipelineOutcome::Stopped(stopped_ex);
}
RetryOutcome::Exhausted {
exchange,
error,
policy,
} => {
let disposition = if trace {
handler
.handle_step(policy, exchange, error)
.instrument(tracing::debug_span!("error_handler", step_index = i))
.await
} else {
handler.handle_step(policy, exchange, error).await
};
match disposition {
Ok(StepDisposition::Propagate(e)) => {
return PipelineOutcome::Failed(e);
}
Ok(StepDisposition::Handled(done)) => {
return PipelineOutcome::Completed(done);
}
Ok(StepDisposition::Continued(next)) => {
ex = next;
}
Err(e) => return PipelineOutcome::Failed(e),
}
}
}
}
}
}
PipelineOutcome::Completed(ex)
}
async fn invoke_with_span(
retryable: &mut Box<dyn camel_api::error_handler::RetryableStep>,
exchange: Exchange,
idx: usize,
) -> PipelineOutcome {
retryable
.invoke(exchange)
.instrument(tracing::debug_span!("pipeline_step", index = idx))
.await
}
#[derive(Clone)]
pub struct RouteChannelService {
handler: Arc<dyn RouteErrorHandler>,
security: Option<BoxProcessor>,
cb_gate: Option<CircuitBreakerGate>,
pipeline: BoxProcessor,
use_original_message: bool,
}
impl RouteChannelService {
pub fn new(
handler: Arc<dyn RouteErrorHandler>,
security: Option<BoxProcessor>,
cb_gate: Option<CircuitBreakerGate>,
pipeline: BoxProcessor,
use_original_message: bool,
) -> Self {
Self {
handler,
security,
cb_gate,
pipeline,
use_original_message,
}
}
}
impl Service<Exchange> for RouteChannelService {
type Response = Exchange;
type Error = CamelError;
type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
if let Some(ref mut sec) = self.security {
match sec.clone().poll_ready(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(_)) | Poll::Ready(Ok(())) => {}
}
}
match self.pipeline.clone().poll_ready(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(_)) | Poll::Ready(Ok(())) => {}
}
Poll::Ready(Ok(()))
}
fn call(&mut self, exchange: Exchange) -> Self::Future {
let handler = self.handler.clone();
let security = self.security.clone();
let cb_gate = self.cb_gate.clone();
let mut pipeline = self.pipeline.clone();
let use_original_message = self.use_original_message;
Box::pin(async move {
let mut ex = exchange;
if use_original_message {
let original: Arc<Message> = Arc::new(ex.input.clone());
ex.set_extension(ORIGINAL_MESSAGE_EXTENSION, original);
}
if let Some(mut sec) = security {
let original = ex.clone();
match invoke_processor(&mut sec, ex).await {
Ok(next) => ex = next,
Err(err) => {
return handler
.handle_boundary(BoundaryKind::Security, original, err)
.await;
}
}
}
if let Some(ref cb) = cb_gate {
match cb.before_call() {
CircuitBreakerDecision::Allow => { }
CircuitBreakerDecision::Fallback(mut fb) => {
let original = ex.clone();
match invoke_processor(&mut fb, ex).await {
Ok(result) => return Ok(result),
Err(err) => {
return handler
.handle_boundary(BoundaryKind::CircuitBreaker, original, err)
.await;
}
}
}
CircuitBreakerDecision::Reject(err) => {
let original = ex.clone();
return handler
.handle_boundary(BoundaryKind::CircuitBreaker, original, err)
.await;
}
}
}
let result = invoke_processor(&mut pipeline, ex).await;
if let Some(ref cb) = cb_gate {
cb.after_result(&result);
}
result
})
}
}
#[cfg(test)]
#[path = "route_compiler_tests.rs"]
mod tests;