Skip to main content

camel_core/lifecycle/application/
route_definition.rs

// lifecycle/application/route_definition.rs
// Route definition and builder-step types. Route (compiled artifact) lives in adapters.
3
4use camel_api::UnitOfWorkConfig;
5use camel_api::circuit_breaker::CircuitBreakerConfig;
6use camel_api::error_handler::ErrorHandlerConfig;
7use camel_api::{AggregatorConfig, BoxProcessor, FilterPredicate, MulticastConfig, SplitterConfig};
8use camel_component_api::ConcurrencyModel;
9
10/// An unresolved when-clause: predicate + nested steps for the sub-pipeline.
11pub struct WhenStep {
12    pub predicate: FilterPredicate,
13    pub steps: Vec<BuilderStep>,
14}
15
16pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
17
18/// Declarative `when` clause resolved later by the runtime.
19#[derive(Debug)]
20pub struct DeclarativeWhenStep {
21    pub predicate: LanguageExpressionDef,
22    pub steps: Vec<BuilderStep>,
23}
24
25/// A step in an unresolved route definition.
26pub enum BuilderStep {
27    /// A pre-built Tower processor service.
28    Processor(BoxProcessor),
29    /// A destination URI — resolved at start time by CamelContext.
30    To(String),
31    /// A stop step that halts processing immediately.
32    Stop,
33    /// A static log step.
34    Log {
35        level: camel_processor::LogLevel,
36        message: String,
37    },
38    /// Declarative set_header (literal or language-based value), resolved at route-add time.
39    DeclarativeSetHeader {
40        key: String,
41        value: ValueSourceDef,
42    },
43    /// Declarative set_body (literal or language-based value), resolved at route-add time.
44    DeclarativeSetBody {
45        value: ValueSourceDef,
46    },
47    /// Declarative filter using a language predicate, resolved at route-add time.
48    DeclarativeFilter {
49        predicate: LanguageExpressionDef,
50        steps: Vec<BuilderStep>,
51    },
52    /// Declarative choice/when/otherwise using language predicates, resolved at route-add time.
53    DeclarativeChoice {
54        whens: Vec<DeclarativeWhenStep>,
55        otherwise: Option<Vec<BuilderStep>>,
56    },
57    /// Declarative script step evaluated by language and written to body.
58    DeclarativeScript {
59        expression: LanguageExpressionDef,
60    },
61    /// Declarative split using a language expression, resolved at route-add time.
62    DeclarativeSplit {
63        expression: LanguageExpressionDef,
64        aggregation: camel_api::splitter::AggregationStrategy,
65        parallel: bool,
66        parallel_limit: Option<usize>,
67        stop_on_exception: bool,
68        steps: Vec<BuilderStep>,
69    },
70    DeclarativeDynamicRouter {
71        expression: LanguageExpressionDef,
72        uri_delimiter: String,
73        cache_size: i32,
74        ignore_invalid_endpoints: bool,
75        max_iterations: usize,
76    },
77    DeclarativeRoutingSlip {
78        expression: LanguageExpressionDef,
79        uri_delimiter: String,
80        cache_size: i32,
81        ignore_invalid_endpoints: bool,
82    },
83    /// A Splitter sub-pipeline: config + nested steps to execute per fragment.
84    Split {
85        config: SplitterConfig,
86        steps: Vec<BuilderStep>,
87    },
88    /// An Aggregator step: collects exchanges by correlation key, emits when complete.
89    Aggregate {
90        config: AggregatorConfig,
91    },
92    /// A Filter sub-pipeline: predicate + nested steps executed only when predicate is true.
93    Filter {
94        predicate: FilterPredicate,
95        steps: Vec<BuilderStep>,
96    },
97    /// A Choice step: evaluates when-clauses in order, routes to the first match.
98    /// If no when matches, the optional otherwise branch is used.
99    Choice {
100        whens: Vec<WhenStep>,
101        otherwise: Option<Vec<BuilderStep>>,
102    },
103    /// A WireTap step: sends a clone of the exchange to a tap endpoint (fire-and-forget).
104    WireTap {
105        uri: String,
106    },
107    /// A Multicast step: sends the same exchange to multiple destinations.
108    Multicast {
109        steps: Vec<BuilderStep>,
110        config: MulticastConfig,
111    },
112    /// Declarative log step with a language-evaluated message, resolved at route-add time.
113    DeclarativeLog {
114        level: camel_processor::LogLevel,
115        message: ValueSourceDef,
116    },
117    /// Bean invocation step — resolved at route-add time.
118    Bean {
119        name: String,
120        method: String,
121    },
122    /// Script step: executes a script that can mutate the exchange.
123    /// The script has access to `headers`, `properties`, and `body`.
124    Script {
125        language: String,
126        script: String,
127    },
128    /// Throttle step: rate limiting with configurable behavior when limit exceeded.
129    Throttle {
130        config: camel_api::ThrottlerConfig,
131        steps: Vec<BuilderStep>,
132    },
133    /// LoadBalance step: distributes exchanges across multiple endpoints using a strategy.
134    LoadBalance {
135        config: camel_api::LoadBalancerConfig,
136        steps: Vec<BuilderStep>,
137    },
138    /// DynamicRouter step: routes exchanges dynamically based on expression evaluation.
139    DynamicRouter {
140        config: camel_api::DynamicRouterConfig,
141    },
142    RoutingSlip {
143        config: camel_api::RoutingSlipConfig,
144    },
145    RecipientList {
146        config: camel_api::recipient_list::RecipientListConfig,
147    },
148    DeclarativeRecipientList {
149        expression: LanguageExpressionDef,
150        delimiter: String,
151        parallel: bool,
152        parallel_limit: Option<usize>,
153        stop_on_exception: bool,
154        aggregation: String,
155    },
156    Delay {
157        config: camel_api::DelayConfig,
158    },
159}
160
161impl std::fmt::Debug for BuilderStep {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        match self {
164            BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
165            BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
166            BuilderStep::Stop => write!(f, "BuilderStep::Stop"),
167            BuilderStep::Log { level, message } => write!(
168                f,
169                "BuilderStep::Log {{ level: {level:?}, message: {message:?} }}"
170            ),
171            BuilderStep::DeclarativeSetHeader { key, .. } => {
172                write!(
173                    f,
174                    "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
175                )
176            }
177            BuilderStep::DeclarativeSetBody { .. } => {
178                write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
179            }
180            BuilderStep::DeclarativeFilter { steps, .. } => {
181                write!(
182                    f,
183                    "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
184                )
185            }
186            BuilderStep::DeclarativeChoice { whens, otherwise } => {
187                write!(
188                    f,
189                    "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
190                    whens.len(),
191                    if otherwise.is_some() { "Some" } else { "None" }
192                )
193            }
194            BuilderStep::DeclarativeScript { expression } => write!(
195                f,
196                "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
197                expression.language
198            ),
199            BuilderStep::DeclarativeSplit { steps, .. } => {
200                write!(
201                    f,
202                    "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
203                )
204            }
205            BuilderStep::DeclarativeDynamicRouter { expression, .. } => write!(
206                f,
207                "BuilderStep::DeclarativeDynamicRouter {{ language: {:?}, .. }}",
208                expression.language
209            ),
210            BuilderStep::DeclarativeRoutingSlip { expression, .. } => write!(
211                f,
212                "BuilderStep::DeclarativeRoutingSlip {{ language: {:?}, .. }}",
213                expression.language
214            ),
215            BuilderStep::Split { steps, .. } => {
216                write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
217            }
218            BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
219            BuilderStep::Filter { steps, .. } => {
220                write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
221            }
222            BuilderStep::Choice { whens, otherwise } => {
223                write!(
224                    f,
225                    "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
226                    whens.len(),
227                    if otherwise.is_some() { "Some" } else { "None" }
228                )
229            }
230            BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
231            BuilderStep::Multicast { steps, .. } => {
232                write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
233            }
234            BuilderStep::DeclarativeLog { level, .. } => {
235                write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
236            }
237            BuilderStep::Bean { name, method } => {
238                write!(
239                    f,
240                    "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
241                )
242            }
243            BuilderStep::Script { language, .. } => {
244                write!(f, "BuilderStep::Script {{ language: {language:?}, .. }}")
245            }
246            BuilderStep::Throttle { steps, .. } => {
247                write!(f, "BuilderStep::Throttle {{ steps: {steps:?}, .. }}")
248            }
249            BuilderStep::LoadBalance { steps, .. } => {
250                write!(f, "BuilderStep::LoadBalance {{ steps: {steps:?}, .. }}")
251            }
252            BuilderStep::DynamicRouter { .. } => {
253                write!(f, "BuilderStep::DynamicRouter {{ .. }}")
254            }
255            BuilderStep::RoutingSlip { .. } => {
256                write!(f, "BuilderStep::RoutingSlip {{ .. }}")
257            }
258            BuilderStep::RecipientList { .. } => {
259                write!(f, "BuilderStep::RecipientList {{ .. }}")
260            }
261            BuilderStep::DeclarativeRecipientList {
262                expression,
263                aggregation,
264                ..
265            } => write!(
266                f,
267                "BuilderStep::DeclarativeRecipientList {{ language: {:?}, aggregation: {:?}, .. }}",
268                expression.language, aggregation
269            ),
270            BuilderStep::Delay { config } => {
271                write!(f, "BuilderStep::Delay {{ config: {:?} }}", config)
272            }
273        }
274    }
275}
276
277/// An unresolved route definition. "to" URIs have not been resolved to producers yet.
278pub struct RouteDefinition {
279    pub(crate) from_uri: String,
280    pub(crate) steps: Vec<BuilderStep>,
281    /// Optional per-route error handler config. Takes precedence over the global one.
282    pub(crate) error_handler: Option<ErrorHandlerConfig>,
283    /// Optional circuit breaker config. Applied between error handler and step pipeline.
284    pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
285    /// Optional Unit of Work config for in-flight tracking and completion hooks.
286    pub(crate) unit_of_work: Option<UnitOfWorkConfig>,
287    /// User override for the consumer's concurrency model. `None` means
288    /// "use whatever the consumer declares".
289    pub(crate) concurrency: Option<ConcurrencyModel>,
290    /// Unique identifier for this route. Required.
291    pub(crate) route_id: String,
292    /// Whether this route should start automatically when the context starts.
293    pub(crate) auto_startup: bool,
294    /// Order in which routes are started. Lower values start first.
295    pub(crate) startup_order: i32,
296    pub(crate) source_hash: Option<u64>,
297}
298
299impl RouteDefinition {
300    /// Create a new route definition with the required route ID.
301    pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
302        Self {
303            from_uri: from_uri.into(),
304            steps,
305            error_handler: None,
306            circuit_breaker: None,
307            unit_of_work: None,
308            concurrency: None,
309            route_id: String::new(), // Will be set by with_route_id()
310            auto_startup: true,
311            startup_order: 1000,
312            source_hash: None,
313        }
314    }
315
316    /// The source endpoint URI.
317    pub fn from_uri(&self) -> &str {
318        &self.from_uri
319    }
320
321    /// The steps in this route definition.
322    pub fn steps(&self) -> &[BuilderStep] {
323        &self.steps
324    }
325
326    /// Set a per-route error handler, overriding the global one.
327    pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
328        self.error_handler = Some(config);
329        self
330    }
331
332    /// Get the route-level error handler config, if set.
333    pub fn error_handler_config(&self) -> Option<&ErrorHandlerConfig> {
334        self.error_handler.as_ref()
335    }
336
337    /// Set a circuit breaker for this route.
338    pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
339        self.circuit_breaker = Some(config);
340        self
341    }
342
343    /// Set a unit of work config for this route.
344    pub fn with_unit_of_work(mut self, config: UnitOfWorkConfig) -> Self {
345        self.unit_of_work = Some(config);
346        self
347    }
348
349    /// Get the unit of work config, if set.
350    pub fn unit_of_work_config(&self) -> Option<&UnitOfWorkConfig> {
351        self.unit_of_work.as_ref()
352    }
353
354    /// Get the circuit breaker config, if set.
355    pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
356        self.circuit_breaker.as_ref()
357    }
358
359    /// User-specified concurrency override, if any.
360    pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
361        self.concurrency.as_ref()
362    }
363
364    /// Override the consumer's concurrency model for this route.
365    pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
366        self.concurrency = Some(model);
367        self
368    }
369
370    /// Get the route ID.
371    pub fn route_id(&self) -> &str {
372        &self.route_id
373    }
374
375    /// Whether this route should start automatically when the context starts.
376    pub fn auto_startup(&self) -> bool {
377        self.auto_startup
378    }
379
380    /// Order in which routes are started. Lower values start first.
381    pub fn startup_order(&self) -> i32 {
382        self.startup_order
383    }
384
385    /// Set a unique identifier for this route.
386    pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
387        self.route_id = id.into();
388        self
389    }
390
391    /// Set whether this route should start automatically.
392    pub fn with_auto_startup(mut self, auto: bool) -> Self {
393        self.auto_startup = auto;
394        self
395    }
396
397    /// Set the startup order. Lower values start first.
398    pub fn with_startup_order(mut self, order: i32) -> Self {
399        self.startup_order = order;
400        self
401    }
402
403    pub fn with_source_hash(mut self, hash: u64) -> Self {
404        self.source_hash = Some(hash);
405        self
406    }
407
408    pub fn source_hash(&self) -> Option<u64> {
409        self.source_hash
410    }
411
412    /// Extract the metadata fields needed for introspection.
413    /// This is used by RouteController to store route info without the non-Sync steps.
414    pub fn to_info(&self) -> RouteDefinitionInfo {
415        RouteDefinitionInfo {
416            route_id: self.route_id.clone(),
417            auto_startup: self.auto_startup,
418            startup_order: self.startup_order,
419            source_hash: self.source_hash,
420        }
421    }
422}
423
/// Minimal route definition metadata for introspection.
///
/// Holds only the metadata fields of [`RouteDefinition`] that route lifecycle
/// management needs. The `steps` field is deliberately left out: it contains
/// non-Sync types and cannot be stored in a Sync struct.
///
/// `Debug`, `PartialEq` and `Eq` are derived so the metadata can be logged
/// and compared directly.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RouteDefinitionInfo {
    /// Identifier of the route this metadata was taken from.
    route_id: String,
    /// Whether the route starts automatically when the context starts.
    auto_startup: bool,
    /// Startup ordering key. Lower values start first.
    startup_order: i32,
    /// Source hash copied from the route definition, if one was recorded.
    pub(crate) source_hash: Option<u64>,
}

impl RouteDefinitionInfo {
    /// Get the route ID.
    pub fn route_id(&self) -> &str {
        self.route_id.as_str()
    }

    /// Whether this route should start automatically when the context starts.
    pub fn auto_startup(&self) -> bool {
        self.auto_startup
    }

    /// Order in which routes are started. Lower values start first.
    pub fn startup_order(&self) -> i32 {
        self.startup_order
    }

    /// The source hash copied from the route definition, if any.
    pub fn source_hash(&self) -> Option<u64> {
        self.source_hash
    }
}
457
/// Unit tests for the route-definition types: builder/accessor round trips on
/// `RouteDefinition`, metadata extraction via `to_info`, and the hand-written
/// `Debug` output of `BuilderStep`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_builder_step_multicast_variant() {
        use camel_api::MulticastConfig;

        let step = BuilderStep::Multicast {
            steps: vec![BuilderStep::To("direct:a".into())],
            config: MulticastConfig::new(),
        };

        assert!(matches!(step, BuilderStep::Multicast { .. }));
    }

    #[test]
    fn test_route_definition_defaults() {
        let def = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");
        assert_eq!(def.route_id(), "test-route");
        assert!(def.auto_startup());
        assert_eq!(def.startup_order(), 1000);
    }

    #[test]
    fn test_route_definition_builders() {
        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("my-route")
            .with_auto_startup(false)
            .with_startup_order(50);
        assert_eq!(def.route_id(), "my-route");
        assert!(!def.auto_startup());
        assert_eq!(def.startup_order(), 50);
    }

    #[test]
    fn test_route_definition_accessors_cover_core_fields() {
        let def = RouteDefinition::new("direct:in", vec![BuilderStep::To("mock:out".into())])
            .with_route_id("accessor-route");

        assert_eq!(def.from_uri(), "direct:in");
        assert_eq!(def.steps().len(), 1);
        assert!(matches!(def.steps()[0], BuilderStep::To(_)));
    }

    #[test]
    fn test_route_definition_error_handler_circuit_breaker_and_concurrency_accessors() {
        use camel_api::circuit_breaker::CircuitBreakerConfig;
        use camel_api::error_handler::ErrorHandlerConfig;
        use camel_component_api::ConcurrencyModel;

        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("eh-route")
            .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlc"))
            .with_circuit_breaker(CircuitBreakerConfig::new())
            .with_concurrency(ConcurrencyModel::Concurrent { max: Some(4) });

        let eh = def
            .error_handler_config()
            .expect("error handler should be set");
        assert_eq!(eh.dlc_uri.as_deref(), Some("log:dlc"));
        assert!(def.circuit_breaker_config().is_some());
        assert!(matches!(
            def.concurrency_override(),
            Some(ConcurrencyModel::Concurrent { max: Some(4) })
        ));
    }

    #[test]
    fn test_builder_step_debug_covers_many_variants() {
        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
        use camel_api::{
            DynamicRouterConfig, Exchange, IdentityProcessor, MulticastConfig, RoutingSlipConfig,
            Value,
        };
        use std::sync::Arc;

        let expr = LanguageExpressionDef {
            language: "simple".into(),
            source: "${body}".into(),
        };

        let steps = vec![
            BuilderStep::Processor(BoxProcessor::new(IdentityProcessor)),
            BuilderStep::To("mock:out".into()),
            BuilderStep::Stop,
            BuilderStep::Log {
                level: camel_processor::LogLevel::Info,
                message: "hello".into(),
            },
            BuilderStep::DeclarativeSetHeader {
                key: "k".into(),
                value: ValueSourceDef::Literal(Value::String("v".into())),
            },
            BuilderStep::DeclarativeSetBody {
                value: ValueSourceDef::Expression(expr.clone()),
            },
            BuilderStep::DeclarativeFilter {
                predicate: expr.clone(),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::DeclarativeChoice {
                whens: vec![DeclarativeWhenStep {
                    predicate: expr.clone(),
                    steps: vec![BuilderStep::Stop],
                }],
                otherwise: Some(vec![BuilderStep::Stop]),
            },
            BuilderStep::DeclarativeScript {
                expression: expr.clone(),
            },
            BuilderStep::DeclarativeSplit {
                expression: expr.clone(),
                aggregation: AggregationStrategy::Original,
                parallel: false,
                parallel_limit: Some(2),
                stop_on_exception: true,
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::DeclarativeDynamicRouter {
                expression: expr.clone(),
                uri_delimiter: ",".into(),
                cache_size: 16,
                ignore_invalid_endpoints: false,
                max_iterations: 10,
            },
            BuilderStep::DeclarativeRoutingSlip {
                expression: expr.clone(),
                uri_delimiter: ",".into(),
                cache_size: 16,
                ignore_invalid_endpoints: false,
            },
            BuilderStep::DeclarativeRecipientList {
                expression: expr.clone(),
                delimiter: ",".into(),
                parallel: false,
                parallel_limit: None,
                stop_on_exception: false,
                aggregation: "original".into(),
            },
            BuilderStep::Split {
                config: SplitterConfig::new(split_body_lines()),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Aggregate {
                config: camel_api::AggregatorConfig::correlate_by("id")
                    .complete_when_size(1)
                    .build(),
            },
            BuilderStep::Filter {
                predicate: Arc::new(|_: &Exchange| true),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::WireTap {
                uri: "mock:tap".into(),
            },
            BuilderStep::Multicast {
                steps: vec![BuilderStep::To("mock:m1".into())],
                config: MulticastConfig::new(),
            },
            BuilderStep::DeclarativeLog {
                level: camel_processor::LogLevel::Info,
                message: ValueSourceDef::Expression(expr.clone()),
            },
            BuilderStep::Bean {
                name: "bean".into(),
                method: "call".into(),
            },
            BuilderStep::Script {
                language: "rhai".into(),
                script: "body".into(),
            },
            BuilderStep::Throttle {
                config: camel_api::ThrottlerConfig::new(10, std::time::Duration::from_millis(10)),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::LoadBalance {
                config: camel_api::LoadBalancerConfig::round_robin(),
                steps: vec![BuilderStep::To("mock:l1".into())],
            },
            BuilderStep::DynamicRouter {
                config: DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
            },
            BuilderStep::RoutingSlip {
                config: RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
            },
        ];

        for step in &steps {
            let dbg = format!("{step:?}");
            // Every variant is rendered with its fully qualified variant name.
            assert!(
                dbg.starts_with("BuilderStep::"),
                "unexpected Debug output: {dbg}"
            );
        }
    }

    #[test]
    fn test_builder_step_debug_exact_output_for_simple_variants() {
        // Pins the compact `{:?}` text of variants built purely from strings.
        assert_eq!(format!("{:?}", BuilderStep::Stop), "BuilderStep::Stop");
        assert_eq!(
            format!("{:?}", BuilderStep::To("mock:out".into())),
            "BuilderStep::To(\"mock:out\")"
        );
        assert_eq!(
            format!(
                "{:?}",
                BuilderStep::WireTap {
                    uri: "mock:tap".into()
                }
            ),
            "BuilderStep::WireTap { uri: \"mock:tap\" }"
        );
        assert_eq!(
            format!(
                "{:?}",
                BuilderStep::Bean {
                    name: "bean".into(),
                    method: "call".into(),
                }
            ),
            "BuilderStep::Bean { name: \"bean\", method: \"call\" }"
        );
        assert_eq!(
            format!(
                "{:?}",
                BuilderStep::Script {
                    language: "rhai".into(),
                    script: "body".into(),
                }
            ),
            "BuilderStep::Script { language: \"rhai\", .. }"
        );
    }

    #[test]
    fn test_route_definition_to_info_preserves_metadata() {
        let info = RouteDefinition::new("direct:test", vec![])
            .with_route_id("meta-route")
            .with_auto_startup(false)
            .with_startup_order(7)
            .with_source_hash(99)
            .to_info();

        assert_eq!(info.route_id(), "meta-route");
        assert!(!info.auto_startup());
        assert_eq!(info.startup_order(), 7);
        assert_eq!(info.source_hash(), Some(99));
    }

    #[test]
    fn test_route_definition_source_hash_round_trip() {
        let def = RouteDefinition::new("direct:test", vec![]).with_route_id("hash-route");
        // No hash is recorded unless `with_source_hash` is called.
        assert_eq!(def.source_hash(), None);
        assert_eq!(def.to_info().source_hash(), None);

        let def = def.with_source_hash(0xDEAD_BEEF);
        assert_eq!(def.source_hash(), Some(0xDEAD_BEEF));
        assert_eq!(def.to_info().source_hash(), Some(0xDEAD_BEEF));
    }

    #[test]
    fn test_choice_builder_step_debug() {
        use camel_api::{Exchange, FilterPredicate};
        use std::sync::Arc;

        fn always_true(_: &Exchange) -> bool {
            true
        }

        let step = BuilderStep::Choice {
            whens: vec![WhenStep {
                predicate: Arc::new(always_true) as FilterPredicate,
                steps: vec![BuilderStep::To("mock:a".into())],
            }],
            otherwise: None,
        };
        let debug = format!("{step:?}");
        assert!(debug.contains("Choice"));
        assert!(debug.contains("1 clause(s)"));
        assert!(debug.contains("otherwise: None"));
    }

    #[test]
    fn test_route_definition_unit_of_work() {
        use camel_api::UnitOfWorkConfig;
        let config = UnitOfWorkConfig {
            on_complete: Some("log:complete".into()),
            on_failure: Some("log:failed".into()),
        };
        // `config` is not used afterwards, so it is moved rather than cloned.
        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("uow-test")
            .with_unit_of_work(config);
        assert_eq!(
            def.unit_of_work_config().unwrap().on_complete.as_deref(),
            Some("log:complete")
        );
        assert_eq!(
            def.unit_of_work_config().unwrap().on_failure.as_deref(),
            Some("log:failed")
        );

        let def_no_uow = RouteDefinition::new("direct:test", vec![]).with_route_id("no-uow");
        assert!(def_no_uow.unit_of_work_config().is_none());
    }
}