Skip to main content

camel_core/
route.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6use tower::ServiceExt;
7
8use camel_api::circuit_breaker::CircuitBreakerConfig;
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11    AggregatorConfig, BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor,
12    MulticastConfig, SplitterConfig,
13};
14use camel_component::ConcurrencyModel;
15
16use crate::config::DetailLevel;
17use crate::tracer::TracingProcessor;
18
19/// A Route defines a message flow: from a source endpoint, through a composed
20/// Tower Service pipeline.
21pub struct Route {
22    /// The source endpoint URI.
23    pub(crate) from_uri: String,
24    /// The composed processor pipeline as a type-erased Tower Service.
25    pub(crate) pipeline: BoxProcessor,
26    /// Optional per-route concurrency model override.
27    /// When `None`, the consumer's default concurrency model is used.
28    pub(crate) concurrency: Option<ConcurrencyModel>,
29}
30
31impl Route {
32    /// Create a new route from the given source URI and processor pipeline.
33    pub fn new(from_uri: impl Into<String>, pipeline: BoxProcessor) -> Self {
34        Self {
35            from_uri: from_uri.into(),
36            pipeline,
37            concurrency: None,
38        }
39    }
40
41    /// The source endpoint URI.
42    pub fn from_uri(&self) -> &str {
43        &self.from_uri
44    }
45
46    /// Consume the route and return its pipeline.
47    pub fn into_pipeline(self) -> BoxProcessor {
48        self.pipeline
49    }
50
51    /// Set a concurrency model override for this route.
52    pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
53        self.concurrency = Some(model);
54        self
55    }
56
57    /// Get the concurrency model override, if any.
58    pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
59        self.concurrency.as_ref()
60    }
61
62    /// Consume the route, returning the pipeline and optional concurrency override.
63    pub fn into_parts(self) -> (BoxProcessor, Option<ConcurrencyModel>) {
64        (self.pipeline, self.concurrency)
65    }
66}
67
68/// An unresolved when-clause: predicate + nested steps for the sub-pipeline.
69pub struct WhenStep {
70    pub predicate: FilterPredicate,
71    pub steps: Vec<BuilderStep>,
72}
73
74pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
75
76/// Declarative `when` clause resolved later by the runtime.
77#[derive(Debug)]
78pub struct DeclarativeWhenStep {
79    pub predicate: LanguageExpressionDef,
80    pub steps: Vec<BuilderStep>,
81}
82
83/// A step in an unresolved route definition.
84pub enum BuilderStep {
85    /// A pre-built Tower processor service.
86    Processor(BoxProcessor),
87    /// A destination URI — resolved at start time by CamelContext.
88    To(String),
89    /// Declarative set_header (literal or language-based value), resolved at route-add time.
90    DeclarativeSetHeader { key: String, value: ValueSourceDef },
91    /// Declarative set_body (literal or language-based value), resolved at route-add time.
92    DeclarativeSetBody { value: ValueSourceDef },
93    /// Declarative filter using a language predicate, resolved at route-add time.
94    DeclarativeFilter {
95        predicate: LanguageExpressionDef,
96        steps: Vec<BuilderStep>,
97    },
98    /// Declarative choice/when/otherwise using language predicates, resolved at route-add time.
99    DeclarativeChoice {
100        whens: Vec<DeclarativeWhenStep>,
101        otherwise: Option<Vec<BuilderStep>>,
102    },
103    /// Declarative script step evaluated by language and written to body.
104    DeclarativeScript { expression: LanguageExpressionDef },
105    /// Declarative split using a language expression, resolved at route-add time.
106    DeclarativeSplit {
107        expression: LanguageExpressionDef,
108        aggregation: camel_api::splitter::AggregationStrategy,
109        parallel: bool,
110        parallel_limit: Option<usize>,
111        stop_on_exception: bool,
112        steps: Vec<BuilderStep>,
113    },
114    /// A Splitter sub-pipeline: config + nested steps to execute per fragment.
115    Split {
116        config: SplitterConfig,
117        steps: Vec<BuilderStep>,
118    },
119    /// An Aggregator step: collects exchanges by correlation key, emits when complete.
120    Aggregate { config: AggregatorConfig },
121    /// A Filter sub-pipeline: predicate + nested steps executed only when predicate is true.
122    Filter {
123        predicate: FilterPredicate,
124        steps: Vec<BuilderStep>,
125    },
126    /// A Choice step: evaluates when-clauses in order, routes to the first match.
127    /// If no when matches, the optional otherwise branch is used.
128    Choice {
129        whens: Vec<WhenStep>,
130        otherwise: Option<Vec<BuilderStep>>,
131    },
132    /// A WireTap step: sends a clone of the exchange to a tap endpoint (fire-and-forget).
133    WireTap { uri: String },
134    /// A Multicast step: sends the same exchange to multiple destinations.
135    Multicast {
136        steps: Vec<BuilderStep>,
137        config: MulticastConfig,
138    },
139    /// Declarative log step with a language-evaluated message, resolved at route-add time.
140    DeclarativeLog {
141        level: camel_processor::LogLevel,
142        message: ValueSourceDef,
143    },
144    /// Bean invocation step — resolved at route-add time.
145    Bean { name: String, method: String },
146}
147
148impl std::fmt::Debug for BuilderStep {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        match self {
151            BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
152            BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
153            BuilderStep::DeclarativeSetHeader { key, .. } => {
154                write!(
155                    f,
156                    "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
157                )
158            }
159            BuilderStep::DeclarativeSetBody { .. } => {
160                write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
161            }
162            BuilderStep::DeclarativeFilter { steps, .. } => {
163                write!(
164                    f,
165                    "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
166                )
167            }
168            BuilderStep::DeclarativeChoice { whens, otherwise } => {
169                write!(
170                    f,
171                    "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
172                    whens.len(),
173                    if otherwise.is_some() { "Some" } else { "None" }
174                )
175            }
176            BuilderStep::DeclarativeScript { expression } => write!(
177                f,
178                "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
179                expression.language
180            ),
181            BuilderStep::DeclarativeSplit { steps, .. } => {
182                write!(
183                    f,
184                    "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
185                )
186            }
187            BuilderStep::Split { steps, .. } => {
188                write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
189            }
190            BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
191            BuilderStep::Filter { steps, .. } => {
192                write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
193            }
194            BuilderStep::Choice { whens, otherwise } => {
195                write!(
196                    f,
197                    "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
198                    whens.len(),
199                    if otherwise.is_some() { "Some" } else { "None" }
200                )
201            }
202            BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
203            BuilderStep::Multicast { steps, .. } => {
204                write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
205            }
206            BuilderStep::DeclarativeLog { level, .. } => {
207                write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
208            }
209            BuilderStep::Bean { name, method } => {
210                write!(
211                    f,
212                    "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
213                )
214            }
215        }
216    }
217}
218
219/// An unresolved route definition. "to" URIs have not been resolved to producers yet.
220pub struct RouteDefinition {
221    pub(crate) from_uri: String,
222    pub(crate) steps: Vec<BuilderStep>,
223    /// Optional per-route error handler config. Takes precedence over the global one.
224    pub(crate) error_handler: Option<ErrorHandlerConfig>,
225    /// Optional circuit breaker config. Applied between error handler and step pipeline.
226    pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
227    /// User override for the consumer's concurrency model. `None` means
228    /// "use whatever the consumer declares".
229    pub(crate) concurrency: Option<ConcurrencyModel>,
230    /// Unique identifier for this route. Required.
231    pub(crate) route_id: String,
232    /// Whether this route should start automatically when the context starts.
233    pub(crate) auto_startup: bool,
234    /// Order in which routes are started. Lower values start first.
235    pub(crate) startup_order: i32,
236}
237
238impl RouteDefinition {
239    /// Create a new route definition with the required route ID.
240    pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
241        Self {
242            from_uri: from_uri.into(),
243            steps,
244            error_handler: None,
245            circuit_breaker: None,
246            concurrency: None,
247            route_id: String::new(), // Will be set by with_route_id()
248            auto_startup: true,
249            startup_order: 1000,
250        }
251    }
252
253    /// The source endpoint URI.
254    pub fn from_uri(&self) -> &str {
255        &self.from_uri
256    }
257
258    /// The steps in this route definition.
259    pub fn steps(&self) -> &[BuilderStep] {
260        &self.steps
261    }
262
263    /// Set a per-route error handler, overriding the global one.
264    pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
265        self.error_handler = Some(config);
266        self
267    }
268
269    /// Set a circuit breaker for this route.
270    pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
271        self.circuit_breaker = Some(config);
272        self
273    }
274
275    /// Get the circuit breaker config, if set.
276    pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
277        self.circuit_breaker.as_ref()
278    }
279
280    /// User-specified concurrency override, if any.
281    pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
282        self.concurrency.as_ref()
283    }
284
285    /// Override the consumer's concurrency model for this route.
286    pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
287        self.concurrency = Some(model);
288        self
289    }
290
291    /// Get the route ID.
292    pub fn route_id(&self) -> &str {
293        &self.route_id
294    }
295
296    /// Whether this route should start automatically when the context starts.
297    pub fn auto_startup(&self) -> bool {
298        self.auto_startup
299    }
300
301    /// Order in which routes are started. Lower values start first.
302    pub fn startup_order(&self) -> i32 {
303        self.startup_order
304    }
305
306    /// Set a unique identifier for this route.
307    pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
308        self.route_id = id.into();
309        self
310    }
311
312    /// Set whether this route should start automatically.
313    pub fn with_auto_startup(mut self, auto: bool) -> Self {
314        self.auto_startup = auto;
315        self
316    }
317
318    /// Set the startup order. Lower values start first.
319    pub fn with_startup_order(mut self, order: i32) -> Self {
320        self.startup_order = order;
321        self
322    }
323
324    /// Extract the metadata fields needed for introspection.
325    /// This is used by RouteController to store route info without the non-Sync steps.
326    pub fn to_info(&self) -> RouteDefinitionInfo {
327        RouteDefinitionInfo {
328            route_id: self.route_id.clone(),
329            auto_startup: self.auto_startup,
330            startup_order: self.startup_order,
331        }
332    }
333}
334
335/// Minimal route definition metadata for introspection.
336///
337/// This struct contains only the metadata fields from [`RouteDefinition`]
338/// that are needed for route lifecycle management, without the `steps` field
339/// (which contains non-Sync types and cannot be stored in a Sync struct).
340#[derive(Clone)]
341pub struct RouteDefinitionInfo {
342    route_id: String,
343    auto_startup: bool,
344    startup_order: i32,
345}
346
347impl RouteDefinitionInfo {
348    /// Get the route ID.
349    pub fn route_id(&self) -> &str {
350        &self.route_id
351    }
352
353    /// Whether this route should start automatically when the context starts.
354    pub fn auto_startup(&self) -> bool {
355        self.auto_startup
356    }
357
358    /// Order in which routes are started. Lower values start first.
359    pub fn startup_order(&self) -> i32 {
360        self.startup_order
361    }
362}
363
364/// Compose a list of BoxProcessors into a single pipeline that runs them sequentially.
365pub fn compose_pipeline(processors: Vec<BoxProcessor>) -> BoxProcessor {
366    if processors.is_empty() {
367        return BoxProcessor::new(IdentityProcessor);
368    }
369    BoxProcessor::new(SequentialPipeline { steps: processors })
370}
371
372/// Compose a list of BoxProcessors into a traced pipeline.
373///
374/// Each processor is wrapped with TracingProcessor to emit spans for observability.
375/// When tracing is disabled, falls back to plain compose_pipeline with zero overhead.
376pub fn compose_traced_pipeline(
377    processors: Vec<BoxProcessor>,
378    route_id: &str,
379    trace_enabled: bool,
380    detail_level: DetailLevel,
381) -> BoxProcessor {
382    if !trace_enabled {
383        return compose_pipeline(processors);
384    }
385
386    if processors.is_empty() {
387        return BoxProcessor::new(IdentityProcessor);
388    }
389
390    // Wrap each processor with TracingProcessor
391    let wrapped: Vec<BoxProcessor> = processors
392        .into_iter()
393        .enumerate()
394        .map(|(idx, processor)| {
395            BoxProcessor::new(TracingProcessor::new(
396                processor,
397                route_id.to_string(),
398                idx,
399                detail_level.clone(),
400            ))
401        })
402        .collect();
403
404    BoxProcessor::new(SequentialPipeline { steps: wrapped })
405}
406
407/// A service that executes a sequence of BoxProcessors in order.
408#[derive(Clone)]
409struct SequentialPipeline {
410    steps: Vec<BoxProcessor>,
411}
412
413impl Service<Exchange> for SequentialPipeline {
414    type Response = Exchange;
415    type Error = CamelError;
416    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
417
418    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
419        if let Some(first) = self.steps.first_mut() {
420            first.poll_ready(cx)
421        } else {
422            Poll::Ready(Ok(()))
423        }
424    }
425
426    fn call(&mut self, exchange: Exchange) -> Self::Future {
427        let mut steps = self.steps.clone();
428        Box::pin(async move {
429            let mut ex = exchange;
430            for step in &mut steps {
431                ex = step.ready().await?.call(ex).await?;
432            }
433            Ok(ex)
434        })
435    }
436}
437
438#[cfg(test)]
439mod tests {
440    use super::*;
441    use camel_api::BoxProcessorExt;
442    use std::sync::Arc;
443    use std::sync::atomic::{AtomicBool, Ordering};
444
445    /// A service that returns `Pending` on the first `poll_ready`, then `Ready`.
446    #[derive(Clone)]
447    struct DelayedReadyService {
448        ready: Arc<AtomicBool>,
449    }
450
451    impl Service<Exchange> for DelayedReadyService {
452        type Response = Exchange;
453        type Error = CamelError;
454        type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
455
456        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
457            if self.ready.fetch_or(true, Ordering::SeqCst) {
458                // Already marked ready (second+ call) → Ready
459                Poll::Ready(Ok(()))
460            } else {
461                // First call → Pending, schedule a wake
462                cx.waker().wake_by_ref();
463                Poll::Pending
464            }
465        }
466
467        fn call(&mut self, ex: Exchange) -> Self::Future {
468            Box::pin(async move { Ok(ex) })
469        }
470    }
471
472    #[test]
473    fn test_pipeline_poll_ready_delegates_to_first_step() {
474        let waker = futures::task::noop_waker();
475        let mut cx = Context::from_waker(&waker);
476
477        let inner = DelayedReadyService {
478            ready: Arc::new(AtomicBool::new(false)),
479        };
480        let boxed = BoxProcessor::new(inner);
481        let mut pipeline = SequentialPipeline { steps: vec![boxed] };
482
483        // First poll_ready: inner returns Pending, so pipeline must too.
484        let first = pipeline.poll_ready(&mut cx);
485        assert!(first.is_pending(), "expected Pending on first poll_ready");
486
487        // Second poll_ready: inner returns Ready, so pipeline must too.
488        let second = pipeline.poll_ready(&mut cx);
489        assert!(second.is_ready(), "expected Ready on second poll_ready");
490    }
491
492    #[test]
493    fn test_pipeline_poll_ready_with_empty_steps() {
494        let waker = futures::task::noop_waker();
495        let mut cx = Context::from_waker(&waker);
496
497        let mut pipeline = SequentialPipeline { steps: vec![] };
498
499        // Empty pipeline should be immediately ready.
500        let result = pipeline.poll_ready(&mut cx);
501        assert!(result.is_ready(), "expected Ready for empty pipeline");
502    }
503
504    // When a step in the pipeline returns Err(CamelError::Stopped), the pipeline
505    // should halt further steps and propagate Err(Stopped). The context loop is
506    // responsible for silencing Stopped (treating it as a graceful halt, not an error).
507    #[tokio::test]
508    async fn test_pipeline_stops_gracefully_on_stopped_error() {
509        use std::sync::{
510            Arc,
511            atomic::{AtomicBool, Ordering},
512        };
513
514        // A flag to detect if a step AFTER stop() was called.
515        let after_called = Arc::new(AtomicBool::new(false));
516        let after_called_clone = after_called.clone();
517
518        let stop_step = BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));
519        let after_step = BoxProcessor::from_fn(move |ex| {
520            after_called_clone.store(true, Ordering::SeqCst);
521            Box::pin(async move { Ok(ex) })
522        });
523
524        let mut pipeline = SequentialPipeline {
525            steps: vec![stop_step, after_step],
526        };
527
528        let ex = Exchange::new(camel_api::Message::new("hello"));
529        let result = pipeline.call(ex).await;
530
531        // Pipeline propagates Stopped — callers (context loop) are responsible for silencing it.
532        assert!(
533            matches!(result, Err(CamelError::Stopped)),
534            "expected Err(Stopped), got: {:?}",
535            result
536        );
537        // The step after stop must NOT have been called.
538        assert!(
539            !after_called.load(Ordering::SeqCst),
540            "step after stop should not be called"
541        );
542    }
543
544    #[test]
545    fn test_builder_step_multicast_variant() {
546        use camel_api::MulticastConfig;
547
548        let step = BuilderStep::Multicast {
549            steps: vec![BuilderStep::To("direct:a".into())],
550            config: MulticastConfig::new(),
551        };
552
553        assert!(matches!(step, BuilderStep::Multicast { .. }));
554    }
555
556    #[test]
557    fn test_route_definition_defaults() {
558        let def = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");
559        assert_eq!(def.route_id(), "test-route");
560        assert!(def.auto_startup());
561        assert_eq!(def.startup_order(), 1000);
562    }
563
564    #[test]
565    fn test_route_definition_builders() {
566        let def = RouteDefinition::new("direct:test", vec![])
567            .with_route_id("my-route")
568            .with_auto_startup(false)
569            .with_startup_order(50);
570        assert_eq!(def.route_id(), "my-route");
571        assert!(!def.auto_startup());
572        assert_eq!(def.startup_order(), 50);
573    }
574
575    #[test]
576    fn test_choice_builder_step_debug() {
577        use camel_api::{Exchange, FilterPredicate};
578        use std::sync::Arc;
579
580        fn always_true(_: &Exchange) -> bool {
581            true
582        }
583
584        let step = BuilderStep::Choice {
585            whens: vec![crate::route::WhenStep {
586                predicate: Arc::new(always_true) as FilterPredicate,
587                steps: vec![BuilderStep::To("mock:a".into())],
588            }],
589            otherwise: None,
590        };
591        let debug = format!("{step:?}");
592        assert!(debug.contains("Choice"));
593    }
594
595    #[tokio::test]
596    async fn test_compose_traced_pipeline_disabled() {
597        let pipeline = compose_traced_pipeline(vec![], "test-route", false, DetailLevel::Minimal);
598        // Should behave like identity
599        let ex = Exchange::new(camel_api::Message::new("hello"));
600        let result = tower::ServiceExt::oneshot(pipeline, ex).await;
601        assert!(result.is_ok());
602    }
603
604    #[tokio::test]
605    async fn test_compose_traced_pipeline_enabled() {
606        use camel_api::BoxProcessorExt;
607
608        let step = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
609        let pipeline =
610            compose_traced_pipeline(vec![step], "test-route", true, DetailLevel::Minimal);
611        let ex = Exchange::new(camel_api::Message::new("hello"));
612        let result = tower::ServiceExt::oneshot(pipeline, ex).await;
613        assert!(result.is_ok());
614    }
615}