Skip to main content

camel_core/lifecycle/application/
route_definition.rs

1// lifecycle/application/route_definition.rs
2// Route definition and builder-step types. Route (compiled artifact) lives in adapters.
3
4use std::sync::Arc;
5
6use camel_api::UnitOfWorkConfig;
7use camel_api::circuit_breaker::CircuitBreakerConfig;
8use camel_api::error_handler::ErrorHandlerConfig;
9use camel_api::loop_eip::LoopConfig;
10use camel_api::security_policy::SecurityPolicyConfig;
11use camel_api::{AggregatorConfig, BoxProcessor, FilterPredicate, MulticastConfig, SplitterConfig};
12use camel_auth::TokenAuthenticator;
13use camel_component_api::ConcurrencyModel;
14
15/// An unresolved when-clause: predicate + nested steps for the sub-pipeline.
16pub struct WhenStep {
17    pub predicate: FilterPredicate,
18    pub steps: Vec<BuilderStep>,
19}
20
21pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
22
23/// Declarative `when` clause resolved later by the runtime.
24#[derive(Debug)]
25pub struct DeclarativeWhenStep {
26    pub predicate: LanguageExpressionDef,
27    pub steps: Vec<BuilderStep>,
28}
29
30/// A step in an unresolved route definition.
31pub enum BuilderStep {
32    /// A pre-built Tower processor service.
33    Processor(BoxProcessor),
34    /// A destination URI — resolved at start time by CamelContext.
35    To(String),
36    /// A stop step that halts processing immediately.
37    Stop,
38    /// A static log step.
39    Log {
40        level: camel_processor::LogLevel,
41        message: String,
42    },
43    /// Declarative set_header (literal or language-based value), resolved at route-add time.
44    DeclarativeSetHeader {
45        key: String,
46        value: ValueSourceDef,
47    },
48    DeclarativeSetProperty {
49        key: String,
50        value_source: ValueSourceDef,
51    },
52    /// Declarative set_body (literal or language-based value), resolved at route-add time.
53    DeclarativeSetBody {
54        value: ValueSourceDef,
55    },
56    /// Declarative filter using a language predicate, resolved at route-add time.
57    DeclarativeFilter {
58        predicate: LanguageExpressionDef,
59        steps: Vec<BuilderStep>,
60    },
61    /// Declarative choice/when/otherwise using language predicates, resolved at route-add time.
62    DeclarativeChoice {
63        whens: Vec<DeclarativeWhenStep>,
64        otherwise: Option<Vec<BuilderStep>>,
65    },
66    /// Declarative script step evaluated by language and written to body.
67    DeclarativeScript {
68        expression: LanguageExpressionDef,
69    },
70    DeclarativeFunction {
71        definition: camel_api::FunctionDefinition,
72    },
73    /// Declarative split using a language expression, resolved at route-add time.
74    DeclarativeSplit {
75        expression: LanguageExpressionDef,
76        aggregation: camel_api::splitter::AggregationStrategy,
77        parallel: bool,
78        parallel_limit: Option<usize>,
79        stop_on_exception: bool,
80        steps: Vec<BuilderStep>,
81    },
82    DeclarativeDynamicRouter {
83        expression: LanguageExpressionDef,
84        uri_delimiter: String,
85        cache_size: i32,
86        ignore_invalid_endpoints: bool,
87        max_iterations: usize,
88    },
89    DeclarativeRoutingSlip {
90        expression: LanguageExpressionDef,
91        uri_delimiter: String,
92        cache_size: i32,
93        ignore_invalid_endpoints: bool,
94    },
95    /// A Splitter sub-pipeline: config + nested steps to execute per fragment.
96    Split {
97        config: SplitterConfig,
98        steps: Vec<BuilderStep>,
99    },
100    /// An Aggregator step: collects exchanges by correlation key, emits when complete.
101    Aggregate {
102        config: AggregatorConfig,
103    },
104    /// A Filter sub-pipeline: predicate + nested steps executed only when predicate is true.
105    Filter {
106        predicate: FilterPredicate,
107        steps: Vec<BuilderStep>,
108    },
109    /// A Choice step: evaluates when-clauses in order, routes to the first match.
110    /// If no when matches, the optional otherwise branch is used.
111    Choice {
112        whens: Vec<WhenStep>,
113        otherwise: Option<Vec<BuilderStep>>,
114    },
115    /// A WireTap step: sends a clone of the exchange to a tap endpoint (fire-and-forget).
116    WireTap {
117        uri: String,
118    },
119    /// A Multicast step: sends the same exchange to multiple destinations.
120    Multicast {
121        steps: Vec<BuilderStep>,
122        config: MulticastConfig,
123    },
124    /// Declarative log step with a language-evaluated message, resolved at route-add time.
125    DeclarativeLog {
126        level: camel_processor::LogLevel,
127        message: ValueSourceDef,
128    },
129    /// Bean invocation step — resolved at route-add time.
130    Bean {
131        name: String,
132        method: String,
133    },
134    /// Script step: executes a script that can mutate the exchange.
135    /// The script has access to `headers`, `properties`, and `body`.
136    Script {
137        language: String,
138        script: String,
139    },
140    /// Throttle step: rate limiting with configurable behavior when limit exceeded.
141    Throttle {
142        config: camel_api::ThrottlerConfig,
143        steps: Vec<BuilderStep>,
144    },
145    /// LoadBalance step: distributes exchanges across multiple endpoints using a strategy.
146    LoadBalance {
147        config: camel_api::LoadBalancerConfig,
148        steps: Vec<BuilderStep>,
149    },
150    /// DynamicRouter step: routes exchanges dynamically based on expression evaluation.
151    DynamicRouter {
152        config: camel_api::DynamicRouterConfig,
153    },
154    RoutingSlip {
155        config: camel_api::RoutingSlipConfig,
156    },
157    RecipientList {
158        config: camel_api::recipient_list::RecipientListConfig,
159    },
160    DeclarativeRecipientList {
161        expression: LanguageExpressionDef,
162        delimiter: String,
163        parallel: bool,
164        parallel_limit: Option<usize>,
165        stop_on_exception: bool,
166        aggregation: String,
167    },
168    Delay {
169        config: camel_api::DelayConfig,
170    },
171    /// Runtime loop with closure-based predicate (programmatic DSL).
172    Loop {
173        config: LoopConfig,
174        steps: Vec<BuilderStep>,
175    },
176    /// Declarative loop with optional language-based while predicate (YAML DSL).
177    DeclarativeLoop {
178        count: Option<usize>,
179        while_predicate: Option<LanguageExpressionDef>,
180        steps: Vec<BuilderStep>,
181    },
182    /// EIP-7 enrich: synchronous content enrichment via a resolved producer.
183    Enrich {
184        uri: String,
185        strategy: Option<String>,
186        timeout_ms: Option<u64>,
187    },
188    /// EIP-7 pollEnrich: blocking poll of a PollingConsumer with timeout.
189    PollEnrich {
190        uri: String,
191        strategy: Option<String>,
192        timeout_ms: Option<u64>,
193    },
194}
195
196impl std::fmt::Debug for BuilderStep {
197    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198        match self {
199            BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
200            BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
201            BuilderStep::Stop => write!(f, "BuilderStep::Stop"),
202            BuilderStep::Log { level, message } => write!(
203                f,
204                "BuilderStep::Log {{ level: {level:?}, message: {message:?} }}"
205            ),
206            BuilderStep::DeclarativeSetHeader { key, .. } => {
207                write!(
208                    f,
209                    "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
210                )
211            }
212            BuilderStep::DeclarativeSetBody { .. } => {
213                write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
214            }
215            BuilderStep::DeclarativeSetProperty { key, .. } => {
216                write!(
217                    f,
218                    "BuilderStep::DeclarativeSetProperty {{ key: {key:?}, .. }}"
219                )
220            }
221            BuilderStep::DeclarativeFilter { steps, .. } => {
222                write!(
223                    f,
224                    "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
225                )
226            }
227            BuilderStep::DeclarativeChoice { whens, otherwise } => {
228                write!(
229                    f,
230                    "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
231                    whens.len(),
232                    if otherwise.is_some() { "Some" } else { "None" }
233                )
234            }
235            BuilderStep::DeclarativeScript { expression } => write!(
236                f,
237                "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
238                expression.language
239            ),
240            BuilderStep::DeclarativeFunction { definition } => write!(
241                f,
242                "BuilderStep::DeclarativeFunction {{ id: {:?}, runtime: {:?}, .. }}",
243                definition.id, definition.runtime
244            ),
245            BuilderStep::DeclarativeSplit { steps, .. } => {
246                write!(
247                    f,
248                    "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
249                )
250            }
251            BuilderStep::DeclarativeDynamicRouter { expression, .. } => write!(
252                f,
253                "BuilderStep::DeclarativeDynamicRouter {{ language: {:?}, .. }}",
254                expression.language
255            ),
256            BuilderStep::DeclarativeRoutingSlip { expression, .. } => write!(
257                f,
258                "BuilderStep::DeclarativeRoutingSlip {{ language: {:?}, .. }}",
259                expression.language
260            ),
261            BuilderStep::Split { steps, .. } => {
262                write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
263            }
264            BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
265            BuilderStep::Filter { steps, .. } => {
266                write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
267            }
268            BuilderStep::Choice { whens, otherwise } => {
269                write!(
270                    f,
271                    "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
272                    whens.len(),
273                    if otherwise.is_some() { "Some" } else { "None" }
274                )
275            }
276            BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
277            BuilderStep::Multicast { steps, .. } => {
278                write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
279            }
280            BuilderStep::DeclarativeLog { level, .. } => {
281                write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
282            }
283            BuilderStep::Bean { name, method } => {
284                write!(
285                    f,
286                    "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
287                )
288            }
289            BuilderStep::Script { language, .. } => {
290                write!(f, "BuilderStep::Script {{ language: {language:?}, .. }}")
291            }
292            BuilderStep::Throttle { steps, .. } => {
293                write!(f, "BuilderStep::Throttle {{ steps: {steps:?}, .. }}")
294            }
295            BuilderStep::LoadBalance { steps, .. } => {
296                write!(f, "BuilderStep::LoadBalance {{ steps: {steps:?}, .. }}")
297            }
298            BuilderStep::DynamicRouter { .. } => {
299                write!(f, "BuilderStep::DynamicRouter {{ .. }}")
300            }
301            BuilderStep::RoutingSlip { .. } => {
302                write!(f, "BuilderStep::RoutingSlip {{ .. }}")
303            }
304            BuilderStep::RecipientList { .. } => {
305                write!(f, "BuilderStep::RecipientList {{ .. }}")
306            }
307            BuilderStep::DeclarativeRecipientList {
308                expression,
309                aggregation,
310                ..
311            } => write!(
312                f,
313                "BuilderStep::DeclarativeRecipientList {{ language: {:?}, aggregation: {:?}, .. }}",
314                expression.language, aggregation
315            ),
316            BuilderStep::Delay { config } => {
317                write!(f, "BuilderStep::Delay {{ config: {:?} }}", config)
318            }
319            BuilderStep::Loop { config, steps } => {
320                write!(
321                    f,
322                    "BuilderStep::Loop {{ config: {:?}, steps: {} }}",
323                    config.mode_name(),
324                    steps.len()
325                )
326            }
327            BuilderStep::DeclarativeLoop {
328                count,
329                while_predicate,
330                steps,
331            } => {
332                write!(
333                    f,
334                    "BuilderStep::DeclarativeLoop {{ count: {:?}, while: {}, steps: {} }}",
335                    count,
336                    while_predicate.is_some(),
337                    steps.len()
338                )
339            }
340            BuilderStep::Enrich {
341                uri,
342                strategy,
343                timeout_ms,
344            } => {
345                write!(
346                    f,
347                    "BuilderStep::Enrich {{ uri: {uri:?}, strategy: {strategy:?}, timeout_ms: {timeout_ms:?} }}"
348                )
349            }
350            BuilderStep::PollEnrich {
351                uri,
352                strategy,
353                timeout_ms,
354            } => {
355                write!(
356                    f,
357                    "BuilderStep::PollEnrich {{ uri: {uri:?}, strategy: {strategy:?}, timeout_ms: {timeout_ms:?} }}"
358                )
359            }
360        }
361    }
362}
363
364/// An unresolved route definition. "to" URIs have not been resolved to producers yet.
365pub struct RouteDefinition {
366    pub(crate) from_uri: String,
367    pub(crate) steps: Vec<BuilderStep>,
368    /// Optional per-route error handler config. Takes precedence over the global one.
369    pub(crate) error_handler: Option<ErrorHandlerConfig>,
370    /// Optional circuit breaker config. Applied between error handler and step pipeline.
371    pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
372    pub(crate) security_policy: Option<SecurityPolicyConfig>,
373    /// Optional token authenticator for validating JWT/OAuth tokens.
374    pub(crate) security_authenticator: Option<Arc<dyn TokenAuthenticator>>,
375    /// Optional Unit of Work config for in-flight tracking and completion hooks.
376    pub(crate) unit_of_work: Option<UnitOfWorkConfig>,
377    /// User override for the consumer's concurrency model. `None` means
378    /// "use whatever the consumer declares".
379    pub(crate) concurrency: Option<ConcurrencyModel>,
380    /// Unique identifier for this route. Required.
381    pub(crate) route_id: String,
382    /// Whether this route should start automatically when the context starts.
383    pub(crate) auto_startup: bool,
384    /// Order in which routes are started. Lower values start first.
385    pub(crate) startup_order: i32,
386    pub(crate) source_hash: Option<u64>,
387}
388
389impl RouteDefinition {
390    /// Create a new route definition with the required route ID.
391    pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
392        Self {
393            from_uri: from_uri.into(),
394            steps,
395            error_handler: None,
396            circuit_breaker: None,
397            security_policy: None,
398            security_authenticator: None,
399            unit_of_work: None,
400            concurrency: None,
401            route_id: String::new(), // Will be set by with_route_id()
402            auto_startup: true,
403            startup_order: 1000,
404            source_hash: None,
405        }
406    }
407
408    /// The source endpoint URI.
409    pub fn from_uri(&self) -> &str {
410        &self.from_uri
411    }
412
413    /// The steps in this route definition.
414    pub fn steps(&self) -> &[BuilderStep] {
415        &self.steps
416    }
417
418    /// Set a per-route error handler, overriding the global one.
419    pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
420        self.error_handler = Some(config);
421        self
422    }
423
424    /// Get the route-level error handler config, if set.
425    pub fn error_handler_config(&self) -> Option<&ErrorHandlerConfig> {
426        self.error_handler.as_ref()
427    }
428
429    /// Set a circuit breaker for this route.
430    pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
431        self.circuit_breaker = Some(config);
432        self
433    }
434
435    /// Set a security policy for this route.
436    pub fn with_security_policy(mut self, config: SecurityPolicyConfig) -> Self {
437        self.security_policy = Some(config);
438        self
439    }
440
441    /// Set a token authenticator for this route.
442    pub fn with_security_authenticator(
443        mut self,
444        authenticator: Arc<dyn TokenAuthenticator>,
445    ) -> Self {
446        self.security_authenticator = Some(authenticator);
447        self
448    }
449
450    /// Set a unit of work config for this route.
451    pub fn with_unit_of_work(mut self, config: UnitOfWorkConfig) -> Self {
452        self.unit_of_work = Some(config);
453        self
454    }
455
456    /// Get the unit of work config, if set.
457    pub fn unit_of_work_config(&self) -> Option<&UnitOfWorkConfig> {
458        self.unit_of_work.as_ref()
459    }
460
461    /// Get the circuit breaker config, if set.
462    pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
463        self.circuit_breaker.as_ref()
464    }
465
466    pub fn security_policy_config(&self) -> Option<&SecurityPolicyConfig> {
467        self.security_policy.as_ref()
468    }
469
470    pub fn security_authenticator(&self) -> Option<&Arc<dyn TokenAuthenticator>> {
471        self.security_authenticator.as_ref()
472    }
473
474    /// User-specified concurrency override, if any.
475    pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
476        self.concurrency.as_ref()
477    }
478
479    /// Override the consumer's concurrency model for this route.
480    pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
481        self.concurrency = Some(model);
482        self
483    }
484
485    /// Get the route ID.
486    pub fn route_id(&self) -> &str {
487        &self.route_id
488    }
489
490    /// Whether this route should start automatically when the context starts.
491    pub fn auto_startup(&self) -> bool {
492        self.auto_startup
493    }
494
495    /// Order in which routes are started. Lower values start first.
496    pub fn startup_order(&self) -> i32 {
497        self.startup_order
498    }
499
500    /// Set a unique identifier for this route.
501    pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
502        self.route_id = id.into();
503        self
504    }
505
506    /// Set whether this route should start automatically.
507    pub fn with_auto_startup(mut self, auto: bool) -> Self {
508        self.auto_startup = auto;
509        self
510    }
511
512    /// Set the startup order. Lower values start first.
513    pub fn with_startup_order(mut self, order: i32) -> Self {
514        self.startup_order = order;
515        self
516    }
517
518    pub fn with_source_hash(mut self, hash: u64) -> Self {
519        self.source_hash = Some(hash);
520        self
521    }
522
523    pub fn source_hash(&self) -> Option<u64> {
524        self.source_hash
525    }
526
527    /// Extract the metadata fields needed for introspection.
528    /// This is used by RouteController to store route info without the non-Sync steps.
529    pub fn to_info(&self) -> RouteDefinitionInfo {
530        RouteDefinitionInfo {
531            route_id: self.route_id.clone(),
532            auto_startup: self.auto_startup,
533            startup_order: self.startup_order,
534            source_hash: self.source_hash,
535        }
536    }
537}
538
/// Minimal route definition metadata for introspection.
///
/// This struct contains only the metadata fields from [`RouteDefinition`]
/// that are needed for route lifecycle management, without the `steps` field
/// (which contains non-Sync types and cannot be stored in a Sync struct).
///
/// All fields are plain data, so the type is `Debug`-printable and comparable
/// in addition to being cloneable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RouteDefinitionInfo {
    /// Identifier of the route this metadata was taken from.
    route_id: String,
    /// Whether the route starts automatically when the context starts.
    auto_startup: bool,
    /// Startup ordering key; lower values start first.
    startup_order: i32,
    /// Optional source hash copied from the route definition.
    pub(crate) source_hash: Option<u64>,
}

impl RouteDefinitionInfo {
    /// Get the route ID.
    pub fn route_id(&self) -> &str {
        &self.route_id
    }

    /// Whether this route should start automatically when the context starts.
    pub fn auto_startup(&self) -> bool {
        self.auto_startup
    }

    /// Order in which routes are started. Lower values start first.
    pub fn startup_order(&self) -> i32 {
        self.startup_order
    }

    /// Get the source hash, if one was recorded.
    pub fn source_hash(&self) -> Option<u64> {
        self.source_hash
    }
}
572
#[cfg(test)]
mod tests {
    use super::*;

    /// Principal fixture shared by the security-related tests.
    fn stub_principal() -> camel_api::security_policy::Principal {
        camel_api::security_policy::Principal {
            subject: "test".into(),
            issuer: "test".into(),
            audience: vec![],
            scopes: vec![],
            roles: vec![],
            claims: serde_json::Value::Null,
        }
    }

    /// A `Multicast` step can be built and is recognised by pattern matching.
    #[test]
    fn test_builder_step_multicast_variant() {
        use camel_api::MulticastConfig;

        let branches = vec![BuilderStep::To("direct:a".into())];
        let multicast = BuilderStep::Multicast {
            steps: branches,
            config: MulticastConfig::new(),
        };

        assert!(matches!(multicast, BuilderStep::Multicast { .. }));
    }

    /// A fresh definition auto-starts and uses startup order 1000.
    #[test]
    fn test_route_definition_defaults() {
        let route = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");

        assert_eq!(route.route_id(), "test-route");
        assert!(route.auto_startup());
        assert_eq!(route.startup_order(), 1000);
    }

    /// Builder methods override the defaults.
    #[test]
    fn test_route_definition_builders() {
        let route = RouteDefinition::new("direct:test", vec![])
            .with_route_id("my-route")
            .with_auto_startup(false)
            .with_startup_order(50);

        assert_eq!(route.route_id(), "my-route");
        assert!(!route.auto_startup());
        assert_eq!(route.startup_order(), 50);
    }

    /// `from_uri` and `steps` expose what was passed to `new`.
    #[test]
    fn test_route_definition_accessors_cover_core_fields() {
        let only_step = BuilderStep::To("mock:out".into());
        let route =
            RouteDefinition::new("direct:in", vec![only_step]).with_route_id("accessor-route");

        assert_eq!(route.from_uri(), "direct:in");
        let steps = route.steps();
        assert_eq!(steps.len(), 1);
        assert!(matches!(steps[0], BuilderStep::To(_)));
    }

    /// Error handler, circuit breaker and concurrency settings round-trip
    /// through their accessors.
    #[test]
    fn test_route_definition_error_handler_circuit_breaker_and_concurrency_accessors() {
        use camel_api::circuit_breaker::CircuitBreakerConfig;
        use camel_api::error_handler::ErrorHandlerConfig;
        use camel_component_api::ConcurrencyModel;

        let route = RouteDefinition::new("direct:test", vec![])
            .with_route_id("eh-route")
            .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlc"))
            .with_circuit_breaker(CircuitBreakerConfig::new())
            .with_concurrency(ConcurrencyModel::Concurrent { max: Some(4) });

        let handler = route
            .error_handler_config()
            .expect("error handler should be set");
        assert_eq!(handler.dlc_uri.as_deref(), Some("log:dlc"));

        assert!(route.circuit_breaker_config().is_some());

        let concurrency = route.concurrency_override();
        assert!(matches!(
            concurrency,
            Some(ConcurrencyModel::Concurrent { max: Some(4) })
        ));
    }

    /// Debug-formatting a broad sample of variants yields non-empty output.
    #[test]
    fn test_builder_step_debug_covers_many_variants() {
        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
        use camel_api::{
            DynamicRouterConfig, Exchange, IdentityProcessor, RoutingSlipConfig, Value,
        };
        use std::sync::Arc;

        let body_expr = LanguageExpressionDef {
            language: "simple".into(),
            source: "${body}".into(),
        };

        let samples = vec![
            BuilderStep::Processor(BoxProcessor::new(IdentityProcessor)),
            BuilderStep::To("mock:out".into()),
            BuilderStep::Stop,
            BuilderStep::Log {
                level: camel_processor::LogLevel::Info,
                message: "hello".into(),
            },
            BuilderStep::DeclarativeSetHeader {
                key: "k".into(),
                value: ValueSourceDef::Literal(Value::String("v".into())),
            },
            BuilderStep::DeclarativeSetBody {
                value: ValueSourceDef::Expression(body_expr.clone()),
            },
            BuilderStep::DeclarativeFilter {
                predicate: body_expr.clone(),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::DeclarativeChoice {
                whens: vec![DeclarativeWhenStep {
                    predicate: body_expr.clone(),
                    steps: vec![BuilderStep::Stop],
                }],
                otherwise: Some(vec![BuilderStep::Stop]),
            },
            BuilderStep::DeclarativeScript {
                expression: body_expr.clone(),
            },
            BuilderStep::DeclarativeSplit {
                expression: body_expr.clone(),
                aggregation: AggregationStrategy::Original,
                parallel: false,
                parallel_limit: Some(2),
                stop_on_exception: true,
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Split {
                config: SplitterConfig::new(split_body_lines()),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Aggregate {
                config: camel_api::AggregatorConfig::correlate_by("id")
                    .complete_when_size(1)
                    .build()
                    .unwrap(),
            },
            BuilderStep::Filter {
                predicate: Arc::new(|_: &Exchange| true),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::WireTap {
                uri: "mock:tap".into(),
            },
            BuilderStep::DeclarativeLog {
                level: camel_processor::LogLevel::Info,
                message: ValueSourceDef::Expression(body_expr.clone()),
            },
            BuilderStep::Bean {
                name: "bean".into(),
                method: "call".into(),
            },
            BuilderStep::Script {
                language: "rhai".into(),
                script: "body".into(),
            },
            BuilderStep::Throttle {
                config: camel_api::ThrottlerConfig::new(10, std::time::Duration::from_millis(10)),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::LoadBalance {
                config: camel_api::LoadBalancerConfig::round_robin(),
                steps: vec![BuilderStep::To("mock:l1".into())],
            },
            BuilderStep::DynamicRouter {
                config: DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
            },
            BuilderStep::RoutingSlip {
                config: RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
            },
        ];

        for sample in samples.iter() {
            let rendered = format!("{sample:?}");
            assert!(!rendered.is_empty());
        }
    }

    /// `to_info` carries over the ID, auto-startup flag and startup order.
    #[test]
    fn test_route_definition_to_info_preserves_metadata() {
        let route = RouteDefinition::new("direct:test", vec![])
            .with_route_id("meta-route")
            .with_auto_startup(false)
            .with_startup_order(7);
        let info = route.to_info();

        assert_eq!(info.route_id(), "meta-route");
        assert!(!info.auto_startup());
        assert_eq!(info.startup_order(), 7);
    }

    /// The Debug output of a programmatic choice mentions `Choice`.
    #[test]
    fn test_choice_builder_step_debug() {
        use camel_api::{Exchange, FilterPredicate};
        use std::sync::Arc;

        fn always_true(_: &Exchange) -> bool {
            true
        }

        let branch = WhenStep {
            predicate: Arc::new(always_true) as FilterPredicate,
            steps: vec![BuilderStep::To("mock:a".into())],
        };
        let choice = BuilderStep::Choice {
            whens: vec![branch],
            otherwise: None,
        };

        let rendered = format!("{choice:?}");
        assert!(rendered.contains("Choice"));
    }

    /// Unit-of-work config is stored when set and absent otherwise.
    #[test]
    fn test_route_definition_unit_of_work() {
        use camel_api::UnitOfWorkConfig;

        let uow = UnitOfWorkConfig {
            on_complete: Some("log:complete".into()),
            on_failure: Some("log:failed".into()),
        };
        let with_uow = RouteDefinition::new("direct:test", vec![])
            .with_route_id("uow-test")
            .with_unit_of_work(uow.clone());

        assert_eq!(
            with_uow.unit_of_work_config().unwrap().on_complete.as_deref(),
            Some("log:complete")
        );
        assert_eq!(
            with_uow.unit_of_work_config().unwrap().on_failure.as_deref(),
            Some("log:failed")
        );

        let without_uow = RouteDefinition::new("direct:test", vec![]).with_route_id("no-uow");
        assert!(without_uow.unit_of_work_config().is_none());
    }

    /// Security policy config is absent by default and present once set.
    #[test]
    fn test_route_definition_security_policy_accessor() {
        use async_trait::async_trait;
        use camel_api::CamelError;
        use camel_api::Exchange;
        use camel_api::security_policy::{
            AuthorizationDecision, SecurityPolicy, SecurityPolicyConfig,
        };

        struct StubPolicy;
        #[async_trait]
        impl SecurityPolicy for StubPolicy {
            async fn evaluate(
                &self,
                _exchange: &mut Exchange,
            ) -> Result<AuthorizationDecision, CamelError> {
                Ok(AuthorizationDecision::Granted {
                    principal: stub_principal(),
                })
            }
        }

        let without_policy = RouteDefinition::new("direct:test", vec![]).with_route_id("no-sp");
        assert!(without_policy.security_policy_config().is_none());

        let with_policy = RouteDefinition::new("direct:test", vec![])
            .with_route_id("sp-test")
            .with_security_policy(SecurityPolicyConfig::new(StubPolicy));
        assert!(with_policy.security_policy_config().is_some());
    }

    /// Token authenticator is absent by default and present once set.
    #[test]
    fn test_route_definition_security_authenticator_accessor() {
        use camel_api::security_policy::Principal;

        struct TestAuth;
        #[async_trait::async_trait]
        impl TokenAuthenticator for TestAuth {
            async fn authenticate_bearer(
                &self,
                _token: &str,
            ) -> Result<Principal, camel_api::CamelError> {
                Ok(stub_principal())
            }
        }

        let without_auth = RouteDefinition::new("direct:test".to_string(), vec![]);
        assert!(without_auth.security_authenticator().is_none());

        let authenticator = Arc::new(TestAuth);
        let with_auth = RouteDefinition::new("direct:test".to_string(), vec![])
            .with_security_authenticator(authenticator);
        assert!(with_auth.security_authenticator().is_some());
    }
}