Skip to main content

camel_core/lifecycle/application/
route_definition.rs

1// lifecycle/application/route_definition.rs
2// Route definition and builder-step types. Route (compiled artifact) lives in adapters.
3
4use std::sync::Arc;
5
6use camel_api::UnitOfWorkConfig;
7use camel_api::circuit_breaker::CircuitBreakerConfig;
8use camel_api::error_handler::ErrorHandlerConfig;
9use camel_api::loop_eip::LoopConfig;
10use camel_api::security_policy::SecurityPolicyConfig;
11use camel_api::{AggregatorConfig, BoxProcessor, FilterPredicate, MulticastConfig, SplitterConfig};
12use camel_auth::TokenAuthenticator;
13use camel_component_api::ConcurrencyModel;
14
15/// An unresolved when-clause: predicate + nested steps for the sub-pipeline.
16pub struct WhenStep {
17    pub predicate: FilterPredicate,
18    pub steps: Vec<BuilderStep>,
19}
20
21pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
22
23/// Declarative `when` clause resolved later by the runtime.
24#[derive(Debug)]
25pub struct DeclarativeWhenStep {
26    pub predicate: LanguageExpressionDef,
27    pub steps: Vec<BuilderStep>,
28}
29
30/// A step in an unresolved route definition.
31pub enum BuilderStep {
32    /// A pre-built Tower processor service.
33    Processor(BoxProcessor),
34    /// A destination URI — resolved at start time by CamelContext.
35    To(String),
36    /// A stop step that halts processing immediately.
37    Stop,
38    /// A static log step.
39    Log {
40        level: camel_processor::LogLevel,
41        message: String,
42    },
43    /// Declarative set_header (literal or language-based value), resolved at route-add time.
44    DeclarativeSetHeader {
45        key: String,
46        value: ValueSourceDef,
47    },
48    DeclarativeSetProperty {
49        key: String,
50        value_source: ValueSourceDef,
51    },
52    /// Declarative set_body (literal or language-based value), resolved at route-add time.
53    DeclarativeSetBody {
54        value: ValueSourceDef,
55    },
56    /// Declarative filter using a language predicate, resolved at route-add time.
57    DeclarativeFilter {
58        predicate: LanguageExpressionDef,
59        steps: Vec<BuilderStep>,
60    },
61    /// Declarative choice/when/otherwise using language predicates, resolved at route-add time.
62    DeclarativeChoice {
63        whens: Vec<DeclarativeWhenStep>,
64        otherwise: Option<Vec<BuilderStep>>,
65    },
66    /// Declarative script step evaluated by language and written to body.
67    DeclarativeScript {
68        expression: LanguageExpressionDef,
69    },
70    DeclarativeFunction {
71        definition: camel_api::FunctionDefinition,
72    },
73    /// Declarative split using a language expression, resolved at route-add time.
74    DeclarativeSplit {
75        expression: LanguageExpressionDef,
76        aggregation: camel_api::splitter::AggregationStrategy,
77        parallel: bool,
78        parallel_limit: Option<usize>,
79        stop_on_exception: bool,
80        steps: Vec<BuilderStep>,
81    },
82    /// Declarative stream split using a streaming split expression, resolved at route-add time.
83    DeclarativeStreamSplit {
84        stream_config: camel_api::StreamSplitConfig,
85        aggregation: camel_api::splitter::AggregationStrategy,
86        stop_on_exception: bool,
87        steps: Vec<BuilderStep>,
88    },
89    DeclarativeDynamicRouter {
90        expression: LanguageExpressionDef,
91        uri_delimiter: String,
92        cache_size: i32,
93        ignore_invalid_endpoints: bool,
94        max_iterations: usize,
95    },
96    DeclarativeRoutingSlip {
97        expression: LanguageExpressionDef,
98        uri_delimiter: String,
99        cache_size: i32,
100        ignore_invalid_endpoints: bool,
101    },
102    /// A Splitter sub-pipeline: config + nested steps to execute per fragment.
103    Split {
104        config: SplitterConfig,
105        steps: Vec<BuilderStep>,
106    },
107    /// An Aggregator step: collects exchanges by correlation key, emits when complete.
108    Aggregate {
109        config: AggregatorConfig,
110    },
111    /// A Filter sub-pipeline: predicate + nested steps executed only when predicate is true.
112    Filter {
113        predicate: FilterPredicate,
114        steps: Vec<BuilderStep>,
115    },
116    /// A Choice step: evaluates when-clauses in order, routes to the first match.
117    /// If no when matches, the optional otherwise branch is used.
118    Choice {
119        whens: Vec<WhenStep>,
120        otherwise: Option<Vec<BuilderStep>>,
121    },
122    /// A WireTap step: sends a clone of the exchange to a tap endpoint (fire-and-forget).
123    WireTap {
124        uri: String,
125    },
126    /// A Multicast step: sends the same exchange to multiple destinations.
127    Multicast {
128        steps: Vec<BuilderStep>,
129        config: MulticastConfig,
130    },
131    /// Declarative log step with a language-evaluated message, resolved at route-add time.
132    DeclarativeLog {
133        level: camel_processor::LogLevel,
134        message: ValueSourceDef,
135    },
136    /// Bean invocation step — resolved at route-add time.
137    Bean {
138        name: String,
139        method: String,
140    },
141    /// Script step: executes a script that can mutate the exchange.
142    /// The script has access to `headers`, `properties`, and `body`.
143    Script {
144        language: String,
145        script: String,
146    },
147    /// Throttle step: rate limiting with configurable behavior when limit exceeded.
148    Throttle {
149        config: camel_api::ThrottlerConfig,
150        steps: Vec<BuilderStep>,
151    },
152    /// LoadBalance step: distributes exchanges across multiple endpoints using a strategy.
153    LoadBalance {
154        config: camel_api::LoadBalancerConfig,
155        steps: Vec<BuilderStep>,
156    },
157    /// DynamicRouter step: routes exchanges dynamically based on expression evaluation.
158    DynamicRouter {
159        config: camel_api::DynamicRouterConfig,
160    },
161    RoutingSlip {
162        config: camel_api::RoutingSlipConfig,
163    },
164    RecipientList {
165        config: camel_api::recipient_list::RecipientListConfig,
166    },
167    DeclarativeRecipientList {
168        expression: LanguageExpressionDef,
169        delimiter: String,
170        parallel: bool,
171        parallel_limit: Option<usize>,
172        stop_on_exception: bool,
173        aggregation: String,
174    },
175    Delay {
176        config: camel_api::DelayConfig,
177    },
178    /// Runtime loop with closure-based predicate (programmatic DSL).
179    Loop {
180        config: LoopConfig,
181        steps: Vec<BuilderStep>,
182    },
183    /// Declarative loop with optional language-based while predicate (YAML DSL).
184    DeclarativeLoop {
185        count: Option<usize>,
186        while_predicate: Option<LanguageExpressionDef>,
187        steps: Vec<BuilderStep>,
188    },
189    /// EIP-7 enrich: synchronous content enrichment via a resolved producer.
190    Enrich {
191        uri: String,
192        strategy: Option<String>,
193        timeout_ms: Option<u64>,
194    },
195    /// EIP-7 pollEnrich: blocking poll of a PollingConsumer with timeout.
196    PollEnrich {
197        uri: String,
198        strategy: Option<String>,
199        timeout_ms: Option<u64>,
200    },
201}
202
203impl std::fmt::Debug for BuilderStep {
204    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205        match self {
206            BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
207            BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
208            BuilderStep::Stop => write!(f, "BuilderStep::Stop"),
209            BuilderStep::Log { level, message } => write!(
210                f,
211                "BuilderStep::Log {{ level: {level:?}, message: {message:?} }}"
212            ),
213            BuilderStep::DeclarativeSetHeader { key, .. } => {
214                write!(
215                    f,
216                    "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
217                )
218            }
219            BuilderStep::DeclarativeSetBody { .. } => {
220                write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
221            }
222            BuilderStep::DeclarativeSetProperty { key, .. } => {
223                write!(
224                    f,
225                    "BuilderStep::DeclarativeSetProperty {{ key: {key:?}, .. }}"
226                )
227            }
228            BuilderStep::DeclarativeFilter { steps, .. } => {
229                write!(
230                    f,
231                    "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
232                )
233            }
234            BuilderStep::DeclarativeChoice { whens, otherwise } => {
235                write!(
236                    f,
237                    "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
238                    whens.len(),
239                    if otherwise.is_some() { "Some" } else { "None" }
240                )
241            }
242            BuilderStep::DeclarativeScript { expression } => write!(
243                f,
244                "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
245                expression.language
246            ),
247            BuilderStep::DeclarativeFunction { definition } => write!(
248                f,
249                "BuilderStep::DeclarativeFunction {{ id: {:?}, runtime: {:?}, .. }}",
250                definition.id, definition.runtime
251            ),
252            BuilderStep::DeclarativeSplit { steps, .. } => {
253                write!(
254                    f,
255                    "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
256                )
257            }
258            BuilderStep::DeclarativeStreamSplit {
259                steps,
260                stream_config,
261                ..
262            } => {
263                write!(
264                    f,
265                    "BuilderStep::DeclarativeStreamSplit {{ format: {:?}, steps: {} step(s) }}",
266                    stream_config.format,
267                    steps.len()
268                )
269            }
270            BuilderStep::DeclarativeDynamicRouter { expression, .. } => write!(
271                f,
272                "BuilderStep::DeclarativeDynamicRouter {{ language: {:?}, .. }}",
273                expression.language
274            ),
275            BuilderStep::DeclarativeRoutingSlip { expression, .. } => write!(
276                f,
277                "BuilderStep::DeclarativeRoutingSlip {{ language: {:?}, .. }}",
278                expression.language
279            ),
280            BuilderStep::Split { steps, .. } => {
281                write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
282            }
283            BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
284            BuilderStep::Filter { steps, .. } => {
285                write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
286            }
287            BuilderStep::Choice { whens, otherwise } => {
288                write!(
289                    f,
290                    "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
291                    whens.len(),
292                    if otherwise.is_some() { "Some" } else { "None" }
293                )
294            }
295            BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
296            BuilderStep::Multicast { steps, .. } => {
297                write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
298            }
299            BuilderStep::DeclarativeLog { level, .. } => {
300                write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
301            }
302            BuilderStep::Bean { name, method } => {
303                write!(
304                    f,
305                    "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
306                )
307            }
308            BuilderStep::Script { language, .. } => {
309                write!(f, "BuilderStep::Script {{ language: {language:?}, .. }}")
310            }
311            BuilderStep::Throttle { steps, .. } => {
312                write!(f, "BuilderStep::Throttle {{ steps: {steps:?}, .. }}")
313            }
314            BuilderStep::LoadBalance { steps, .. } => {
315                write!(f, "BuilderStep::LoadBalance {{ steps: {steps:?}, .. }}")
316            }
317            BuilderStep::DynamicRouter { .. } => {
318                write!(f, "BuilderStep::DynamicRouter {{ .. }}")
319            }
320            BuilderStep::RoutingSlip { .. } => {
321                write!(f, "BuilderStep::RoutingSlip {{ .. }}")
322            }
323            BuilderStep::RecipientList { .. } => {
324                write!(f, "BuilderStep::RecipientList {{ .. }}")
325            }
326            BuilderStep::DeclarativeRecipientList {
327                expression,
328                aggregation,
329                ..
330            } => write!(
331                f,
332                "BuilderStep::DeclarativeRecipientList {{ language: {:?}, aggregation: {:?}, .. }}",
333                expression.language, aggregation
334            ),
335            BuilderStep::Delay { config } => {
336                write!(f, "BuilderStep::Delay {{ config: {:?} }}", config)
337            }
338            BuilderStep::Loop { config, steps } => {
339                write!(
340                    f,
341                    "BuilderStep::Loop {{ config: {:?}, steps: {} }}",
342                    config.mode_name(),
343                    steps.len()
344                )
345            }
346            BuilderStep::DeclarativeLoop {
347                count,
348                while_predicate,
349                steps,
350            } => {
351                write!(
352                    f,
353                    "BuilderStep::DeclarativeLoop {{ count: {:?}, while: {}, steps: {} }}",
354                    count,
355                    while_predicate.is_some(),
356                    steps.len()
357                )
358            }
359            BuilderStep::Enrich {
360                uri,
361                strategy,
362                timeout_ms,
363            } => {
364                write!(
365                    f,
366                    "BuilderStep::Enrich {{ uri: {uri:?}, strategy: {strategy:?}, timeout_ms: {timeout_ms:?} }}"
367                )
368            }
369            BuilderStep::PollEnrich {
370                uri,
371                strategy,
372                timeout_ms,
373            } => {
374                write!(
375                    f,
376                    "BuilderStep::PollEnrich {{ uri: {uri:?}, strategy: {strategy:?}, timeout_ms: {timeout_ms:?} }}"
377                )
378            }
379        }
380    }
381}
382
383/// An unresolved route definition. "to" URIs have not been resolved to producers yet.
384pub struct RouteDefinition {
385    pub(crate) from_uri: String,
386    pub(crate) steps: Vec<BuilderStep>,
387    /// Optional per-route error handler config. Takes precedence over the global one.
388    pub(crate) error_handler: Option<ErrorHandlerConfig>,
389    /// Optional circuit breaker config. Applied between error handler and step pipeline.
390    pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
391    pub(crate) security_policy: Option<SecurityPolicyConfig>,
392    /// Optional token authenticator for validating JWT/OAuth tokens.
393    pub(crate) security_authenticator: Option<Arc<dyn TokenAuthenticator>>,
394    /// Optional Unit of Work config for in-flight tracking and completion hooks.
395    pub(crate) unit_of_work: Option<UnitOfWorkConfig>,
396    /// User override for the consumer's concurrency model. `None` means
397    /// "use whatever the consumer declares".
398    pub(crate) concurrency: Option<ConcurrencyModel>,
399    /// Unique identifier for this route. Required.
400    pub(crate) route_id: String,
401    /// Whether this route should start automatically when the context starts.
402    pub(crate) auto_startup: bool,
403    /// Order in which routes are started. Lower values start first.
404    pub(crate) startup_order: i32,
405    pub(crate) source_hash: Option<u64>,
406}
407
408impl RouteDefinition {
409    /// Create a new route definition with the required route ID.
410    pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
411        Self {
412            from_uri: from_uri.into(),
413            steps,
414            error_handler: None,
415            circuit_breaker: None,
416            security_policy: None,
417            security_authenticator: None,
418            unit_of_work: None,
419            concurrency: None,
420            route_id: String::new(), // Will be set by with_route_id()
421            auto_startup: true,
422            startup_order: 1000,
423            source_hash: None,
424        }
425    }
426
427    /// The source endpoint URI.
428    pub fn from_uri(&self) -> &str {
429        &self.from_uri
430    }
431
432    /// The steps in this route definition.
433    pub fn steps(&self) -> &[BuilderStep] {
434        &self.steps
435    }
436
437    /// Set a per-route error handler, overriding the global one.
438    pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
439        self.error_handler = Some(config);
440        self
441    }
442
443    /// Get the route-level error handler config, if set.
444    pub fn error_handler_config(&self) -> Option<&ErrorHandlerConfig> {
445        self.error_handler.as_ref()
446    }
447
448    /// Set a circuit breaker for this route.
449    pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
450        self.circuit_breaker = Some(config);
451        self
452    }
453
454    /// Set a security policy for this route.
455    pub fn with_security_policy(mut self, config: SecurityPolicyConfig) -> Self {
456        self.security_policy = Some(config);
457        self
458    }
459
460    /// Set a token authenticator for this route.
461    pub fn with_security_authenticator(
462        mut self,
463        authenticator: Arc<dyn TokenAuthenticator>,
464    ) -> Self {
465        self.security_authenticator = Some(authenticator);
466        self
467    }
468
469    /// Set a unit of work config for this route.
470    pub fn with_unit_of_work(mut self, config: UnitOfWorkConfig) -> Self {
471        self.unit_of_work = Some(config);
472        self
473    }
474
475    /// Get the unit of work config, if set.
476    pub fn unit_of_work_config(&self) -> Option<&UnitOfWorkConfig> {
477        self.unit_of_work.as_ref()
478    }
479
480    /// Get the circuit breaker config, if set.
481    pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
482        self.circuit_breaker.as_ref()
483    }
484
485    pub fn security_policy_config(&self) -> Option<&SecurityPolicyConfig> {
486        self.security_policy.as_ref()
487    }
488
489    pub fn security_authenticator(&self) -> Option<&Arc<dyn TokenAuthenticator>> {
490        self.security_authenticator.as_ref()
491    }
492
493    /// User-specified concurrency override, if any.
494    pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
495        self.concurrency.as_ref()
496    }
497
498    /// Override the consumer's concurrency model for this route.
499    pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
500        self.concurrency = Some(model);
501        self
502    }
503
504    /// Get the route ID.
505    pub fn route_id(&self) -> &str {
506        &self.route_id
507    }
508
509    /// Whether this route should start automatically when the context starts.
510    pub fn auto_startup(&self) -> bool {
511        self.auto_startup
512    }
513
514    /// Order in which routes are started. Lower values start first.
515    pub fn startup_order(&self) -> i32 {
516        self.startup_order
517    }
518
519    /// Set a unique identifier for this route.
520    pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
521        self.route_id = id.into();
522        self
523    }
524
525    /// Set whether this route should start automatically.
526    pub fn with_auto_startup(mut self, auto: bool) -> Self {
527        self.auto_startup = auto;
528        self
529    }
530
531    /// Set the startup order. Lower values start first.
532    pub fn with_startup_order(mut self, order: i32) -> Self {
533        self.startup_order = order;
534        self
535    }
536
537    pub fn with_source_hash(mut self, hash: u64) -> Self {
538        self.source_hash = Some(hash);
539        self
540    }
541
542    pub fn source_hash(&self) -> Option<u64> {
543        self.source_hash
544    }
545
546    /// Extract the metadata fields needed for introspection.
547    /// This is used by RouteController to store route info without the non-Sync steps.
548    pub fn to_info(&self) -> RouteDefinitionInfo {
549        RouteDefinitionInfo {
550            route_id: self.route_id.clone(),
551            auto_startup: self.auto_startup,
552            startup_order: self.startup_order,
553            source_hash: self.source_hash,
554        }
555    }
556}
557
558/// Minimal route definition metadata for introspection.
559///
560/// This struct contains only the metadata fields from [`RouteDefinition`]
561/// that are needed for route lifecycle management, without the `steps` field
562/// (which contains non-Sync types and cannot be stored in a Sync struct).
563#[derive(Clone)]
564pub struct RouteDefinitionInfo {
565    route_id: String,
566    auto_startup: bool,
567    startup_order: i32,
568    pub(crate) source_hash: Option<u64>,
569}
570
571impl RouteDefinitionInfo {
572    /// Get the route ID.
573    pub fn route_id(&self) -> &str {
574        &self.route_id
575    }
576
577    /// Whether this route should start automatically when the context starts.
578    pub fn auto_startup(&self) -> bool {
579        self.auto_startup
580    }
581
582    /// Order in which routes are started. Lower values start first.
583    pub fn startup_order(&self) -> i32 {
584        self.startup_order
585    }
586
587    pub fn source_hash(&self) -> Option<u64> {
588        self.source_hash
589    }
590}
591
592#[cfg(test)]
593mod tests {
594    use super::*;
595
596    #[test]
597    fn test_builder_step_multicast_variant() {
598        use camel_api::MulticastConfig;
599
600        let step = BuilderStep::Multicast {
601            steps: vec![BuilderStep::To("direct:a".into())],
602            config: MulticastConfig::new(),
603        };
604
605        assert!(matches!(step, BuilderStep::Multicast { .. }));
606    }
607
608    #[test]
609    fn test_route_definition_defaults() {
610        let def = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");
611        assert_eq!(def.route_id(), "test-route");
612        assert!(def.auto_startup());
613        assert_eq!(def.startup_order(), 1000);
614    }
615
616    #[test]
617    fn test_route_definition_builders() {
618        let def = RouteDefinition::new("direct:test", vec![])
619            .with_route_id("my-route")
620            .with_auto_startup(false)
621            .with_startup_order(50);
622        assert_eq!(def.route_id(), "my-route");
623        assert!(!def.auto_startup());
624        assert_eq!(def.startup_order(), 50);
625    }
626
627    #[test]
628    fn test_route_definition_accessors_cover_core_fields() {
629        let def = RouteDefinition::new("direct:in", vec![BuilderStep::To("mock:out".into())])
630            .with_route_id("accessor-route");
631
632        assert_eq!(def.from_uri(), "direct:in");
633        assert_eq!(def.steps().len(), 1);
634        assert!(matches!(def.steps()[0], BuilderStep::To(_)));
635    }
636
637    #[test]
638    fn test_route_definition_error_handler_circuit_breaker_and_concurrency_accessors() {
639        use camel_api::circuit_breaker::CircuitBreakerConfig;
640        use camel_api::error_handler::ErrorHandlerConfig;
641        use camel_component_api::ConcurrencyModel;
642
643        let def = RouteDefinition::new("direct:test", vec![])
644            .with_route_id("eh-route")
645            .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlc"))
646            .with_circuit_breaker(CircuitBreakerConfig::new())
647            .with_concurrency(ConcurrencyModel::Concurrent { max: Some(4) });
648
649        let eh = def
650            .error_handler_config()
651            .expect("error handler should be set");
652        assert_eq!(eh.dlc_uri.as_deref(), Some("log:dlc"));
653        assert!(def.circuit_breaker_config().is_some());
654        assert!(matches!(
655            def.concurrency_override(),
656            Some(ConcurrencyModel::Concurrent { max: Some(4) })
657        ));
658    }
659
660    #[test]
661    fn test_builder_step_debug_covers_many_variants() {
662        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
663        use camel_api::{
664            DynamicRouterConfig, Exchange, IdentityProcessor, RoutingSlipConfig, Value,
665        };
666        use std::sync::Arc;
667
668        let expr = LanguageExpressionDef {
669            language: "simple".into(),
670            source: "${body}".into(),
671        };
672
673        let steps = vec![
674            BuilderStep::Processor(BoxProcessor::new(IdentityProcessor)),
675            BuilderStep::To("mock:out".into()),
676            BuilderStep::Stop,
677            BuilderStep::Log {
678                level: camel_processor::LogLevel::Info,
679                message: "hello".into(),
680            },
681            BuilderStep::DeclarativeSetHeader {
682                key: "k".into(),
683                value: ValueSourceDef::Literal(Value::String("v".into())),
684            },
685            BuilderStep::DeclarativeSetBody {
686                value: ValueSourceDef::Expression(expr.clone()),
687            },
688            BuilderStep::DeclarativeFilter {
689                predicate: expr.clone(),
690                steps: vec![BuilderStep::Stop],
691            },
692            BuilderStep::DeclarativeChoice {
693                whens: vec![DeclarativeWhenStep {
694                    predicate: expr.clone(),
695                    steps: vec![BuilderStep::Stop],
696                }],
697                otherwise: Some(vec![BuilderStep::Stop]),
698            },
699            BuilderStep::DeclarativeScript {
700                expression: expr.clone(),
701            },
702            BuilderStep::DeclarativeSplit {
703                expression: expr.clone(),
704                aggregation: AggregationStrategy::Original,
705                parallel: false,
706                parallel_limit: Some(2),
707                stop_on_exception: true,
708                steps: vec![BuilderStep::Stop],
709            },
710            BuilderStep::Split {
711                config: SplitterConfig::new(split_body_lines()),
712                steps: vec![BuilderStep::Stop],
713            },
714            BuilderStep::Aggregate {
715                config: camel_api::AggregatorConfig::correlate_by("id")
716                    .complete_when_size(1)
717                    .build()
718                    .unwrap(),
719            },
720            BuilderStep::Filter {
721                predicate: Arc::new(|_: &Exchange| true),
722                steps: vec![BuilderStep::Stop],
723            },
724            BuilderStep::WireTap {
725                uri: "mock:tap".into(),
726            },
727            BuilderStep::DeclarativeLog {
728                level: camel_processor::LogLevel::Info,
729                message: ValueSourceDef::Expression(expr.clone()),
730            },
731            BuilderStep::Bean {
732                name: "bean".into(),
733                method: "call".into(),
734            },
735            BuilderStep::Script {
736                language: "rhai".into(),
737                script: "body".into(),
738            },
739            BuilderStep::Throttle {
740                config: camel_api::ThrottlerConfig::new(10, std::time::Duration::from_millis(10)),
741                steps: vec![BuilderStep::Stop],
742            },
743            BuilderStep::LoadBalance {
744                config: camel_api::LoadBalancerConfig::round_robin(),
745                steps: vec![BuilderStep::To("mock:l1".into())],
746            },
747            BuilderStep::DynamicRouter {
748                config: DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
749            },
750            BuilderStep::RoutingSlip {
751                config: RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
752            },
753        ];
754
755        for step in steps {
756            let dbg = format!("{step:?}");
757            assert!(!dbg.is_empty());
758        }
759    }
760
761    #[test]
762    fn test_route_definition_to_info_preserves_metadata() {
763        let info = RouteDefinition::new("direct:test", vec![])
764            .with_route_id("meta-route")
765            .with_auto_startup(false)
766            .with_startup_order(7)
767            .to_info();
768
769        assert_eq!(info.route_id(), "meta-route");
770        assert!(!info.auto_startup());
771        assert_eq!(info.startup_order(), 7);
772    }
773
774    #[test]
775    fn test_choice_builder_step_debug() {
776        use camel_api::{Exchange, FilterPredicate};
777        use std::sync::Arc;
778
779        fn always_true(_: &Exchange) -> bool {
780            true
781        }
782
783        let step = BuilderStep::Choice {
784            whens: vec![WhenStep {
785                predicate: Arc::new(always_true) as FilterPredicate,
786                steps: vec![BuilderStep::To("mock:a".into())],
787            }],
788            otherwise: None,
789        };
790        let debug = format!("{step:?}");
791        assert!(debug.contains("Choice"));
792    }
793
794    #[test]
795    fn test_route_definition_unit_of_work() {
796        use camel_api::UnitOfWorkConfig;
797        let config = UnitOfWorkConfig {
798            on_complete: Some("log:complete".into()),
799            on_failure: Some("log:failed".into()),
800        };
801        let def = RouteDefinition::new("direct:test", vec![])
802            .with_route_id("uow-test")
803            .with_unit_of_work(config.clone());
804        assert_eq!(
805            def.unit_of_work_config().unwrap().on_complete.as_deref(),
806            Some("log:complete")
807        );
808        assert_eq!(
809            def.unit_of_work_config().unwrap().on_failure.as_deref(),
810            Some("log:failed")
811        );
812
813        let def_no_uow = RouteDefinition::new("direct:test", vec![]).with_route_id("no-uow");
814        assert!(def_no_uow.unit_of_work_config().is_none());
815    }
816
817    #[test]
818    fn test_route_definition_security_policy_accessor() {
819        use async_trait::async_trait;
820        use camel_api::CamelError;
821        use camel_api::Exchange;
822        use camel_api::security_policy::{
823            AuthorizationDecision, Principal, SecurityPolicy, SecurityPolicyConfig,
824        };
825
826        struct StubPolicy;
827        #[async_trait]
828        impl SecurityPolicy for StubPolicy {
829            async fn evaluate(
830                &self,
831                _exchange: &mut Exchange,
832            ) -> Result<AuthorizationDecision, CamelError> {
833                Ok(AuthorizationDecision::Granted {
834                    principal: Principal {
835                        subject: "test".into(),
836                        issuer: "test".into(),
837                        audience: vec![],
838                        scopes: vec![],
839                        roles: vec![],
840                        claims: serde_json::Value::Null,
841                    },
842                })
843            }
844        }
845
846        let def_no_sp = RouteDefinition::new("direct:test", vec![]).with_route_id("no-sp");
847        assert!(def_no_sp.security_policy_config().is_none());
848
849        let def = RouteDefinition::new("direct:test", vec![])
850            .with_route_id("sp-test")
851            .with_security_policy(SecurityPolicyConfig::new(StubPolicy));
852        assert!(def.security_policy_config().is_some());
853    }
854
855    #[test]
856    fn test_route_definition_security_authenticator_accessor() {
857        use camel_api::security_policy::Principal;
858
859        struct TestAuth;
860        #[async_trait::async_trait]
861        impl TokenAuthenticator for TestAuth {
862            async fn authenticate_bearer(
863                &self,
864                _token: &str,
865            ) -> Result<Principal, camel_api::CamelError> {
866                Ok(Principal {
867                    subject: "test".into(),
868                    issuer: "test".into(),
869                    audience: vec![],
870                    scopes: vec![],
871                    roles: vec![],
872                    claims: serde_json::Value::Null,
873                })
874            }
875        }
876
877        let def_no_auth = RouteDefinition::new("direct:test".to_string(), vec![]);
878        assert!(def_no_auth.security_authenticator().is_none());
879
880        let auth = Arc::new(TestAuth);
881        let def = RouteDefinition::new("direct:test".to_string(), vec![])
882            .with_security_authenticator(auth);
883        assert!(def.security_authenticator().is_some());
884    }
885}