Skip to main content

camel_core/lifecycle/application/
route_definition.rs

1// lifecycle/application/route_definition.rs
2// Route definition and builder-step types. Route (compiled artifact) lives in adapters.
3
4use camel_api::circuit_breaker::CircuitBreakerConfig;
5use camel_api::error_handler::ErrorHandlerConfig;
6use camel_api::{AggregatorConfig, BoxProcessor, FilterPredicate, MulticastConfig, SplitterConfig};
7use camel_component::ConcurrencyModel;
8
9/// An unresolved when-clause: predicate + nested steps for the sub-pipeline.
10pub struct WhenStep {
11    pub predicate: FilterPredicate,
12    pub steps: Vec<BuilderStep>,
13}
14
15pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
16
17/// Declarative `when` clause resolved later by the runtime.
18#[derive(Debug)]
19pub struct DeclarativeWhenStep {
20    pub predicate: LanguageExpressionDef,
21    pub steps: Vec<BuilderStep>,
22}
23
24/// A step in an unresolved route definition.
25pub enum BuilderStep {
26    /// A pre-built Tower processor service.
27    Processor(BoxProcessor),
28    /// A destination URI — resolved at start time by CamelContext.
29    To(String),
30    /// A stop step that halts processing immediately.
31    Stop,
32    /// A static log step.
33    Log {
34        level: camel_processor::LogLevel,
35        message: String,
36    },
37    /// Declarative set_header (literal or language-based value), resolved at route-add time.
38    DeclarativeSetHeader { key: String, value: ValueSourceDef },
39    /// Declarative set_body (literal or language-based value), resolved at route-add time.
40    DeclarativeSetBody { value: ValueSourceDef },
41    /// Declarative filter using a language predicate, resolved at route-add time.
42    DeclarativeFilter {
43        predicate: LanguageExpressionDef,
44        steps: Vec<BuilderStep>,
45    },
46    /// Declarative choice/when/otherwise using language predicates, resolved at route-add time.
47    DeclarativeChoice {
48        whens: Vec<DeclarativeWhenStep>,
49        otherwise: Option<Vec<BuilderStep>>,
50    },
51    /// Declarative script step evaluated by language and written to body.
52    DeclarativeScript { expression: LanguageExpressionDef },
53    /// Declarative split using a language expression, resolved at route-add time.
54    DeclarativeSplit {
55        expression: LanguageExpressionDef,
56        aggregation: camel_api::splitter::AggregationStrategy,
57        parallel: bool,
58        parallel_limit: Option<usize>,
59        stop_on_exception: bool,
60        steps: Vec<BuilderStep>,
61    },
62    /// A Splitter sub-pipeline: config + nested steps to execute per fragment.
63    Split {
64        config: SplitterConfig,
65        steps: Vec<BuilderStep>,
66    },
67    /// An Aggregator step: collects exchanges by correlation key, emits when complete.
68    Aggregate { config: AggregatorConfig },
69    /// A Filter sub-pipeline: predicate + nested steps executed only when predicate is true.
70    Filter {
71        predicate: FilterPredicate,
72        steps: Vec<BuilderStep>,
73    },
74    /// A Choice step: evaluates when-clauses in order, routes to the first match.
75    /// If no when matches, the optional otherwise branch is used.
76    Choice {
77        whens: Vec<WhenStep>,
78        otherwise: Option<Vec<BuilderStep>>,
79    },
80    /// A WireTap step: sends a clone of the exchange to a tap endpoint (fire-and-forget).
81    WireTap { uri: String },
82    /// A Multicast step: sends the same exchange to multiple destinations.
83    Multicast {
84        steps: Vec<BuilderStep>,
85        config: MulticastConfig,
86    },
87    /// Declarative log step with a language-evaluated message, resolved at route-add time.
88    DeclarativeLog {
89        level: camel_processor::LogLevel,
90        message: ValueSourceDef,
91    },
92    /// Bean invocation step — resolved at route-add time.
93    Bean { name: String, method: String },
94    /// Script step: executes a script that can mutate the exchange.
95    /// The script has access to `headers`, `properties`, and `body`.
96    Script { language: String, script: String },
97    /// Throttle step: rate limiting with configurable behavior when limit exceeded.
98    Throttle {
99        config: camel_api::ThrottlerConfig,
100        steps: Vec<BuilderStep>,
101    },
102    /// LoadBalance step: distributes exchanges across multiple endpoints using a strategy.
103    LoadBalance {
104        config: camel_api::LoadBalancerConfig,
105        steps: Vec<BuilderStep>,
106    },
107    /// DynamicRouter step: routes exchanges dynamically based on expression evaluation.
108    DynamicRouter {
109        config: camel_api::DynamicRouterConfig,
110    },
111}
112
113impl std::fmt::Debug for BuilderStep {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        match self {
116            BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
117            BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
118            BuilderStep::Stop => write!(f, "BuilderStep::Stop"),
119            BuilderStep::Log { level, message } => write!(
120                f,
121                "BuilderStep::Log {{ level: {level:?}, message: {message:?} }}"
122            ),
123            BuilderStep::DeclarativeSetHeader { key, .. } => {
124                write!(
125                    f,
126                    "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
127                )
128            }
129            BuilderStep::DeclarativeSetBody { .. } => {
130                write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
131            }
132            BuilderStep::DeclarativeFilter { steps, .. } => {
133                write!(
134                    f,
135                    "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
136                )
137            }
138            BuilderStep::DeclarativeChoice { whens, otherwise } => {
139                write!(
140                    f,
141                    "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
142                    whens.len(),
143                    if otherwise.is_some() { "Some" } else { "None" }
144                )
145            }
146            BuilderStep::DeclarativeScript { expression } => write!(
147                f,
148                "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
149                expression.language
150            ),
151            BuilderStep::DeclarativeSplit { steps, .. } => {
152                write!(
153                    f,
154                    "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
155                )
156            }
157            BuilderStep::Split { steps, .. } => {
158                write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
159            }
160            BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
161            BuilderStep::Filter { steps, .. } => {
162                write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
163            }
164            BuilderStep::Choice { whens, otherwise } => {
165                write!(
166                    f,
167                    "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
168                    whens.len(),
169                    if otherwise.is_some() { "Some" } else { "None" }
170                )
171            }
172            BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
173            BuilderStep::Multicast { steps, .. } => {
174                write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
175            }
176            BuilderStep::DeclarativeLog { level, .. } => {
177                write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
178            }
179            BuilderStep::Bean { name, method } => {
180                write!(
181                    f,
182                    "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
183                )
184            }
185            BuilderStep::Script { language, .. } => {
186                write!(f, "BuilderStep::Script {{ language: {language:?}, .. }}")
187            }
188            BuilderStep::Throttle { steps, .. } => {
189                write!(f, "BuilderStep::Throttle {{ steps: {steps:?}, .. }}")
190            }
191            BuilderStep::LoadBalance { steps, .. } => {
192                write!(f, "BuilderStep::LoadBalance {{ steps: {steps:?}, .. }}")
193            }
194            BuilderStep::DynamicRouter { .. } => {
195                write!(f, "BuilderStep::DynamicRouter {{ .. }}")
196            }
197        }
198    }
199}
200
201/// An unresolved route definition. "to" URIs have not been resolved to producers yet.
202pub struct RouteDefinition {
203    pub(crate) from_uri: String,
204    pub(crate) steps: Vec<BuilderStep>,
205    /// Optional per-route error handler config. Takes precedence over the global one.
206    pub(crate) error_handler: Option<ErrorHandlerConfig>,
207    /// Optional circuit breaker config. Applied between error handler and step pipeline.
208    pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
209    /// User override for the consumer's concurrency model. `None` means
210    /// "use whatever the consumer declares".
211    pub(crate) concurrency: Option<ConcurrencyModel>,
212    /// Unique identifier for this route. Required.
213    pub(crate) route_id: String,
214    /// Whether this route should start automatically when the context starts.
215    pub(crate) auto_startup: bool,
216    /// Order in which routes are started. Lower values start first.
217    pub(crate) startup_order: i32,
218}
219
220impl RouteDefinition {
221    /// Create a new route definition with the required route ID.
222    pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
223        Self {
224            from_uri: from_uri.into(),
225            steps,
226            error_handler: None,
227            circuit_breaker: None,
228            concurrency: None,
229            route_id: String::new(), // Will be set by with_route_id()
230            auto_startup: true,
231            startup_order: 1000,
232        }
233    }
234
235    /// The source endpoint URI.
236    pub fn from_uri(&self) -> &str {
237        &self.from_uri
238    }
239
240    /// The steps in this route definition.
241    pub fn steps(&self) -> &[BuilderStep] {
242        &self.steps
243    }
244
245    /// Set a per-route error handler, overriding the global one.
246    pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
247        self.error_handler = Some(config);
248        self
249    }
250
251    /// Set a circuit breaker for this route.
252    pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
253        self.circuit_breaker = Some(config);
254        self
255    }
256
257    /// Get the circuit breaker config, if set.
258    pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
259        self.circuit_breaker.as_ref()
260    }
261
262    /// User-specified concurrency override, if any.
263    pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
264        self.concurrency.as_ref()
265    }
266
267    /// Override the consumer's concurrency model for this route.
268    pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
269        self.concurrency = Some(model);
270        self
271    }
272
273    /// Get the route ID.
274    pub fn route_id(&self) -> &str {
275        &self.route_id
276    }
277
278    /// Whether this route should start automatically when the context starts.
279    pub fn auto_startup(&self) -> bool {
280        self.auto_startup
281    }
282
283    /// Order in which routes are started. Lower values start first.
284    pub fn startup_order(&self) -> i32 {
285        self.startup_order
286    }
287
288    /// Set a unique identifier for this route.
289    pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
290        self.route_id = id.into();
291        self
292    }
293
294    /// Set whether this route should start automatically.
295    pub fn with_auto_startup(mut self, auto: bool) -> Self {
296        self.auto_startup = auto;
297        self
298    }
299
300    /// Set the startup order. Lower values start first.
301    pub fn with_startup_order(mut self, order: i32) -> Self {
302        self.startup_order = order;
303        self
304    }
305
306    /// Extract the metadata fields needed for introspection.
307    /// This is used by RouteController to store route info without the non-Sync steps.
308    pub fn to_info(&self) -> RouteDefinitionInfo {
309        RouteDefinitionInfo {
310            route_id: self.route_id.clone(),
311            auto_startup: self.auto_startup,
312            startup_order: self.startup_order,
313        }
314    }
315}
316
317/// Minimal route definition metadata for introspection.
318///
319/// This struct contains only the metadata fields from [`RouteDefinition`]
320/// that are needed for route lifecycle management, without the `steps` field
321/// (which contains non-Sync types and cannot be stored in a Sync struct).
322#[derive(Clone)]
323pub struct RouteDefinitionInfo {
324    route_id: String,
325    auto_startup: bool,
326    startup_order: i32,
327}
328
329impl RouteDefinitionInfo {
330    /// Get the route ID.
331    pub fn route_id(&self) -> &str {
332        &self.route_id
333    }
334
335    /// Whether this route should start automatically when the context starts.
336    pub fn auto_startup(&self) -> bool {
337        self.auto_startup
338    }
339
340    /// Order in which routes are started. Lower values start first.
341    pub fn startup_order(&self) -> i32 {
342        self.startup_order
343    }
344}
345
346#[cfg(test)]
347mod tests {
348    use super::*;
349
350    #[test]
351    fn test_builder_step_multicast_variant() {
352        use camel_api::MulticastConfig;
353
354        let step = BuilderStep::Multicast {
355            steps: vec![BuilderStep::To("direct:a".into())],
356            config: MulticastConfig::new(),
357        };
358
359        assert!(matches!(step, BuilderStep::Multicast { .. }));
360    }
361
362    #[test]
363    fn test_route_definition_defaults() {
364        let def = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");
365        assert_eq!(def.route_id(), "test-route");
366        assert!(def.auto_startup());
367        assert_eq!(def.startup_order(), 1000);
368    }
369
370    #[test]
371    fn test_route_definition_builders() {
372        let def = RouteDefinition::new("direct:test", vec![])
373            .with_route_id("my-route")
374            .with_auto_startup(false)
375            .with_startup_order(50);
376        assert_eq!(def.route_id(), "my-route");
377        assert!(!def.auto_startup());
378        assert_eq!(def.startup_order(), 50);
379    }
380
381    #[test]
382    fn test_choice_builder_step_debug() {
383        use camel_api::{Exchange, FilterPredicate};
384        use std::sync::Arc;
385
386        fn always_true(_: &Exchange) -> bool {
387            true
388        }
389
390        let step = BuilderStep::Choice {
391            whens: vec![WhenStep {
392                predicate: Arc::new(always_true) as FilterPredicate,
393                steps: vec![BuilderStep::To("mock:a".into())],
394            }],
395            otherwise: None,
396        };
397        let debug = format!("{step:?}");
398        assert!(debug.contains("Choice"));
399    }
400}