1use std::future::Future;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use tower::Service;
12
13use camel_api::metrics::MetricsCollector;
14use camel_api::{BoxProcessor, CamelError, Exchange, IdentityProcessor, PipelineOutcome};
15
16use camel_api::error_handler::{BoundaryKind, RetryOutcome, StepDisposition};
17use camel_processor::{
18 CircuitBreakerDecision, CircuitBreakerGate, RouteErrorHandler, invoke_processor,
19};
20use tracing::Instrument;
21
22use crate::lifecycle::adapters::body_coercing::wrap_if_needed;
23use crate::lifecycle::adapters::step_compilers::CompiledStep;
24use crate::shared::observability::adapters::TracingProcessor;
25use crate::shared::observability::domain::DetailLevel;
26
27pub(crate) use super::outcome_composition::{
30 BodyCoercingSegment, BoxProcessorSegment, StopSegment, compose_outcome_segment,
31};
32
33pub fn compose_pipeline(processors: Vec<CompiledStep>) -> BoxProcessor {
38 if processors.is_empty() {
39 return BoxProcessor::new(IdentityProcessor);
40 }
41 BoxProcessor::new(SequentialPipeline {
42 steps: processors,
43 handler: None,
44 })
45}
46
47pub fn compose_pipeline_with_handler(
53 processors: Vec<CompiledStep>,
54 handler: Option<Arc<dyn RouteErrorHandler>>,
55) -> BoxProcessor {
56 if processors.is_empty() {
57 return BoxProcessor::new(IdentityProcessor);
58 }
59 BoxProcessor::new(SequentialPipeline {
60 steps: processors,
61 handler,
62 })
63}
64
65pub fn compose_traced_pipeline(
70 processors: Vec<CompiledStep>,
71 route_id: &str,
72 trace_enabled: bool,
73 detail_level: DetailLevel,
74 metrics: Option<Arc<dyn MetricsCollector>>,
75 handler: Option<Arc<dyn RouteErrorHandler>>,
76) -> BoxProcessor {
77 if !trace_enabled {
78 return compose_pipeline_with_handler(processors, handler);
79 }
80
81 if processors.is_empty() {
82 return BoxProcessor::new(IdentityProcessor);
83 }
84
85 let wrapped: Vec<CompiledStep> = processors
86 .into_iter()
87 .enumerate()
88 .map(|(idx, step)| {
89 let (p, c) = match step {
90 CompiledStep::Process {
91 processor,
92 body_contract,
93 } => (processor, body_contract),
94 CompiledStep::Stop => return CompiledStep::Stop,
95 CompiledStep::Segment { .. } => return step,
96 };
97 let traced = BoxProcessor::new(TracingProcessor::new(
98 p,
99 route_id.to_string(),
100 idx,
101 detail_level.clone(),
102 metrics.clone(),
103 ));
104 CompiledStep::Process {
105 processor: traced,
106 body_contract: c,
107 }
108 })
109 .collect();
110
111 BoxProcessor::new(TracedPipeline {
112 steps: wrapped,
113 handler,
114 })
115}
116
117pub fn compose_pipeline_with_contracts(
123 processors: Vec<CompiledStep>,
124 handler: Option<Arc<dyn RouteErrorHandler>>,
125) -> BoxProcessor {
126 let wrapped: Vec<CompiledStep> = processors
127 .into_iter()
128 .map(|step| match step {
129 CompiledStep::Process {
130 processor,
131 body_contract,
132 } => {
133 let coerced = wrap_if_needed(processor, body_contract);
134 CompiledStep::Process {
135 processor: coerced,
136 body_contract: None,
137 }
138 }
139 CompiledStep::Stop => CompiledStep::Stop,
140 CompiledStep::Segment { .. } => step,
141 })
142 .collect();
143 compose_pipeline_with_handler(wrapped, handler)
144}
145
146pub(crate) fn compose_traced_pipeline_with_contracts(
151 processors: Vec<CompiledStep>,
152 route_id: &str,
153 trace_enabled: bool,
154 detail_level: DetailLevel,
155 metrics: Option<Arc<dyn MetricsCollector>>,
156 handler: Option<Arc<dyn RouteErrorHandler>>,
157) -> BoxProcessor {
158 if !trace_enabled {
159 return compose_pipeline_with_contracts(processors, handler);
160 }
161
162 if processors.is_empty() {
163 return BoxProcessor::new(IdentityProcessor);
164 }
165
166 let wrapped: Vec<CompiledStep> = processors
167 .into_iter()
168 .enumerate()
169 .map(|(idx, step)| match step {
170 CompiledStep::Process {
171 processor,
172 body_contract,
173 } => {
174 let coerced = wrap_if_needed(processor, body_contract);
175 let traced = BoxProcessor::new(TracingProcessor::new(
176 coerced,
177 route_id.to_string(),
178 idx,
179 detail_level.clone(),
180 metrics.clone(),
181 ));
182 CompiledStep::Process {
183 processor: traced,
184 body_contract: None,
185 }
186 }
187 CompiledStep::Stop => CompiledStep::Stop,
188 CompiledStep::Segment { .. } => step,
189 })
190 .collect();
191
192 BoxProcessor::new(TracedPipeline {
193 steps: wrapped,
194 handler,
195 })
196}
197
198#[derive(Clone)]
204struct SequentialPipeline {
205 steps: Vec<CompiledStep>,
206 handler: Option<Arc<dyn RouteErrorHandler>>,
207}
208
209impl Service<Exchange> for SequentialPipeline {
210 type Response = Exchange;
211 type Error = CamelError;
212 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
213
214 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
215 match self.steps.first() {
216 Some(CompiledStep::Process { processor, .. }) => {
217 let mut proc = processor.clone();
218 match proc.poll_ready(cx) {
219 Poll::Pending => Poll::Pending,
220 Poll::Ready(Err(_)) if self.handler.is_some() => Poll::Ready(Ok(())),
221 Poll::Ready(other) => Poll::Ready(other),
222 }
223 }
224 Some(CompiledStep::Stop) => Poll::Ready(Ok(())),
225 Some(CompiledStep::Segment { .. }) => Poll::Ready(Ok(())),
226 None => Poll::Ready(Ok(())),
227 }
228 }
229
230 fn call(&mut self, exchange: Exchange) -> Self::Future {
235 let steps = self.steps.clone();
236 let handler = self.handler.clone();
237 Box::pin(async move {
238 let outcome = run_steps(steps, exchange, handler, false).await;
239 outcome.into_tower_result()
240 })
241 }
242}
243
244#[derive(Clone)]
246struct TracedPipeline {
247 steps: Vec<CompiledStep>,
248 handler: Option<Arc<dyn RouteErrorHandler>>,
249}
250
251impl Service<Exchange> for TracedPipeline {
252 type Response = Exchange;
253 type Error = CamelError;
254 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
255
256 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
257 match self.steps.first() {
258 Some(CompiledStep::Process { processor, .. }) => {
259 let mut proc = processor.clone();
260 match proc.poll_ready(cx) {
261 Poll::Pending => Poll::Pending,
262 Poll::Ready(Err(_)) if self.handler.is_some() => Poll::Ready(Ok(())),
263 Poll::Ready(other) => Poll::Ready(other),
264 }
265 }
266 Some(CompiledStep::Stop) => Poll::Ready(Ok(())),
267 Some(CompiledStep::Segment { .. }) => Poll::Ready(Ok(())),
268 None => Poll::Ready(Ok(())),
269 }
270 }
271
272 fn call(&mut self, exchange: Exchange) -> Self::Future {
275 let steps = self.steps.clone();
276 let handler = self.handler.clone();
277 Box::pin(async move {
278 let outcome = run_steps(steps, exchange, handler, true).await;
279 outcome.into_tower_result()
280 })
281 }
282}
283
284pub async fn run_steps(
299 steps: Vec<CompiledStep>,
300 exchange: Exchange,
301 handler: Option<Arc<dyn RouteErrorHandler>>,
302 trace: bool,
303) -> PipelineOutcome {
304 use camel_api::error_handler::RetryableStep;
305 let mut ex = exchange;
306 for (i, step) in steps.into_iter().enumerate() {
307 let (mut retryable, _body_contract): (Box<dyn RetryableStep>, _) = match step {
308 CompiledStep::Stop => return PipelineOutcome::Stopped(ex),
309 CompiledStep::Process {
310 processor,
311 body_contract,
312 } => {
313 let boxed: Box<dyn RetryableStep> = Box::new(processor);
314 (boxed, body_contract)
315 }
316 CompiledStep::Segment {
317 segment,
318 body_contract,
319 } => {
320 let boxed: Box<dyn RetryableStep> = Box::new(segment);
321 (boxed, body_contract)
322 }
323 };
324
325 let original = ex.clone();
326 let outcome = if trace {
327 invoke_with_span(&mut retryable, ex, i).await
328 } else {
329 retryable.invoke(ex).await
330 };
331
332 match outcome {
333 PipelineOutcome::Completed(next) => {
334 ex = next;
335 }
336 PipelineOutcome::Stopped(stopped_ex) => {
337 return PipelineOutcome::Stopped(stopped_ex);
338 }
339 PipelineOutcome::Failed(err) => {
340 let Some(handler) = handler.as_ref() else {
341 return PipelineOutcome::Failed(err);
342 };
343 let policy = handler.match_policy(&err);
344 match handler
345 .retry_step(policy, retryable.as_mut(), original, err)
346 .await
347 {
348 RetryOutcome::Recovered(exchange) => {
349 ex = exchange;
350 }
351 RetryOutcome::Stopped(stopped_ex) => {
352 return PipelineOutcome::Stopped(stopped_ex);
353 }
354 RetryOutcome::Exhausted {
355 exchange,
356 error,
357 policy,
358 } => {
359 let disposition = if trace {
360 handler
361 .handle_step(policy, exchange, error)
362 .instrument(tracing::debug_span!("error_handler", step_index = i))
363 .await
364 } else {
365 handler.handle_step(policy, exchange, error).await
366 };
367 match disposition {
368 Ok(StepDisposition::Propagate(e)) => {
369 return PipelineOutcome::Failed(e);
370 }
371 Ok(StepDisposition::Handled(done)) => {
372 return PipelineOutcome::Completed(done);
373 }
374 Ok(StepDisposition::Continued(next)) => {
375 ex = next;
376 }
377 Err(e) => return PipelineOutcome::Failed(e),
378 }
379 }
380 }
381 }
382 }
383 }
384 PipelineOutcome::Completed(ex)
385}
386
387async fn invoke_with_span(
388 retryable: &mut Box<dyn camel_api::error_handler::RetryableStep>,
389 exchange: Exchange,
390 idx: usize,
391) -> PipelineOutcome {
392 retryable
393 .invoke(exchange)
394 .instrument(tracing::debug_span!("pipeline_step", index = idx))
395 .await
396}
397
398#[derive(Clone)]
405pub struct RouteChannelService {
406 handler: Arc<dyn RouteErrorHandler>,
407 security: Option<BoxProcessor>,
408 cb_gate: Option<CircuitBreakerGate>,
409 pipeline: BoxProcessor,
410}
411
412impl RouteChannelService {
413 pub fn new(
414 handler: Arc<dyn RouteErrorHandler>,
415 security: Option<BoxProcessor>,
416 cb_gate: Option<CircuitBreakerGate>,
417 pipeline: BoxProcessor,
418 ) -> Self {
419 Self {
420 handler,
421 security,
422 cb_gate,
423 pipeline,
424 }
425 }
426}
427
428impl Service<Exchange> for RouteChannelService {
429 type Response = Exchange;
430 type Error = CamelError;
431 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
432
433 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
434 if let Some(ref mut sec) = self.security {
436 match sec.clone().poll_ready(cx) {
437 Poll::Pending => return Poll::Pending,
438 Poll::Ready(Err(_)) | Poll::Ready(Ok(())) => {}
439 }
440 }
441 match self.pipeline.clone().poll_ready(cx) {
443 Poll::Pending => return Poll::Pending,
444 Poll::Ready(Err(_)) | Poll::Ready(Ok(())) => {}
445 }
446 Poll::Ready(Ok(()))
447 }
448
449 fn call(&mut self, exchange: Exchange) -> Self::Future {
450 let handler = self.handler.clone();
451 let security = self.security.clone();
452 let cb_gate = self.cb_gate.clone();
453 let mut pipeline = self.pipeline.clone();
454
455 Box::pin(async move {
456 let mut ex = exchange;
457
458 if let Some(mut sec) = security {
460 let original = ex.clone();
461 match invoke_processor(&mut sec, ex).await {
462 Ok(next) => ex = next,
463 Err(err) => {
464 return handler
465 .handle_boundary(BoundaryKind::Security, original, err)
466 .await;
467 }
468 }
469 }
470
471 if let Some(ref cb) = cb_gate {
473 match cb.before_call() {
474 CircuitBreakerDecision::Allow => { }
475 CircuitBreakerDecision::Fallback(mut fb) => {
476 let original = ex.clone();
479 match invoke_processor(&mut fb, ex).await {
480 Ok(result) => return Ok(result),
481 Err(err) => {
482 return handler
483 .handle_boundary(BoundaryKind::CircuitBreaker, original, err)
484 .await;
485 }
486 }
487 }
488 CircuitBreakerDecision::Reject(err) => {
489 let original = ex.clone();
490 return handler
491 .handle_boundary(BoundaryKind::CircuitBreaker, original, err)
492 .await;
493 }
494 }
495 }
496
497 let result = invoke_processor(&mut pipeline, ex).await;
499
500 if let Some(ref cb) = cb_gate {
502 cb.after_result(&result);
503 }
504
505 result
507 })
508 }
509}
510
511#[cfg(test)]
512#[path = "route_compiler_tests.rs"]
513mod tests;