Skip to main content

camel_core/lifecycle/application/
route_definition.rs

1// lifecycle/application/route_definition.rs
2// Route definition and builder-step types. Route (compiled artifact) lives in adapters.
3
4use camel_api::UnitOfWorkConfig;
5use camel_api::circuit_breaker::CircuitBreakerConfig;
6use camel_api::error_handler::ErrorHandlerConfig;
7use camel_api::{AggregatorConfig, BoxProcessor, FilterPredicate, MulticastConfig, SplitterConfig};
8use camel_component::ConcurrencyModel;
9
10/// An unresolved when-clause: predicate + nested steps for the sub-pipeline.
11pub struct WhenStep {
12    pub predicate: FilterPredicate,
13    pub steps: Vec<BuilderStep>,
14}
15
16pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
17
18/// Declarative `when` clause resolved later by the runtime.
19#[derive(Debug)]
20pub struct DeclarativeWhenStep {
21    pub predicate: LanguageExpressionDef,
22    pub steps: Vec<BuilderStep>,
23}
24
25/// A step in an unresolved route definition.
26pub enum BuilderStep {
27    /// A pre-built Tower processor service.
28    Processor(BoxProcessor),
29    /// A destination URI — resolved at start time by CamelContext.
30    To(String),
31    /// A stop step that halts processing immediately.
32    Stop,
33    /// A static log step.
34    Log {
35        level: camel_processor::LogLevel,
36        message: String,
37    },
38    /// Declarative set_header (literal or language-based value), resolved at route-add time.
39    DeclarativeSetHeader { key: String, value: ValueSourceDef },
40    /// Declarative set_body (literal or language-based value), resolved at route-add time.
41    DeclarativeSetBody { value: ValueSourceDef },
42    /// Declarative filter using a language predicate, resolved at route-add time.
43    DeclarativeFilter {
44        predicate: LanguageExpressionDef,
45        steps: Vec<BuilderStep>,
46    },
47    /// Declarative choice/when/otherwise using language predicates, resolved at route-add time.
48    DeclarativeChoice {
49        whens: Vec<DeclarativeWhenStep>,
50        otherwise: Option<Vec<BuilderStep>>,
51    },
52    /// Declarative script step evaluated by language and written to body.
53    DeclarativeScript { expression: LanguageExpressionDef },
54    /// Declarative split using a language expression, resolved at route-add time.
55    DeclarativeSplit {
56        expression: LanguageExpressionDef,
57        aggregation: camel_api::splitter::AggregationStrategy,
58        parallel: bool,
59        parallel_limit: Option<usize>,
60        stop_on_exception: bool,
61        steps: Vec<BuilderStep>,
62    },
63    /// A Splitter sub-pipeline: config + nested steps to execute per fragment.
64    Split {
65        config: SplitterConfig,
66        steps: Vec<BuilderStep>,
67    },
68    /// An Aggregator step: collects exchanges by correlation key, emits when complete.
69    Aggregate { config: AggregatorConfig },
70    /// A Filter sub-pipeline: predicate + nested steps executed only when predicate is true.
71    Filter {
72        predicate: FilterPredicate,
73        steps: Vec<BuilderStep>,
74    },
75    /// A Choice step: evaluates when-clauses in order, routes to the first match.
76    /// If no when matches, the optional otherwise branch is used.
77    Choice {
78        whens: Vec<WhenStep>,
79        otherwise: Option<Vec<BuilderStep>>,
80    },
81    /// A WireTap step: sends a clone of the exchange to a tap endpoint (fire-and-forget).
82    WireTap { uri: String },
83    /// A Multicast step: sends the same exchange to multiple destinations.
84    Multicast {
85        steps: Vec<BuilderStep>,
86        config: MulticastConfig,
87    },
88    /// Declarative log step with a language-evaluated message, resolved at route-add time.
89    DeclarativeLog {
90        level: camel_processor::LogLevel,
91        message: ValueSourceDef,
92    },
93    /// Bean invocation step — resolved at route-add time.
94    Bean { name: String, method: String },
95    /// Script step: executes a script that can mutate the exchange.
96    /// The script has access to `headers`, `properties`, and `body`.
97    Script { language: String, script: String },
98    /// Throttle step: rate limiting with configurable behavior when limit exceeded.
99    Throttle {
100        config: camel_api::ThrottlerConfig,
101        steps: Vec<BuilderStep>,
102    },
103    /// LoadBalance step: distributes exchanges across multiple endpoints using a strategy.
104    LoadBalance {
105        config: camel_api::LoadBalancerConfig,
106        steps: Vec<BuilderStep>,
107    },
108    /// DynamicRouter step: routes exchanges dynamically based on expression evaluation.
109    DynamicRouter {
110        config: camel_api::DynamicRouterConfig,
111    },
112    RoutingSlip {
113        config: camel_api::RoutingSlipConfig,
114    },
115}
116
117impl std::fmt::Debug for BuilderStep {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        match self {
120            BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
121            BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
122            BuilderStep::Stop => write!(f, "BuilderStep::Stop"),
123            BuilderStep::Log { level, message } => write!(
124                f,
125                "BuilderStep::Log {{ level: {level:?}, message: {message:?} }}"
126            ),
127            BuilderStep::DeclarativeSetHeader { key, .. } => {
128                write!(
129                    f,
130                    "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
131                )
132            }
133            BuilderStep::DeclarativeSetBody { .. } => {
134                write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
135            }
136            BuilderStep::DeclarativeFilter { steps, .. } => {
137                write!(
138                    f,
139                    "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
140                )
141            }
142            BuilderStep::DeclarativeChoice { whens, otherwise } => {
143                write!(
144                    f,
145                    "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
146                    whens.len(),
147                    if otherwise.is_some() { "Some" } else { "None" }
148                )
149            }
150            BuilderStep::DeclarativeScript { expression } => write!(
151                f,
152                "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
153                expression.language
154            ),
155            BuilderStep::DeclarativeSplit { steps, .. } => {
156                write!(
157                    f,
158                    "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
159                )
160            }
161            BuilderStep::Split { steps, .. } => {
162                write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
163            }
164            BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
165            BuilderStep::Filter { steps, .. } => {
166                write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
167            }
168            BuilderStep::Choice { whens, otherwise } => {
169                write!(
170                    f,
171                    "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
172                    whens.len(),
173                    if otherwise.is_some() { "Some" } else { "None" }
174                )
175            }
176            BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
177            BuilderStep::Multicast { steps, .. } => {
178                write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
179            }
180            BuilderStep::DeclarativeLog { level, .. } => {
181                write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
182            }
183            BuilderStep::Bean { name, method } => {
184                write!(
185                    f,
186                    "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
187                )
188            }
189            BuilderStep::Script { language, .. } => {
190                write!(f, "BuilderStep::Script {{ language: {language:?}, .. }}")
191            }
192            BuilderStep::Throttle { steps, .. } => {
193                write!(f, "BuilderStep::Throttle {{ steps: {steps:?}, .. }}")
194            }
195            BuilderStep::LoadBalance { steps, .. } => {
196                write!(f, "BuilderStep::LoadBalance {{ steps: {steps:?}, .. }}")
197            }
198            BuilderStep::DynamicRouter { .. } => {
199                write!(f, "BuilderStep::DynamicRouter {{ .. }}")
200            }
201            BuilderStep::RoutingSlip { .. } => {
202                write!(f, "BuilderStep::RoutingSlip {{ .. }}")
203            }
204        }
205    }
206}
207
208/// An unresolved route definition. "to" URIs have not been resolved to producers yet.
209pub struct RouteDefinition {
210    pub(crate) from_uri: String,
211    pub(crate) steps: Vec<BuilderStep>,
212    /// Optional per-route error handler config. Takes precedence over the global one.
213    pub(crate) error_handler: Option<ErrorHandlerConfig>,
214    /// Optional circuit breaker config. Applied between error handler and step pipeline.
215    pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
216    /// Optional Unit of Work config for in-flight tracking and completion hooks.
217    pub(crate) unit_of_work: Option<UnitOfWorkConfig>,
218    /// User override for the consumer's concurrency model. `None` means
219    /// "use whatever the consumer declares".
220    pub(crate) concurrency: Option<ConcurrencyModel>,
221    /// Unique identifier for this route. Required.
222    pub(crate) route_id: String,
223    /// Whether this route should start automatically when the context starts.
224    pub(crate) auto_startup: bool,
225    /// Order in which routes are started. Lower values start first.
226    pub(crate) startup_order: i32,
227}
228
229impl RouteDefinition {
230    /// Create a new route definition with the required route ID.
231    pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
232        Self {
233            from_uri: from_uri.into(),
234            steps,
235            error_handler: None,
236            circuit_breaker: None,
237            unit_of_work: None,
238            concurrency: None,
239            route_id: String::new(), // Will be set by with_route_id()
240            auto_startup: true,
241            startup_order: 1000,
242        }
243    }
244
245    /// The source endpoint URI.
246    pub fn from_uri(&self) -> &str {
247        &self.from_uri
248    }
249
250    /// The steps in this route definition.
251    pub fn steps(&self) -> &[BuilderStep] {
252        &self.steps
253    }
254
255    /// Set a per-route error handler, overriding the global one.
256    pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
257        self.error_handler = Some(config);
258        self
259    }
260
261    /// Get the route-level error handler config, if set.
262    pub fn error_handler_config(&self) -> Option<&ErrorHandlerConfig> {
263        self.error_handler.as_ref()
264    }
265
266    /// Set a circuit breaker for this route.
267    pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
268        self.circuit_breaker = Some(config);
269        self
270    }
271
272    /// Set a unit of work config for this route.
273    pub fn with_unit_of_work(mut self, config: UnitOfWorkConfig) -> Self {
274        self.unit_of_work = Some(config);
275        self
276    }
277
278    /// Get the unit of work config, if set.
279    pub fn unit_of_work_config(&self) -> Option<&UnitOfWorkConfig> {
280        self.unit_of_work.as_ref()
281    }
282
283    /// Get the circuit breaker config, if set.
284    pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
285        self.circuit_breaker.as_ref()
286    }
287
288    /// User-specified concurrency override, if any.
289    pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
290        self.concurrency.as_ref()
291    }
292
293    /// Override the consumer's concurrency model for this route.
294    pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
295        self.concurrency = Some(model);
296        self
297    }
298
299    /// Get the route ID.
300    pub fn route_id(&self) -> &str {
301        &self.route_id
302    }
303
304    /// Whether this route should start automatically when the context starts.
305    pub fn auto_startup(&self) -> bool {
306        self.auto_startup
307    }
308
309    /// Order in which routes are started. Lower values start first.
310    pub fn startup_order(&self) -> i32 {
311        self.startup_order
312    }
313
314    /// Set a unique identifier for this route.
315    pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
316        self.route_id = id.into();
317        self
318    }
319
320    /// Set whether this route should start automatically.
321    pub fn with_auto_startup(mut self, auto: bool) -> Self {
322        self.auto_startup = auto;
323        self
324    }
325
326    /// Set the startup order. Lower values start first.
327    pub fn with_startup_order(mut self, order: i32) -> Self {
328        self.startup_order = order;
329        self
330    }
331
332    /// Extract the metadata fields needed for introspection.
333    /// This is used by RouteController to store route info without the non-Sync steps.
334    pub fn to_info(&self) -> RouteDefinitionInfo {
335        RouteDefinitionInfo {
336            route_id: self.route_id.clone(),
337            auto_startup: self.auto_startup,
338            startup_order: self.startup_order,
339        }
340    }
341}
342
343/// Minimal route definition metadata for introspection.
344///
345/// This struct contains only the metadata fields from [`RouteDefinition`]
346/// that are needed for route lifecycle management, without the `steps` field
347/// (which contains non-Sync types and cannot be stored in a Sync struct).
348#[derive(Clone)]
349pub struct RouteDefinitionInfo {
350    route_id: String,
351    auto_startup: bool,
352    startup_order: i32,
353}
354
355impl RouteDefinitionInfo {
356    /// Get the route ID.
357    pub fn route_id(&self) -> &str {
358        &self.route_id
359    }
360
361    /// Whether this route should start automatically when the context starts.
362    pub fn auto_startup(&self) -> bool {
363        self.auto_startup
364    }
365
366    /// Order in which routes are started. Lower values start first.
367    pub fn startup_order(&self) -> i32 {
368        self.startup_order
369    }
370}
371
372#[cfg(test)]
373mod tests {
374    use super::*;
375
376    #[test]
377    fn test_builder_step_multicast_variant() {
378        use camel_api::MulticastConfig;
379
380        let step = BuilderStep::Multicast {
381            steps: vec![BuilderStep::To("direct:a".into())],
382            config: MulticastConfig::new(),
383        };
384
385        assert!(matches!(step, BuilderStep::Multicast { .. }));
386    }
387
388    #[test]
389    fn test_route_definition_defaults() {
390        let def = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");
391        assert_eq!(def.route_id(), "test-route");
392        assert!(def.auto_startup());
393        assert_eq!(def.startup_order(), 1000);
394    }
395
396    #[test]
397    fn test_route_definition_builders() {
398        let def = RouteDefinition::new("direct:test", vec![])
399            .with_route_id("my-route")
400            .with_auto_startup(false)
401            .with_startup_order(50);
402        assert_eq!(def.route_id(), "my-route");
403        assert!(!def.auto_startup());
404        assert_eq!(def.startup_order(), 50);
405    }
406
407    #[test]
408    fn test_route_definition_accessors_cover_core_fields() {
409        let def = RouteDefinition::new("direct:in", vec![BuilderStep::To("mock:out".into())])
410            .with_route_id("accessor-route");
411
412        assert_eq!(def.from_uri(), "direct:in");
413        assert_eq!(def.steps().len(), 1);
414        assert!(matches!(def.steps()[0], BuilderStep::To(_)));
415    }
416
417    #[test]
418    fn test_route_definition_error_handler_circuit_breaker_and_concurrency_accessors() {
419        use camel_api::circuit_breaker::CircuitBreakerConfig;
420        use camel_api::error_handler::ErrorHandlerConfig;
421        use camel_component::ConcurrencyModel;
422
423        let def = RouteDefinition::new("direct:test", vec![])
424            .with_route_id("eh-route")
425            .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlc"))
426            .with_circuit_breaker(CircuitBreakerConfig::new())
427            .with_concurrency(ConcurrencyModel::Concurrent { max: Some(4) });
428
429        let eh = def
430            .error_handler_config()
431            .expect("error handler should be set");
432        assert_eq!(eh.dlc_uri.as_deref(), Some("log:dlc"));
433        assert!(def.circuit_breaker_config().is_some());
434        assert!(matches!(
435            def.concurrency_override(),
436            Some(ConcurrencyModel::Concurrent { max: Some(4) })
437        ));
438    }
439
440    #[test]
441    fn test_builder_step_debug_covers_many_variants() {
442        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
443        use camel_api::{
444            DynamicRouterConfig, Exchange, IdentityProcessor, RoutingSlipConfig, Value,
445        };
446        use std::sync::Arc;
447
448        let expr = LanguageExpressionDef {
449            language: "simple".into(),
450            source: "${body}".into(),
451        };
452
453        let steps = vec![
454            BuilderStep::Processor(BoxProcessor::new(IdentityProcessor)),
455            BuilderStep::To("mock:out".into()),
456            BuilderStep::Stop,
457            BuilderStep::Log {
458                level: camel_processor::LogLevel::Info,
459                message: "hello".into(),
460            },
461            BuilderStep::DeclarativeSetHeader {
462                key: "k".into(),
463                value: ValueSourceDef::Literal(Value::String("v".into())),
464            },
465            BuilderStep::DeclarativeSetBody {
466                value: ValueSourceDef::Expression(expr.clone()),
467            },
468            BuilderStep::DeclarativeFilter {
469                predicate: expr.clone(),
470                steps: vec![BuilderStep::Stop],
471            },
472            BuilderStep::DeclarativeChoice {
473                whens: vec![DeclarativeWhenStep {
474                    predicate: expr.clone(),
475                    steps: vec![BuilderStep::Stop],
476                }],
477                otherwise: Some(vec![BuilderStep::Stop]),
478            },
479            BuilderStep::DeclarativeScript {
480                expression: expr.clone(),
481            },
482            BuilderStep::DeclarativeSplit {
483                expression: expr.clone(),
484                aggregation: AggregationStrategy::Original,
485                parallel: false,
486                parallel_limit: Some(2),
487                stop_on_exception: true,
488                steps: vec![BuilderStep::Stop],
489            },
490            BuilderStep::Split {
491                config: SplitterConfig::new(split_body_lines()),
492                steps: vec![BuilderStep::Stop],
493            },
494            BuilderStep::Aggregate {
495                config: camel_api::AggregatorConfig::correlate_by("id")
496                    .complete_when_size(1)
497                    .build(),
498            },
499            BuilderStep::Filter {
500                predicate: Arc::new(|_: &Exchange| true),
501                steps: vec![BuilderStep::Stop],
502            },
503            BuilderStep::WireTap {
504                uri: "mock:tap".into(),
505            },
506            BuilderStep::DeclarativeLog {
507                level: camel_processor::LogLevel::Info,
508                message: ValueSourceDef::Expression(expr.clone()),
509            },
510            BuilderStep::Bean {
511                name: "bean".into(),
512                method: "call".into(),
513            },
514            BuilderStep::Script {
515                language: "rhai".into(),
516                script: "body".into(),
517            },
518            BuilderStep::Throttle {
519                config: camel_api::ThrottlerConfig::new(10, std::time::Duration::from_millis(10)),
520                steps: vec![BuilderStep::Stop],
521            },
522            BuilderStep::LoadBalance {
523                config: camel_api::LoadBalancerConfig::round_robin(),
524                steps: vec![BuilderStep::To("mock:l1".into())],
525            },
526            BuilderStep::DynamicRouter {
527                config: DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
528            },
529            BuilderStep::RoutingSlip {
530                config: RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
531            },
532        ];
533
534        for step in steps {
535            let dbg = format!("{step:?}");
536            assert!(!dbg.is_empty());
537        }
538    }
539
540    #[test]
541    fn test_route_definition_to_info_preserves_metadata() {
542        let info = RouteDefinition::new("direct:test", vec![])
543            .with_route_id("meta-route")
544            .with_auto_startup(false)
545            .with_startup_order(7)
546            .to_info();
547
548        assert_eq!(info.route_id(), "meta-route");
549        assert!(!info.auto_startup());
550        assert_eq!(info.startup_order(), 7);
551    }
552
553    #[test]
554    fn test_choice_builder_step_debug() {
555        use camel_api::{Exchange, FilterPredicate};
556        use std::sync::Arc;
557
558        fn always_true(_: &Exchange) -> bool {
559            true
560        }
561
562        let step = BuilderStep::Choice {
563            whens: vec![WhenStep {
564                predicate: Arc::new(always_true) as FilterPredicate,
565                steps: vec![BuilderStep::To("mock:a".into())],
566            }],
567            otherwise: None,
568        };
569        let debug = format!("{step:?}");
570        assert!(debug.contains("Choice"));
571    }
572
573    #[test]
574    fn test_route_definition_unit_of_work() {
575        use camel_api::UnitOfWorkConfig;
576        let config = UnitOfWorkConfig {
577            on_complete: Some("log:complete".into()),
578            on_failure: Some("log:failed".into()),
579        };
580        let def = RouteDefinition::new("direct:test", vec![])
581            .with_route_id("uow-test")
582            .with_unit_of_work(config.clone());
583        assert_eq!(
584            def.unit_of_work_config().unwrap().on_complete.as_deref(),
585            Some("log:complete")
586        );
587        assert_eq!(
588            def.unit_of_work_config().unwrap().on_failure.as_deref(),
589            Some("log:failed")
590        );
591
592        let def_no_uow = RouteDefinition::new("direct:test", vec![]).with_route_id("no-uow");
593        assert!(def_no_uow.unit_of_work_config().is_none());
594    }
595}