Skip to main content

camel_core/lifecycle/application/
route_definition.rs

1// lifecycle/application/route_definition.rs
2// Route definition and builder-step types. Route (compiled artifact) lives in adapters.
3
4use camel_api::circuit_breaker::CircuitBreakerConfig;
5use camel_api::error_handler::ErrorHandlerConfig;
6use camel_api::UnitOfWorkConfig;
7use camel_api::{AggregatorConfig, BoxProcessor, FilterPredicate, MulticastConfig, SplitterConfig};
8use camel_component_api::ConcurrencyModel;
9
/// An unresolved when-clause: predicate + nested steps for the sub-pipeline.
///
/// This is the element type of the `whens` list in [`BuilderStep::Choice`].
pub struct WhenStep {
    /// Predicate paired with this clause's nested steps.
    pub predicate: FilterPredicate,
    /// Nested builder steps that make up this clause's sub-pipeline.
    pub steps: Vec<BuilderStep>,
}
15
16pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
17
/// Declarative `when` clause resolved later by the runtime.
///
/// This is the element type of the `whens` list in
/// [`BuilderStep::DeclarativeChoice`].
#[derive(Debug)]
pub struct DeclarativeWhenStep {
    /// Language expression used as this clause's predicate.
    pub predicate: LanguageExpressionDef,
    /// Nested builder steps belonging to this clause.
    pub steps: Vec<BuilderStep>,
}
24
/// A step in an unresolved route definition.
///
/// Variants whose name starts with `Declarative` carry a
/// [`LanguageExpressionDef`] or [`ValueSourceDef`] describing what to evaluate;
/// the remaining variants carry pre-built processors, predicates, URIs or
/// config objects. Several variants own a nested `steps` list forming a
/// sub-pipeline.
pub enum BuilderStep {
    /// A pre-built Tower processor service.
    Processor(BoxProcessor),
    /// A destination URI — resolved at start time by CamelContext.
    To(String),
    /// A stop step that halts processing immediately.
    Stop,
    /// A static log step.
    Log {
        /// Log level for the message.
        level: camel_processor::LogLevel,
        /// Fixed message text.
        message: String,
    },
    /// Declarative set_header (literal or language-based value), resolved at route-add time.
    DeclarativeSetHeader {
        /// Header name to set.
        key: String,
        /// Source of the header value.
        value: ValueSourceDef,
    },
    /// Declarative set_body (literal or language-based value), resolved at route-add time.
    DeclarativeSetBody {
        /// Source of the new body value.
        value: ValueSourceDef,
    },
    /// Declarative filter using a language predicate, resolved at route-add time.
    DeclarativeFilter {
        /// Language expression used as the filter predicate.
        predicate: LanguageExpressionDef,
        /// Nested steps of the filter sub-pipeline.
        steps: Vec<BuilderStep>,
    },
    /// Declarative choice/when/otherwise using language predicates, resolved at route-add time.
    DeclarativeChoice {
        /// When-clauses of the choice.
        whens: Vec<DeclarativeWhenStep>,
        /// Optional otherwise branch.
        otherwise: Option<Vec<BuilderStep>>,
    },
    /// Declarative script step evaluated by language and written to body.
    DeclarativeScript {
        /// Language expression to evaluate.
        expression: LanguageExpressionDef,
    },
    /// Declarative split using a language expression, resolved at route-add time.
    DeclarativeSplit {
        /// Language expression driving the split.
        expression: LanguageExpressionDef,
        /// Aggregation strategy for the split.
        aggregation: camel_api::splitter::AggregationStrategy,
        // NOTE(review): `parallel`, `parallel_limit` and `stop_on_exception`
        // presumably mirror the equally named splitter options — confirm
        // against `camel_api::SplitterConfig`.
        parallel: bool,
        parallel_limit: Option<usize>,
        stop_on_exception: bool,
        /// Nested steps of the split sub-pipeline.
        steps: Vec<BuilderStep>,
    },
    /// Declarative dynamic router driven by a language expression.
    ///
    /// NOTE(review): the remaining fields presumably map onto
    /// `camel_api::DynamicRouterConfig` options — confirm where this variant
    /// is resolved.
    DeclarativeDynamicRouter {
        /// Language expression evaluated by the router.
        expression: LanguageExpressionDef,
        uri_delimiter: String,
        cache_size: i32,
        ignore_invalid_endpoints: bool,
        max_iterations: usize,
    },
    /// Declarative routing slip driven by a language expression.
    ///
    /// NOTE(review): the remaining fields presumably map onto
    /// `camel_api::RoutingSlipConfig` options — confirm where this variant
    /// is resolved.
    DeclarativeRoutingSlip {
        /// Language expression evaluated by the routing slip.
        expression: LanguageExpressionDef,
        uri_delimiter: String,
        cache_size: i32,
        ignore_invalid_endpoints: bool,
    },
    /// A Splitter sub-pipeline: config + nested steps to execute per fragment.
    Split {
        config: SplitterConfig,
        steps: Vec<BuilderStep>,
    },
    /// An Aggregator step: collects exchanges by correlation key, emits when complete.
    Aggregate {
        config: AggregatorConfig,
    },
    /// A Filter sub-pipeline: predicate + nested steps executed only when predicate is true.
    Filter {
        predicate: FilterPredicate,
        steps: Vec<BuilderStep>,
    },
    /// A Choice step: evaluates when-clauses in order, routes to the first match.
    /// If no when matches, the optional otherwise branch is used.
    Choice {
        whens: Vec<WhenStep>,
        otherwise: Option<Vec<BuilderStep>>,
    },
    /// A WireTap step: sends a clone of the exchange to a tap endpoint (fire-and-forget).
    WireTap {
        /// URI of the tap endpoint.
        uri: String,
    },
    /// A Multicast step: sends the same exchange to multiple destinations.
    Multicast {
        steps: Vec<BuilderStep>,
        config: MulticastConfig,
    },
    /// Declarative log step with a language-evaluated message, resolved at route-add time.
    DeclarativeLog {
        /// Log level for the message.
        level: camel_processor::LogLevel,
        /// Source of the message text.
        message: ValueSourceDef,
    },
    /// Bean invocation step — resolved at route-add time.
    Bean {
        /// Bean name.
        name: String,
        /// Method to invoke on the bean.
        method: String,
    },
    /// Script step: executes a script that can mutate the exchange.
    /// The script has access to `headers`, `properties`, and `body`.
    Script {
        /// Name of the scripting language.
        language: String,
        /// Script source text.
        script: String,
    },
    /// Throttle step: rate limiting with configurable behavior when limit exceeded.
    Throttle {
        config: camel_api::ThrottlerConfig,
        steps: Vec<BuilderStep>,
    },
    /// LoadBalance step: distributes exchanges across multiple endpoints using a strategy.
    LoadBalance {
        config: camel_api::LoadBalancerConfig,
        steps: Vec<BuilderStep>,
    },
    /// DynamicRouter step: routes exchanges dynamically based on expression evaluation.
    DynamicRouter {
        config: camel_api::DynamicRouterConfig,
    },
    /// RoutingSlip step configured by a pre-built `camel_api::RoutingSlipConfig`.
    RoutingSlip {
        config: camel_api::RoutingSlipConfig,
    },
    /// Delay step configured by a `camel_api::DelayConfig`.
    Delay {
        config: camel_api::DelayConfig,
    },
}
149
150impl std::fmt::Debug for BuilderStep {
151    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152        match self {
153            BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
154            BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
155            BuilderStep::Stop => write!(f, "BuilderStep::Stop"),
156            BuilderStep::Log { level, message } => write!(
157                f,
158                "BuilderStep::Log {{ level: {level:?}, message: {message:?} }}"
159            ),
160            BuilderStep::DeclarativeSetHeader { key, .. } => {
161                write!(
162                    f,
163                    "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
164                )
165            }
166            BuilderStep::DeclarativeSetBody { .. } => {
167                write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
168            }
169            BuilderStep::DeclarativeFilter { steps, .. } => {
170                write!(
171                    f,
172                    "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
173                )
174            }
175            BuilderStep::DeclarativeChoice { whens, otherwise } => {
176                write!(
177                    f,
178                    "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
179                    whens.len(),
180                    if otherwise.is_some() { "Some" } else { "None" }
181                )
182            }
183            BuilderStep::DeclarativeScript { expression } => write!(
184                f,
185                "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
186                expression.language
187            ),
188            BuilderStep::DeclarativeSplit { steps, .. } => {
189                write!(
190                    f,
191                    "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
192                )
193            }
194            BuilderStep::DeclarativeDynamicRouter { expression, .. } => write!(
195                f,
196                "BuilderStep::DeclarativeDynamicRouter {{ language: {:?}, .. }}",
197                expression.language
198            ),
199            BuilderStep::DeclarativeRoutingSlip { expression, .. } => write!(
200                f,
201                "BuilderStep::DeclarativeRoutingSlip {{ language: {:?}, .. }}",
202                expression.language
203            ),
204            BuilderStep::Split { steps, .. } => {
205                write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
206            }
207            BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
208            BuilderStep::Filter { steps, .. } => {
209                write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
210            }
211            BuilderStep::Choice { whens, otherwise } => {
212                write!(
213                    f,
214                    "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
215                    whens.len(),
216                    if otherwise.is_some() { "Some" } else { "None" }
217                )
218            }
219            BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
220            BuilderStep::Multicast { steps, .. } => {
221                write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
222            }
223            BuilderStep::DeclarativeLog { level, .. } => {
224                write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
225            }
226            BuilderStep::Bean { name, method } => {
227                write!(
228                    f,
229                    "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
230                )
231            }
232            BuilderStep::Script { language, .. } => {
233                write!(f, "BuilderStep::Script {{ language: {language:?}, .. }}")
234            }
235            BuilderStep::Throttle { steps, .. } => {
236                write!(f, "BuilderStep::Throttle {{ steps: {steps:?}, .. }}")
237            }
238            BuilderStep::LoadBalance { steps, .. } => {
239                write!(f, "BuilderStep::LoadBalance {{ steps: {steps:?}, .. }}")
240            }
241            BuilderStep::DynamicRouter { .. } => {
242                write!(f, "BuilderStep::DynamicRouter {{ .. }}")
243            }
244            BuilderStep::RoutingSlip { .. } => {
245                write!(f, "BuilderStep::RoutingSlip {{ .. }}")
246            }
247            BuilderStep::Delay { config } => {
248                write!(f, "BuilderStep::Delay {{ config: {:?} }}", config)
249            }
250        }
251    }
252}
253
/// An unresolved route definition. "to" URIs have not been resolved to producers yet.
///
/// Built with [`RouteDefinition::new`] and refined through the consuming
/// `with_*` builder methods.
pub struct RouteDefinition {
    /// Source endpoint URI the route consumes from.
    pub(crate) from_uri: String,
    /// Ordered, still-unresolved steps of the route.
    pub(crate) steps: Vec<BuilderStep>,
    /// Optional per-route error handler config. Takes precedence over the global one.
    pub(crate) error_handler: Option<ErrorHandlerConfig>,
    /// Optional circuit breaker config. Applied between error handler and step pipeline.
    pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
    /// Optional Unit of Work config for in-flight tracking and completion hooks.
    pub(crate) unit_of_work: Option<UnitOfWorkConfig>,
    /// User override for the consumer's concurrency model. `None` means
    /// "use whatever the consumer declares".
    pub(crate) concurrency: Option<ConcurrencyModel>,
    /// Unique identifier for this route. Required.
    ///
    /// `RouteDefinition::new` initialises this to an empty string; it is
    /// filled in by `with_route_id`.
    pub(crate) route_id: String,
    /// Whether this route should start automatically when the context starts.
    pub(crate) auto_startup: bool,
    /// Order in which routes are started. Lower values start first.
    pub(crate) startup_order: i32,
}
274
275impl RouteDefinition {
276    /// Create a new route definition with the required route ID.
277    pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
278        Self {
279            from_uri: from_uri.into(),
280            steps,
281            error_handler: None,
282            circuit_breaker: None,
283            unit_of_work: None,
284            concurrency: None,
285            route_id: String::new(), // Will be set by with_route_id()
286            auto_startup: true,
287            startup_order: 1000,
288        }
289    }
290
291    /// The source endpoint URI.
292    pub fn from_uri(&self) -> &str {
293        &self.from_uri
294    }
295
296    /// The steps in this route definition.
297    pub fn steps(&self) -> &[BuilderStep] {
298        &self.steps
299    }
300
301    /// Set a per-route error handler, overriding the global one.
302    pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
303        self.error_handler = Some(config);
304        self
305    }
306
307    /// Get the route-level error handler config, if set.
308    pub fn error_handler_config(&self) -> Option<&ErrorHandlerConfig> {
309        self.error_handler.as_ref()
310    }
311
312    /// Set a circuit breaker for this route.
313    pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
314        self.circuit_breaker = Some(config);
315        self
316    }
317
318    /// Set a unit of work config for this route.
319    pub fn with_unit_of_work(mut self, config: UnitOfWorkConfig) -> Self {
320        self.unit_of_work = Some(config);
321        self
322    }
323
324    /// Get the unit of work config, if set.
325    pub fn unit_of_work_config(&self) -> Option<&UnitOfWorkConfig> {
326        self.unit_of_work.as_ref()
327    }
328
329    /// Get the circuit breaker config, if set.
330    pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
331        self.circuit_breaker.as_ref()
332    }
333
334    /// User-specified concurrency override, if any.
335    pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
336        self.concurrency.as_ref()
337    }
338
339    /// Override the consumer's concurrency model for this route.
340    pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
341        self.concurrency = Some(model);
342        self
343    }
344
345    /// Get the route ID.
346    pub fn route_id(&self) -> &str {
347        &self.route_id
348    }
349
350    /// Whether this route should start automatically when the context starts.
351    pub fn auto_startup(&self) -> bool {
352        self.auto_startup
353    }
354
355    /// Order in which routes are started. Lower values start first.
356    pub fn startup_order(&self) -> i32 {
357        self.startup_order
358    }
359
360    /// Set a unique identifier for this route.
361    pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
362        self.route_id = id.into();
363        self
364    }
365
366    /// Set whether this route should start automatically.
367    pub fn with_auto_startup(mut self, auto: bool) -> Self {
368        self.auto_startup = auto;
369        self
370    }
371
372    /// Set the startup order. Lower values start first.
373    pub fn with_startup_order(mut self, order: i32) -> Self {
374        self.startup_order = order;
375        self
376    }
377
378    /// Extract the metadata fields needed for introspection.
379    /// This is used by RouteController to store route info without the non-Sync steps.
380    pub fn to_info(&self) -> RouteDefinitionInfo {
381        RouteDefinitionInfo {
382            route_id: self.route_id.clone(),
383            auto_startup: self.auto_startup,
384            startup_order: self.startup_order,
385        }
386    }
387}
388
/// Minimal route definition metadata for introspection.
///
/// This struct contains only the metadata fields from [`RouteDefinition`]
/// that are needed for route lifecycle management, without the `steps` field
/// (which contains non-Sync types and cannot be stored in a Sync struct).
///
/// All fields are plain owned values, so the type is cheaply cloneable,
/// printable with `{:?}` and comparable with `==`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RouteDefinitionInfo {
    /// Identifier of the route this metadata was taken from.
    route_id: String,
    /// Whether the route starts automatically with the context.
    auto_startup: bool,
    /// Start ordering key; lower values start first.
    startup_order: i32,
}

impl RouteDefinitionInfo {
    /// Get the route ID.
    pub fn route_id(&self) -> &str {
        self.route_id.as_str()
    }

    /// Whether this route should start automatically when the context starts.
    pub fn auto_startup(&self) -> bool {
        self.auto_startup
    }

    /// Order in which routes are started. Lower values start first.
    pub fn startup_order(&self) -> i32 {
        self.startup_order
    }
}
417
// Unit tests for the builder-step enum, the route definition builder API and
// the metadata snapshot produced by `to_info`.
#[cfg(test)]
mod tests {
    use super::*;

    // The Multicast variant can be constructed with nested steps and a config.
    #[test]
    fn test_builder_step_multicast_variant() {
        use camel_api::MulticastConfig;

        let step = BuilderStep::Multicast {
            steps: vec![BuilderStep::To("direct:a".into())],
            config: MulticastConfig::new(),
        };

        assert!(matches!(step, BuilderStep::Multicast { .. }));
    }

    // `new` defaults: auto-startup enabled and startup order 1000.
    #[test]
    fn test_route_definition_defaults() {
        let def = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");
        assert_eq!(def.route_id(), "test-route");
        assert!(def.auto_startup());
        assert_eq!(def.startup_order(), 1000);
    }

    // Builder methods override the defaults and are visible through the getters.
    #[test]
    fn test_route_definition_builders() {
        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("my-route")
            .with_auto_startup(false)
            .with_startup_order(50);
        assert_eq!(def.route_id(), "my-route");
        assert!(!def.auto_startup());
        assert_eq!(def.startup_order(), 50);
    }

    // `from_uri()` and `steps()` expose what was passed to `new`.
    #[test]
    fn test_route_definition_accessors_cover_core_fields() {
        let def = RouteDefinition::new("direct:in", vec![BuilderStep::To("mock:out".into())])
            .with_route_id("accessor-route");

        assert_eq!(def.from_uri(), "direct:in");
        assert_eq!(def.steps().len(), 1);
        assert!(matches!(def.steps()[0], BuilderStep::To(_)));
    }

    // Optional configs set through the builders are returned by their accessors.
    #[test]
    fn test_route_definition_error_handler_circuit_breaker_and_concurrency_accessors() {
        use camel_api::circuit_breaker::CircuitBreakerConfig;
        use camel_api::error_handler::ErrorHandlerConfig;
        use camel_component_api::ConcurrencyModel;

        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("eh-route")
            .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlc"))
            .with_circuit_breaker(CircuitBreakerConfig::new())
            .with_concurrency(ConcurrencyModel::Concurrent { max: Some(4) });

        let eh = def
            .error_handler_config()
            .expect("error handler should be set");
        assert_eq!(eh.dlc_uri.as_deref(), Some("log:dlc"));
        assert!(def.circuit_breaker_config().is_some());
        assert!(matches!(
            def.concurrency_override(),
            Some(ConcurrencyModel::Concurrent { max: Some(4) })
        ));
    }

    // Smoke test: formatting a broad selection of variants with `{:?}` yields
    // non-empty output. The exact text is not asserted here.
    #[test]
    fn test_builder_step_debug_covers_many_variants() {
        use camel_api::splitter::{split_body_lines, AggregationStrategy, SplitterConfig};
        use camel_api::{
            DynamicRouterConfig, Exchange, IdentityProcessor, RoutingSlipConfig, Value,
        };
        use std::sync::Arc;

        // Shared expression reused by every declarative variant below.
        let expr = LanguageExpressionDef {
            language: "simple".into(),
            source: "${body}".into(),
        };

        let steps = vec![
            BuilderStep::Processor(BoxProcessor::new(IdentityProcessor)),
            BuilderStep::To("mock:out".into()),
            BuilderStep::Stop,
            BuilderStep::Log {
                level: camel_processor::LogLevel::Info,
                message: "hello".into(),
            },
            BuilderStep::DeclarativeSetHeader {
                key: "k".into(),
                value: ValueSourceDef::Literal(Value::String("v".into())),
            },
            BuilderStep::DeclarativeSetBody {
                value: ValueSourceDef::Expression(expr.clone()),
            },
            BuilderStep::DeclarativeFilter {
                predicate: expr.clone(),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::DeclarativeChoice {
                whens: vec![DeclarativeWhenStep {
                    predicate: expr.clone(),
                    steps: vec![BuilderStep::Stop],
                }],
                otherwise: Some(vec![BuilderStep::Stop]),
            },
            BuilderStep::DeclarativeScript {
                expression: expr.clone(),
            },
            BuilderStep::DeclarativeSplit {
                expression: expr.clone(),
                aggregation: AggregationStrategy::Original,
                parallel: false,
                parallel_limit: Some(2),
                stop_on_exception: true,
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Split {
                config: SplitterConfig::new(split_body_lines()),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Aggregate {
                config: camel_api::AggregatorConfig::correlate_by("id")
                    .complete_when_size(1)
                    .build(),
            },
            BuilderStep::Filter {
                predicate: Arc::new(|_: &Exchange| true),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::WireTap {
                uri: "mock:tap".into(),
            },
            BuilderStep::DeclarativeLog {
                level: camel_processor::LogLevel::Info,
                message: ValueSourceDef::Expression(expr.clone()),
            },
            BuilderStep::Bean {
                name: "bean".into(),
                method: "call".into(),
            },
            BuilderStep::Script {
                language: "rhai".into(),
                script: "body".into(),
            },
            BuilderStep::Throttle {
                config: camel_api::ThrottlerConfig::new(10, std::time::Duration::from_millis(10)),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::LoadBalance {
                config: camel_api::LoadBalancerConfig::round_robin(),
                steps: vec![BuilderStep::To("mock:l1".into())],
            },
            BuilderStep::DynamicRouter {
                config: DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
            },
            BuilderStep::RoutingSlip {
                config: RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
            },
        ];

        for step in steps {
            let dbg = format!("{step:?}");
            assert!(!dbg.is_empty());
        }
    }

    // `to_info` copies route ID, auto-startup flag and startup order.
    #[test]
    fn test_route_definition_to_info_preserves_metadata() {
        let info = RouteDefinition::new("direct:test", vec![])
            .with_route_id("meta-route")
            .with_auto_startup(false)
            .with_startup_order(7)
            .to_info();

        assert_eq!(info.route_id(), "meta-route");
        assert!(!info.auto_startup());
        assert_eq!(info.startup_order(), 7);
    }

    // The Choice variant's Debug output mentions the variant name.
    #[test]
    fn test_choice_builder_step_debug() {
        use camel_api::{Exchange, FilterPredicate};
        use std::sync::Arc;

        fn always_true(_: &Exchange) -> bool {
            true
        }

        let step = BuilderStep::Choice {
            whens: vec![WhenStep {
                predicate: Arc::new(always_true) as FilterPredicate,
                steps: vec![BuilderStep::To("mock:a".into())],
            }],
            otherwise: None,
        };
        let debug = format!("{step:?}");
        assert!(debug.contains("Choice"));
    }

    // Unit-of-work config round-trips through the builder; it is absent by default.
    #[test]
    fn test_route_definition_unit_of_work() {
        use camel_api::UnitOfWorkConfig;
        let config = UnitOfWorkConfig {
            on_complete: Some("log:complete".into()),
            on_failure: Some("log:failed".into()),
        };
        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("uow-test")
            .with_unit_of_work(config.clone());
        assert_eq!(
            def.unit_of_work_config().unwrap().on_complete.as_deref(),
            Some("log:complete")
        );
        assert_eq!(
            def.unit_of_work_config().unwrap().on_failure.as_deref(),
            Some("log:failed")
        );

        let def_no_uow = RouteDefinition::new("direct:test", vec![]).with_route_id("no-uow");
        assert!(def_no_uow.unit_of_work_config().is_none());
    }
}