1use std::future::Future;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use tower::Service;
12
13use camel_api::metrics::MetricsCollector;
14use camel_api::{
15 BoxProcessor, CamelError, Exchange, IdentityProcessor, Message, ORIGINAL_MESSAGE_EXTENSION,
16 PipelineOutcome,
17};
18
19use camel_api::error_handler::{BoundaryKind, RetryOutcome, StepDisposition};
20use camel_processor::{
21 CircuitBreakerDecision, CircuitBreakerGate, RouteErrorHandler, invoke_processor,
22};
23use tracing::Instrument;
24
25use crate::lifecycle::adapters::body_coercing::wrap_if_needed;
26use crate::lifecycle::adapters::step_compilers::CompiledStep;
27use crate::shared::observability::adapters::TracingProcessor;
28use crate::shared::observability::domain::DetailLevel;
29
30pub(crate) use super::outcome_composition::{
33 BodyCoercingSegment, BoxProcessorSegment, StopSegment, compose_outcome_segment,
34};
35
36pub fn compose_pipeline(processors: Vec<CompiledStep>) -> BoxProcessor {
41 if processors.is_empty() {
42 return BoxProcessor::new(IdentityProcessor);
43 }
44 BoxProcessor::new(SequentialPipeline {
45 steps: processors,
46 handler: None,
47 })
48}
49
50pub fn compose_pipeline_with_handler(
56 processors: Vec<CompiledStep>,
57 handler: Option<Arc<dyn RouteErrorHandler>>,
58) -> BoxProcessor {
59 if processors.is_empty() {
60 return BoxProcessor::new(IdentityProcessor);
61 }
62 BoxProcessor::new(SequentialPipeline {
63 steps: processors,
64 handler,
65 })
66}
67
68pub fn compose_traced_pipeline(
73 processors: Vec<CompiledStep>,
74 route_id: &str,
75 trace_enabled: bool,
76 detail_level: DetailLevel,
77 metrics: Option<Arc<dyn MetricsCollector>>,
78 handler: Option<Arc<dyn RouteErrorHandler>>,
79) -> BoxProcessor {
80 if !trace_enabled {
81 return compose_pipeline_with_handler(processors, handler);
82 }
83
84 if processors.is_empty() {
85 return BoxProcessor::new(IdentityProcessor);
86 }
87
88 let wrapped: Vec<CompiledStep> = processors
89 .into_iter()
90 .enumerate()
91 .map(|(idx, step)| {
92 let (p, c, lc) = match step {
93 CompiledStep::Process {
94 processor,
95 body_contract,
96 lifecycle,
97 } => (processor, body_contract, lifecycle),
98 CompiledStep::Stop => return CompiledStep::Stop,
99 CompiledStep::Segment { .. } => return step,
100 };
101 let traced = BoxProcessor::new(TracingProcessor::new(
102 p,
103 route_id.to_string(),
104 idx,
105 detail_level.clone(),
106 metrics.clone(),
107 ));
108 CompiledStep::Process {
109 processor: traced,
110 body_contract: c,
111 lifecycle: lc,
112 }
113 })
114 .collect();
115
116 BoxProcessor::new(TracedPipeline {
117 steps: wrapped,
118 handler,
119 })
120}
121
122pub fn compose_pipeline_with_contracts(
128 processors: Vec<CompiledStep>,
129 handler: Option<Arc<dyn RouteErrorHandler>>,
130) -> BoxProcessor {
131 let wrapped: Vec<CompiledStep> = processors
132 .into_iter()
133 .map(|step| match step {
134 CompiledStep::Process {
135 processor,
136 body_contract,
137 lifecycle,
138 } => {
139 let coerced = wrap_if_needed(processor, body_contract);
140 CompiledStep::Process {
141 processor: coerced,
142 body_contract: None,
143 lifecycle,
144 }
145 }
146 CompiledStep::Stop => CompiledStep::Stop,
147 CompiledStep::Segment { .. } => step,
148 })
149 .collect();
150 compose_pipeline_with_handler(wrapped, handler)
151}
152
153pub(crate) fn compose_traced_pipeline_with_contracts(
158 processors: Vec<CompiledStep>,
159 route_id: &str,
160 trace_enabled: bool,
161 detail_level: DetailLevel,
162 metrics: Option<Arc<dyn MetricsCollector>>,
163 handler: Option<Arc<dyn RouteErrorHandler>>,
164) -> BoxProcessor {
165 if !trace_enabled {
166 return compose_pipeline_with_contracts(processors, handler);
167 }
168
169 if processors.is_empty() {
170 return BoxProcessor::new(IdentityProcessor);
171 }
172
173 let wrapped: Vec<CompiledStep> = processors
174 .into_iter()
175 .enumerate()
176 .map(|(idx, step)| match step {
177 CompiledStep::Process {
178 processor,
179 body_contract,
180 lifecycle,
181 } => {
182 let coerced = wrap_if_needed(processor, body_contract);
183 let traced = BoxProcessor::new(TracingProcessor::new(
184 coerced,
185 route_id.to_string(),
186 idx,
187 detail_level.clone(),
188 metrics.clone(),
189 ));
190 CompiledStep::Process {
191 processor: traced,
192 body_contract: None,
193 lifecycle,
194 }
195 }
196 CompiledStep::Stop => CompiledStep::Stop,
197 CompiledStep::Segment { .. } => step,
198 })
199 .collect();
200
201 BoxProcessor::new(TracedPipeline {
202 steps: wrapped,
203 handler,
204 })
205}
206
207#[derive(Clone)]
213struct SequentialPipeline {
214 steps: Vec<CompiledStep>,
215 handler: Option<Arc<dyn RouteErrorHandler>>,
216}
217
218impl Service<Exchange> for SequentialPipeline {
219 type Response = Exchange;
220 type Error = CamelError;
221 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
222
223 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
224 match self.steps.first() {
225 Some(CompiledStep::Process { processor, .. }) => {
226 let mut proc = processor.clone();
227 match proc.poll_ready(cx) {
228 Poll::Pending => Poll::Pending,
229 Poll::Ready(Err(_)) if self.handler.is_some() => Poll::Ready(Ok(())),
230 Poll::Ready(other) => Poll::Ready(other),
231 }
232 }
233 Some(CompiledStep::Stop) => Poll::Ready(Ok(())),
234 Some(CompiledStep::Segment { .. }) => Poll::Ready(Ok(())),
235 None => Poll::Ready(Ok(())),
236 }
237 }
238
239 fn call(&mut self, exchange: Exchange) -> Self::Future {
244 let steps = self.steps.clone();
245 let handler = self.handler.clone();
246 Box::pin(async move {
247 let outcome = run_steps(steps, exchange, handler, false).await;
248 outcome.into_tower_result()
249 })
250 }
251}
252
253#[derive(Clone)]
255struct TracedPipeline {
256 steps: Vec<CompiledStep>,
257 handler: Option<Arc<dyn RouteErrorHandler>>,
258}
259
260impl Service<Exchange> for TracedPipeline {
261 type Response = Exchange;
262 type Error = CamelError;
263 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
264
265 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
266 match self.steps.first() {
267 Some(CompiledStep::Process { processor, .. }) => {
268 let mut proc = processor.clone();
269 match proc.poll_ready(cx) {
270 Poll::Pending => Poll::Pending,
271 Poll::Ready(Err(_)) if self.handler.is_some() => Poll::Ready(Ok(())),
272 Poll::Ready(other) => Poll::Ready(other),
273 }
274 }
275 Some(CompiledStep::Stop) => Poll::Ready(Ok(())),
276 Some(CompiledStep::Segment { .. }) => Poll::Ready(Ok(())),
277 None => Poll::Ready(Ok(())),
278 }
279 }
280
281 fn call(&mut self, exchange: Exchange) -> Self::Future {
284 let steps = self.steps.clone();
285 let handler = self.handler.clone();
286 Box::pin(async move {
287 let outcome = run_steps(steps, exchange, handler, true).await;
288 outcome.into_tower_result()
289 })
290 }
291}
292
293pub async fn run_steps(
308 steps: Vec<CompiledStep>,
309 exchange: Exchange,
310 handler: Option<Arc<dyn RouteErrorHandler>>,
311 trace: bool,
312) -> PipelineOutcome {
313 use camel_api::error_handler::RetryableStep;
314 let mut ex = exchange;
315 for (i, step) in steps.into_iter().enumerate() {
316 let (mut retryable, _body_contract): (Box<dyn RetryableStep>, _) = match step {
317 CompiledStep::Stop => return PipelineOutcome::Stopped(ex),
318 CompiledStep::Process {
319 processor,
320 body_contract,
321 ..
322 } => {
323 let boxed: Box<dyn RetryableStep> = Box::new(processor);
324 (boxed, body_contract)
325 }
326 CompiledStep::Segment {
327 segment,
328 body_contract,
329 ..
330 } => {
331 let boxed: Box<dyn RetryableStep> = Box::new(segment);
332 (boxed, body_contract)
333 }
334 };
335
336 let original = ex.clone();
337 let outcome = if trace {
338 invoke_with_span(&mut retryable, ex, i).await
339 } else {
340 retryable.invoke(ex).await
341 };
342
343 match outcome {
344 PipelineOutcome::Completed(next) => {
345 if camel_api::is_camel_stop(&next) {
346 return PipelineOutcome::Stopped(next);
347 }
348 ex = next;
349 }
350 PipelineOutcome::Stopped(stopped_ex) => {
351 return PipelineOutcome::Stopped(stopped_ex);
352 }
353 PipelineOutcome::Failed(err) => {
354 let Some(handler) = handler.as_ref() else {
355 return PipelineOutcome::Failed(err);
356 };
357 let policy = handler.match_policy(&err);
358 match handler
359 .retry_step(policy, retryable.as_mut(), original, err)
360 .await
361 {
362 RetryOutcome::Recovered(exchange) => {
363 ex = exchange;
364 }
365 RetryOutcome::Stopped(stopped_ex) => {
366 return PipelineOutcome::Stopped(stopped_ex);
367 }
368 RetryOutcome::Exhausted {
369 exchange,
370 error,
371 policy,
372 } => {
373 let disposition = if trace {
374 handler
375 .handle_step(policy, exchange, error)
376 .instrument(tracing::debug_span!("error_handler", step_index = i))
377 .await
378 } else {
379 handler.handle_step(policy, exchange, error).await
380 };
381 match disposition {
382 Ok(StepDisposition::Propagate(e)) => {
383 return PipelineOutcome::Failed(e);
384 }
385 Ok(StepDisposition::Handled(done)) => {
386 return PipelineOutcome::Completed(done);
387 }
388 Ok(StepDisposition::Continued(next)) => {
389 ex = next;
390 }
391 Err(e) => return PipelineOutcome::Failed(e),
392 }
393 }
394 }
395 }
396 }
397 }
398 PipelineOutcome::Completed(ex)
399}
400
401async fn invoke_with_span(
402 retryable: &mut Box<dyn camel_api::error_handler::RetryableStep>,
403 exchange: Exchange,
404 idx: usize,
405) -> PipelineOutcome {
406 retryable
407 .invoke(exchange)
408 .instrument(tracing::debug_span!("pipeline_step", index = idx))
409 .await
410}
411
412#[derive(Clone)]
419pub struct RouteChannelService {
420 handler: Arc<dyn RouteErrorHandler>,
421 security: Option<BoxProcessor>,
422 cb_gate: Option<CircuitBreakerGate>,
423 pipeline: BoxProcessor,
424 use_original_message: bool,
427}
428
429impl RouteChannelService {
430 pub fn new(
431 handler: Arc<dyn RouteErrorHandler>,
432 security: Option<BoxProcessor>,
433 cb_gate: Option<CircuitBreakerGate>,
434 pipeline: BoxProcessor,
435 use_original_message: bool,
436 ) -> Self {
437 Self {
438 handler,
439 security,
440 cb_gate,
441 pipeline,
442 use_original_message,
443 }
444 }
445}
446
447impl Service<Exchange> for RouteChannelService {
448 type Response = Exchange;
449 type Error = CamelError;
450 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
451
452 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
453 if let Some(ref mut sec) = self.security {
455 match sec.clone().poll_ready(cx) {
456 Poll::Pending => return Poll::Pending,
457 Poll::Ready(Err(_)) | Poll::Ready(Ok(())) => {}
458 }
459 }
460 match self.pipeline.clone().poll_ready(cx) {
462 Poll::Pending => return Poll::Pending,
463 Poll::Ready(Err(_)) | Poll::Ready(Ok(())) => {}
464 }
465 Poll::Ready(Ok(()))
466 }
467
468 fn call(&mut self, exchange: Exchange) -> Self::Future {
469 let handler = self.handler.clone();
470 let security = self.security.clone();
471 let cb_gate = self.cb_gate.clone();
472 let mut pipeline = self.pipeline.clone();
473 let use_original_message = self.use_original_message;
474
475 Box::pin(async move {
476 let mut ex = exchange;
477
478 if use_original_message {
482 let original: Arc<Message> = Arc::new(ex.input.clone());
483 ex.set_extension(ORIGINAL_MESSAGE_EXTENSION, original);
484 }
485
486 if let Some(mut sec) = security {
488 let original = ex.clone();
489 match invoke_processor(&mut sec, ex).await {
490 Ok(next) => ex = next,
491 Err(err) => {
492 return handler
493 .handle_boundary(BoundaryKind::Security, original, err)
494 .await;
495 }
496 }
497 }
498
499 if let Some(ref cb) = cb_gate {
501 match cb.before_call() {
502 CircuitBreakerDecision::Allow => { }
503 CircuitBreakerDecision::Fallback(mut fb) => {
504 let original = ex.clone();
507 match invoke_processor(&mut fb, ex).await {
508 Ok(result) => return Ok(result),
509 Err(err) => {
510 return handler
511 .handle_boundary(BoundaryKind::CircuitBreaker, original, err)
512 .await;
513 }
514 }
515 }
516 CircuitBreakerDecision::Reject(err) => {
517 let original = ex.clone();
518 return handler
519 .handle_boundary(BoundaryKind::CircuitBreaker, original, err)
520 .await;
521 }
522 }
523 }
524
525 let result = invoke_processor(&mut pipeline, ex).await;
527
528 if let Some(ref cb) = cb_gate {
530 cb.after_result(&result);
531 }
532
533 result
535 })
536 }
537}
538
539#[cfg(test)]
540#[path = "route_compiler_tests.rs"]
541mod tests;