Skip to main content

camel_core/lifecycle/application/
route_definition.rs

1// lifecycle/application/route_definition.rs
2// Route definition and builder-step types. Route (compiled artifact) lives in adapters.
3
4use std::sync::Arc;
5
6use camel_api::UnitOfWorkConfig;
7use camel_api::circuit_breaker::CircuitBreakerConfig;
8use camel_api::error_handler::ErrorHandlerConfig;
9use camel_api::loop_eip::LoopConfig;
10use camel_api::security_policy::SecurityPolicyConfig;
11use camel_api::{AggregatorConfig, BoxProcessor, FilterPredicate, MulticastConfig, SplitterConfig};
12use camel_auth::TokenAuthenticator;
13use camel_component_api::ConcurrencyModel;
14
15/// An unresolved when-clause: predicate + nested steps for the sub-pipeline.
16pub struct WhenStep {
17    pub predicate: FilterPredicate,
18    pub steps: Vec<BuilderStep>,
19}
20
21pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
22
23/// Declarative `when` clause resolved later by the runtime.
24#[derive(Debug)]
25pub struct DeclarativeWhenStep {
26    pub predicate: LanguageExpressionDef,
27    pub steps: Vec<BuilderStep>,
28}
29
30/// Builder struct for a single `doCatch` clause in the declarative pipeline.
31#[derive(Debug)]
32pub struct DoTryCatchClauseBuilder {
33    pub exception: Option<Vec<String>>,
34    pub when: Option<LanguageExpressionDef>,
35    pub on_when: Option<LanguageExpressionDef>,
36    pub disposition: camel_api::error_handler::ExceptionDisposition,
37    pub steps: Vec<BuilderStep>,
38}
39
40/// Builder struct for the `doFinally` block in the declarative pipeline.
41#[derive(Debug)]
42pub struct DoTryFinallyBuilder {
43    pub on_when: Option<LanguageExpressionDef>,
44    pub steps: Vec<BuilderStep>,
45}
46
47/// A step in an unresolved route definition.
48pub enum BuilderStep {
49    /// A pre-built Tower processor service.
50    Processor(BoxProcessor),
51    /// A destination URI — resolved at start time by CamelContext.
52    To(String),
53    /// A stop step that halts processing immediately.
54    Stop,
55    /// A static log step.
56    Log {
57        level: camel_processor::LogLevel,
58        message: String,
59    },
60    /// Declarative set_header (literal or language-based value), resolved at route-add time.
61    DeclarativeSetHeader {
62        key: String,
63        value: ValueSourceDef,
64    },
65    DeclarativeSetProperty {
66        key: String,
67        value_source: ValueSourceDef,
68    },
69    /// Declarative set_body (literal or language-based value), resolved at route-add time.
70    DeclarativeSetBody {
71        value: ValueSourceDef,
72    },
73    /// Declarative filter using a language predicate, resolved at route-add time.
74    DeclarativeFilter {
75        predicate: LanguageExpressionDef,
76        steps: Vec<BuilderStep>,
77    },
78    /// Declarative choice/when/otherwise using language predicates, resolved at route-add time.
79    DeclarativeChoice {
80        whens: Vec<DeclarativeWhenStep>,
81        otherwise: Option<Vec<BuilderStep>>,
82    },
83    /// Declarative script step evaluated by language and written to body.
84    DeclarativeScript {
85        expression: LanguageExpressionDef,
86    },
87    DeclarativeFunction {
88        definition: camel_api::FunctionDefinition,
89    },
90    /// Declarative split using a language expression, resolved at route-add time.
91    DeclarativeSplit {
92        expression: LanguageExpressionDef,
93        aggregation: camel_api::splitter::AggregationStrategy,
94        parallel: bool,
95        parallel_limit: Option<usize>,
96        stop_on_exception: bool,
97        steps: Vec<BuilderStep>,
98    },
99    /// Declarative stream split using a streaming split expression, resolved at route-add time.
100    DeclarativeStreamSplit {
101        stream_config: camel_api::StreamSplitConfig,
102        aggregation: camel_api::splitter::AggregationStrategy,
103        stop_on_exception: bool,
104        steps: Vec<BuilderStep>,
105    },
106    DeclarativeDynamicRouter {
107        expression: LanguageExpressionDef,
108        uri_delimiter: String,
109        cache_size: i32,
110        ignore_invalid_endpoints: bool,
111        max_iterations: usize,
112    },
113    DeclarativeRoutingSlip {
114        expression: LanguageExpressionDef,
115        uri_delimiter: String,
116        cache_size: i32,
117        ignore_invalid_endpoints: bool,
118    },
119    /// A Splitter sub-pipeline: config + nested steps to execute per fragment.
120    Split {
121        config: SplitterConfig,
122        steps: Vec<BuilderStep>,
123    },
124    /// An Aggregator step: collects exchanges by correlation key, emits when complete.
125    Aggregate {
126        config: AggregatorConfig,
127    },
128    /// A Filter sub-pipeline: predicate + nested steps executed only when predicate is true.
129    Filter {
130        predicate: FilterPredicate,
131        steps: Vec<BuilderStep>,
132    },
133    /// A Choice step: evaluates when-clauses in order, routes to the first match.
134    /// If no when matches, the optional otherwise branch is used.
135    Choice {
136        whens: Vec<WhenStep>,
137        otherwise: Option<Vec<BuilderStep>>,
138    },
139    /// A WireTap step: sends a clone of the exchange to a tap endpoint (fire-and-forget).
140    WireTap {
141        uri: String,
142    },
143    /// A Multicast step: sends the same exchange to multiple destinations.
144    Multicast {
145        steps: Vec<BuilderStep>,
146        config: MulticastConfig,
147    },
148    /// Declarative log step with a language-evaluated message, resolved at route-add time.
149    DeclarativeLog {
150        level: camel_processor::LogLevel,
151        message: ValueSourceDef,
152    },
153    /// Bean invocation step — resolved at route-add time.
154    Bean {
155        name: String,
156        method: String,
157    },
158    /// Script step: executes a script that can mutate the exchange.
159    /// The script has access to `headers`, `properties`, and `body`.
160    Script {
161        language: String,
162        script: String,
163    },
164    /// Throttle step: rate limiting with configurable behavior when limit exceeded.
165    Throttle {
166        config: camel_api::ThrottlerConfig,
167        steps: Vec<BuilderStep>,
168    },
169    /// LoadBalance step: distributes exchanges across multiple endpoints using a strategy.
170    LoadBalance {
171        config: camel_api::LoadBalancerConfig,
172        steps: Vec<BuilderStep>,
173    },
174    /// DynamicRouter step: routes exchanges dynamically based on expression evaluation.
175    DynamicRouter {
176        config: camel_api::DynamicRouterConfig,
177    },
178    RoutingSlip {
179        config: camel_api::RoutingSlipConfig,
180    },
181    RecipientList {
182        config: camel_api::recipient_list::RecipientListConfig,
183    },
184    DeclarativeRecipientList {
185        expression: LanguageExpressionDef,
186        delimiter: String,
187        parallel: bool,
188        parallel_limit: Option<usize>,
189        stop_on_exception: bool,
190        aggregation: String,
191    },
192    Delay {
193        config: camel_api::DelayConfig,
194    },
195    /// Runtime loop with closure-based predicate (programmatic DSL).
196    Loop {
197        config: LoopConfig,
198        steps: Vec<BuilderStep>,
199    },
200    /// Declarative loop with optional language-based while predicate (YAML DSL).
201    DeclarativeLoop {
202        count: Option<usize>,
203        while_predicate: Option<LanguageExpressionDef>,
204        steps: Vec<BuilderStep>,
205    },
206    /// EIP-7 enrich: synchronous content enrichment via a resolved producer.
207    Enrich {
208        uri: String,
209        strategy: Option<String>,
210        timeout_ms: Option<u64>,
211    },
212    /// EIP-7 pollEnrich: blocking poll of a PollingConsumer with timeout.
213    PollEnrich {
214        uri: String,
215        strategy: Option<String>,
216        timeout_ms: Option<u64>,
217    },
218    /// Declarative doTry/doCatch/doFinally, resolved at route-add time.
219    DeclarativeDoTry {
220        try_steps: Vec<BuilderStep>,
221        catch: Vec<DoTryCatchClauseBuilder>,
222        finally: Option<DoTryFinallyBuilder>,
223    },
224}
225
226impl std::fmt::Debug for BuilderStep {
227    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228        match self {
229            BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
230            BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
231            BuilderStep::Stop => write!(f, "BuilderStep::Stop"),
232            BuilderStep::Log { level, message } => write!(
233                f,
234                "BuilderStep::Log {{ level: {level:?}, message: {message:?} }}"
235            ),
236            BuilderStep::DeclarativeSetHeader { key, .. } => {
237                write!(
238                    f,
239                    "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
240                )
241            }
242            BuilderStep::DeclarativeSetBody { .. } => {
243                write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
244            }
245            BuilderStep::DeclarativeSetProperty { key, .. } => {
246                write!(
247                    f,
248                    "BuilderStep::DeclarativeSetProperty {{ key: {key:?}, .. }}"
249                )
250            }
251            BuilderStep::DeclarativeFilter { steps, .. } => {
252                write!(
253                    f,
254                    "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
255                )
256            }
257            BuilderStep::DeclarativeChoice { whens, otherwise } => {
258                write!(
259                    f,
260                    "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
261                    whens.len(),
262                    if otherwise.is_some() { "Some" } else { "None" }
263                )
264            }
265            BuilderStep::DeclarativeScript { expression } => write!(
266                f,
267                "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
268                expression.language
269            ),
270            BuilderStep::DeclarativeFunction { definition } => write!(
271                f,
272                "BuilderStep::DeclarativeFunction {{ id: {:?}, runtime: {:?}, .. }}",
273                definition.id, definition.runtime
274            ),
275            BuilderStep::DeclarativeSplit { steps, .. } => {
276                write!(
277                    f,
278                    "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
279                )
280            }
281            BuilderStep::DeclarativeStreamSplit {
282                steps,
283                stream_config,
284                ..
285            } => {
286                write!(
287                    f,
288                    "BuilderStep::DeclarativeStreamSplit {{ format: {:?}, steps: {} step(s) }}",
289                    stream_config.format,
290                    steps.len()
291                )
292            }
293            BuilderStep::DeclarativeDynamicRouter { expression, .. } => write!(
294                f,
295                "BuilderStep::DeclarativeDynamicRouter {{ language: {:?}, .. }}",
296                expression.language
297            ),
298            BuilderStep::DeclarativeRoutingSlip { expression, .. } => write!(
299                f,
300                "BuilderStep::DeclarativeRoutingSlip {{ language: {:?}, .. }}",
301                expression.language
302            ),
303            BuilderStep::Split { steps, .. } => {
304                write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
305            }
306            BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
307            BuilderStep::Filter { steps, .. } => {
308                write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
309            }
310            BuilderStep::Choice { whens, otherwise } => {
311                write!(
312                    f,
313                    "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
314                    whens.len(),
315                    if otherwise.is_some() { "Some" } else { "None" }
316                )
317            }
318            BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
319            BuilderStep::Multicast { steps, .. } => {
320                write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
321            }
322            BuilderStep::DeclarativeLog { level, .. } => {
323                write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
324            }
325            BuilderStep::Bean { name, method } => {
326                write!(
327                    f,
328                    "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
329                )
330            }
331            BuilderStep::Script { language, .. } => {
332                write!(f, "BuilderStep::Script {{ language: {language:?}, .. }}")
333            }
334            BuilderStep::Throttle { steps, .. } => {
335                write!(f, "BuilderStep::Throttle {{ steps: {steps:?}, .. }}")
336            }
337            BuilderStep::LoadBalance { steps, .. } => {
338                write!(f, "BuilderStep::LoadBalance {{ steps: {steps:?}, .. }}")
339            }
340            BuilderStep::DynamicRouter { .. } => {
341                write!(f, "BuilderStep::DynamicRouter {{ .. }}")
342            }
343            BuilderStep::RoutingSlip { .. } => {
344                write!(f, "BuilderStep::RoutingSlip {{ .. }}")
345            }
346            BuilderStep::RecipientList { .. } => {
347                write!(f, "BuilderStep::RecipientList {{ .. }}")
348            }
349            BuilderStep::DeclarativeRecipientList {
350                expression,
351                aggregation,
352                ..
353            } => write!(
354                f,
355                "BuilderStep::DeclarativeRecipientList {{ language: {:?}, aggregation: {:?}, .. }}",
356                expression.language, aggregation
357            ),
358            BuilderStep::Delay { config } => {
359                write!(f, "BuilderStep::Delay {{ config: {:?} }}", config)
360            }
361            BuilderStep::Loop { config, steps } => {
362                write!(
363                    f,
364                    "BuilderStep::Loop {{ config: {:?}, steps: {} }}",
365                    config.mode_name(),
366                    steps.len()
367                )
368            }
369            BuilderStep::DeclarativeLoop {
370                count,
371                while_predicate,
372                steps,
373            } => {
374                write!(
375                    f,
376                    "BuilderStep::DeclarativeLoop {{ count: {:?}, while: {}, steps: {} }}",
377                    count,
378                    while_predicate.is_some(),
379                    steps.len()
380                )
381            }
382            BuilderStep::Enrich {
383                uri,
384                strategy,
385                timeout_ms,
386            } => {
387                write!(
388                    f,
389                    "BuilderStep::Enrich {{ uri: {uri:?}, strategy: {strategy:?}, timeout_ms: {timeout_ms:?} }}"
390                )
391            }
392            BuilderStep::PollEnrich {
393                uri,
394                strategy,
395                timeout_ms,
396            } => {
397                write!(
398                    f,
399                    "BuilderStep::PollEnrich {{ uri: {uri:?}, strategy: {strategy:?}, timeout_ms: {timeout_ms:?} }}"
400                )
401            }
402            BuilderStep::DeclarativeDoTry {
403                try_steps,
404                catch,
405                finally,
406            } => {
407                write!(
408                    f,
409                    "BuilderStep::DeclarativeDoTry {{ try_steps: {} step(s), catch: {} clause(s), finally: {} }}",
410                    try_steps.len(),
411                    catch.len(),
412                    if finally.is_some() { "Some" } else { "None" }
413                )
414            }
415        }
416    }
417}
418
419/// An unresolved route definition. "to" URIs have not been resolved to producers yet.
420pub struct RouteDefinition {
421    pub(crate) from_uri: String,
422    pub(crate) steps: Vec<BuilderStep>,
423    /// Optional per-route error handler config. Takes precedence over the global one.
424    pub(crate) error_handler: Option<ErrorHandlerConfig>,
425    /// Optional circuit breaker config. Applied between error handler and step pipeline.
426    pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
427    pub(crate) security_policy: Option<SecurityPolicyConfig>,
428    /// Optional token authenticator for validating JWT/OAuth tokens.
429    pub(crate) security_authenticator: Option<Arc<dyn TokenAuthenticator>>,
430    /// Optional Unit of Work config for in-flight tracking and completion hooks.
431    pub(crate) unit_of_work: Option<UnitOfWorkConfig>,
432    /// User override for the consumer's concurrency model. `None` means
433    /// "use whatever the consumer declares".
434    pub(crate) concurrency: Option<ConcurrencyModel>,
435    /// Unique identifier for this route. Required.
436    pub(crate) route_id: String,
437    /// Whether this route should start automatically when the context starts.
438    pub(crate) auto_startup: bool,
439    /// Order in which routes are started. Lower values start first.
440    pub(crate) startup_order: i32,
441    pub(crate) source_hash: Option<u64>,
442}
443
444impl RouteDefinition {
445    /// Create a new route definition with the required route ID.
446    pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
447        Self {
448            from_uri: from_uri.into(),
449            steps,
450            error_handler: None,
451            circuit_breaker: None,
452            security_policy: None,
453            security_authenticator: None,
454            unit_of_work: None,
455            concurrency: None,
456            route_id: String::new(), // Will be set by with_route_id()
457            auto_startup: true,
458            startup_order: 1000,
459            source_hash: None,
460        }
461    }
462
463    /// The source endpoint URI.
464    pub fn from_uri(&self) -> &str {
465        &self.from_uri
466    }
467
468    /// The steps in this route definition.
469    pub fn steps(&self) -> &[BuilderStep] {
470        &self.steps
471    }
472
473    /// Set a per-route error handler, overriding the global one.
474    pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
475        self.error_handler = Some(config);
476        self
477    }
478
479    /// Get the route-level error handler config, if set.
480    pub fn error_handler_config(&self) -> Option<&ErrorHandlerConfig> {
481        self.error_handler.as_ref()
482    }
483
484    /// Set a circuit breaker for this route.
485    pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
486        self.circuit_breaker = Some(config);
487        self
488    }
489
490    /// Set a security policy for this route.
491    pub fn with_security_policy(mut self, config: SecurityPolicyConfig) -> Self {
492        self.security_policy = Some(config);
493        self
494    }
495
496    /// Set a token authenticator for this route.
497    pub fn with_security_authenticator(
498        mut self,
499        authenticator: Arc<dyn TokenAuthenticator>,
500    ) -> Self {
501        self.security_authenticator = Some(authenticator);
502        self
503    }
504
505    /// Set a unit of work config for this route.
506    pub fn with_unit_of_work(mut self, config: UnitOfWorkConfig) -> Self {
507        self.unit_of_work = Some(config);
508        self
509    }
510
511    /// Get the unit of work config, if set.
512    pub fn unit_of_work_config(&self) -> Option<&UnitOfWorkConfig> {
513        self.unit_of_work.as_ref()
514    }
515
516    /// Get the circuit breaker config, if set.
517    pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
518        self.circuit_breaker.as_ref()
519    }
520
521    pub fn security_policy_config(&self) -> Option<&SecurityPolicyConfig> {
522        self.security_policy.as_ref()
523    }
524
525    pub fn security_authenticator(&self) -> Option<&Arc<dyn TokenAuthenticator>> {
526        self.security_authenticator.as_ref()
527    }
528
529    /// User-specified concurrency override, if any.
530    pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
531        self.concurrency.as_ref()
532    }
533
534    /// Override the consumer's concurrency model for this route.
535    pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
536        self.concurrency = Some(model);
537        self
538    }
539
540    /// Get the route ID.
541    pub fn route_id(&self) -> &str {
542        &self.route_id
543    }
544
545    /// Whether this route should start automatically when the context starts.
546    pub fn auto_startup(&self) -> bool {
547        self.auto_startup
548    }
549
550    /// Order in which routes are started. Lower values start first.
551    pub fn startup_order(&self) -> i32 {
552        self.startup_order
553    }
554
555    /// Set a unique identifier for this route.
556    pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
557        self.route_id = id.into();
558        self
559    }
560
561    /// Set whether this route should start automatically.
562    pub fn with_auto_startup(mut self, auto: bool) -> Self {
563        self.auto_startup = auto;
564        self
565    }
566
567    /// Set the startup order. Lower values start first.
568    pub fn with_startup_order(mut self, order: i32) -> Self {
569        self.startup_order = order;
570        self
571    }
572
573    pub fn with_source_hash(mut self, hash: u64) -> Self {
574        self.source_hash = Some(hash);
575        self
576    }
577
578    pub fn source_hash(&self) -> Option<u64> {
579        self.source_hash
580    }
581
582    /// Extract the metadata fields needed for introspection.
583    /// This is used by RouteController to store route info without the non-Sync steps.
584    pub fn to_info(&self) -> RouteDefinitionInfo {
585        RouteDefinitionInfo {
586            route_id: self.route_id.clone(),
587            auto_startup: self.auto_startup,
588            startup_order: self.startup_order,
589            source_hash: self.source_hash,
590        }
591    }
592}
593
/// Minimal route definition metadata for introspection.
///
/// Holds only the metadata fields of a route definition that are needed for
/// route lifecycle management. The route's steps are deliberately left out:
/// they contain non-Sync types and cannot be stored in a Sync struct.
///
/// Every field is a plain value type, so the struct derives `Debug`,
/// `PartialEq` and `Eq` in addition to `Clone`; this lets it be logged and
/// compared directly (e.g. in `assert_eq!`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RouteDefinitionInfo {
    /// Identifier of the route this metadata belongs to.
    route_id: String,
    /// Whether the route starts automatically when the context starts.
    auto_startup: bool,
    /// Startup ordering key; routes with lower values start first.
    startup_order: i32,
    /// Source hash copied from the route definition, if one was recorded.
    pub(crate) source_hash: Option<u64>,
}

impl RouteDefinitionInfo {
    /// Get the route ID.
    pub fn route_id(&self) -> &str {
        &self.route_id
    }

    /// Whether this route should start automatically when the context starts.
    pub fn auto_startup(&self) -> bool {
        self.auto_startup
    }

    /// Order in which routes are started. Lower values start first.
    pub fn startup_order(&self) -> i32 {
        self.startup_order
    }

    /// Source hash recorded for the route, or `None` when none was set.
    pub fn source_hash(&self) -> Option<u64> {
        self.source_hash
    }
}
627
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::security_policy::Principal;

    /// Principal returned by the stub policy / authenticator in the security tests.
    fn stub_principal() -> Principal {
        Principal {
            subject: "test".into(),
            issuer: "test".into(),
            audience: vec![],
            scopes: vec![],
            roles: vec![],
            claims: serde_json::Value::Null,
        }
    }

    /// A `Multicast` step can be built and is recognised as that variant.
    #[test]
    fn test_builder_step_multicast_variant() {
        use camel_api::MulticastConfig;

        let nested = vec![BuilderStep::To("direct:a".into())];
        let multicast = BuilderStep::Multicast {
            steps: nested,
            config: MulticastConfig::new(),
        };

        assert!(matches!(multicast, BuilderStep::Multicast { .. }));
    }

    /// A fresh definition auto-starts and uses startup order 1000.
    #[test]
    fn test_route_definition_defaults() {
        let route = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");

        assert_eq!(route.route_id(), "test-route");
        assert!(route.auto_startup());
        assert_eq!(route.startup_order(), 1000);
    }

    /// Builder methods override route ID, auto-startup and startup order.
    #[test]
    fn test_route_definition_builders() {
        let route = RouteDefinition::new("direct:test", vec![])
            .with_route_id("my-route")
            .with_auto_startup(false)
            .with_startup_order(50);

        assert_eq!(route.route_id(), "my-route");
        assert!(!route.auto_startup());
        assert_eq!(route.startup_order(), 50);
    }

    /// `from_uri()` and `steps()` expose what was passed to `new`.
    #[test]
    fn test_route_definition_accessors_cover_core_fields() {
        let route = RouteDefinition::new("direct:in", vec![BuilderStep::To("mock:out".into())])
            .with_route_id("accessor-route");

        assert_eq!(route.from_uri(), "direct:in");
        let steps = route.steps();
        assert_eq!(steps.len(), 1);
        assert!(matches!(steps[0], BuilderStep::To(_)));
    }

    /// Error handler, circuit breaker and concurrency settings round-trip.
    #[test]
    fn test_route_definition_error_handler_circuit_breaker_and_concurrency_accessors() {
        use camel_api::circuit_breaker::CircuitBreakerConfig;
        use camel_api::error_handler::ErrorHandlerConfig;
        use camel_component_api::ConcurrencyModel;

        let route = RouteDefinition::new("direct:test", vec![])
            .with_route_id("eh-route")
            .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlc"))
            .with_circuit_breaker(CircuitBreakerConfig::new())
            .with_concurrency(ConcurrencyModel::Concurrent { max: Some(4) });

        let handler = route
            .error_handler_config()
            .expect("error handler should be set");
        assert_eq!(handler.dlc_uri.as_deref(), Some("log:dlc"));
        assert!(route.circuit_breaker_config().is_some());
        assert!(matches!(
            route.concurrency_override(),
            Some(ConcurrencyModel::Concurrent { max: Some(4) })
        ));
    }

    /// `Debug` produces non-empty output for a broad sample of variants.
    #[test]
    fn test_builder_step_debug_covers_many_variants() {
        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
        use camel_api::{
            DynamicRouterConfig, Exchange, IdentityProcessor, RoutingSlipConfig, Value,
        };
        use std::sync::Arc;

        let body_expr = LanguageExpressionDef {
            language: "simple".into(),
            source: "${body}".into(),
        };

        let samples = vec![
            BuilderStep::Processor(BoxProcessor::new(IdentityProcessor)),
            BuilderStep::To("mock:out".into()),
            BuilderStep::Stop,
            BuilderStep::Log {
                level: camel_processor::LogLevel::Info,
                message: "hello".into(),
            },
            BuilderStep::DeclarativeSetHeader {
                key: "k".into(),
                value: ValueSourceDef::Literal(Value::String("v".into())),
            },
            BuilderStep::DeclarativeSetBody {
                value: ValueSourceDef::Expression(body_expr.clone()),
            },
            BuilderStep::DeclarativeFilter {
                predicate: body_expr.clone(),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::DeclarativeChoice {
                whens: vec![DeclarativeWhenStep {
                    predicate: body_expr.clone(),
                    steps: vec![BuilderStep::Stop],
                }],
                otherwise: Some(vec![BuilderStep::Stop]),
            },
            BuilderStep::DeclarativeScript {
                expression: body_expr.clone(),
            },
            BuilderStep::DeclarativeSplit {
                expression: body_expr.clone(),
                aggregation: AggregationStrategy::Original,
                parallel: false,
                parallel_limit: Some(2),
                stop_on_exception: true,
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Split {
                config: SplitterConfig::new(split_body_lines()),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Aggregate {
                config: camel_api::AggregatorConfig::correlate_by("id")
                    .complete_when_size(1)
                    .build()
                    .unwrap(),
            },
            BuilderStep::Filter {
                predicate: Arc::new(|_: &Exchange| true),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::WireTap {
                uri: "mock:tap".into(),
            },
            BuilderStep::DeclarativeLog {
                level: camel_processor::LogLevel::Info,
                message: ValueSourceDef::Expression(body_expr.clone()),
            },
            BuilderStep::Bean {
                name: "bean".into(),
                method: "call".into(),
            },
            BuilderStep::Script {
                language: "rhai".into(),
                script: "body".into(),
            },
            BuilderStep::Throttle {
                config: camel_api::ThrottlerConfig::new(10, std::time::Duration::from_millis(10)),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::LoadBalance {
                config: camel_api::LoadBalancerConfig::round_robin(),
                steps: vec![BuilderStep::To("mock:l1".into())],
            },
            BuilderStep::DynamicRouter {
                config: DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
            },
            BuilderStep::RoutingSlip {
                config: RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
            },
        ];

        for sample in &samples {
            let rendered = format!("{sample:?}");
            assert!(!rendered.is_empty());
        }
    }

    /// `to_info()` carries route ID, auto-startup and startup order over.
    #[test]
    fn test_route_definition_to_info_preserves_metadata() {
        let route = RouteDefinition::new("direct:test", vec![])
            .with_route_id("meta-route")
            .with_auto_startup(false)
            .with_startup_order(7);
        let info = route.to_info();

        assert_eq!(info.route_id(), "meta-route");
        assert!(!info.auto_startup());
        assert_eq!(info.startup_order(), 7);
    }

    /// `Debug` output of a `Choice` step names the variant.
    #[test]
    fn test_choice_builder_step_debug() {
        use camel_api::{Exchange, FilterPredicate};
        use std::sync::Arc;

        fn always_true(_: &Exchange) -> bool {
            true
        }

        let branch = WhenStep {
            predicate: Arc::new(always_true) as FilterPredicate,
            steps: vec![BuilderStep::To("mock:a".into())],
        };
        let choice = BuilderStep::Choice {
            whens: vec![branch],
            otherwise: None,
        };

        let rendered = format!("{choice:?}");
        assert!(rendered.contains("Choice"));
    }

    /// Unit of work config round-trips and is absent by default.
    #[test]
    fn test_route_definition_unit_of_work() {
        use camel_api::UnitOfWorkConfig;

        let uow = UnitOfWorkConfig {
            on_complete: Some("log:complete".into()),
            on_failure: Some("log:failed".into()),
        };
        let with_uow = RouteDefinition::new("direct:test", vec![])
            .with_route_id("uow-test")
            .with_unit_of_work(uow.clone());
        assert_eq!(
            with_uow.unit_of_work_config().unwrap().on_complete.as_deref(),
            Some("log:complete")
        );
        assert_eq!(
            with_uow.unit_of_work_config().unwrap().on_failure.as_deref(),
            Some("log:failed")
        );

        let without_uow = RouteDefinition::new("direct:test", vec![]).with_route_id("no-uow");
        assert!(without_uow.unit_of_work_config().is_none());
    }

    /// Security policy config is absent by default and present once set.
    #[test]
    fn test_route_definition_security_policy_accessor() {
        use async_trait::async_trait;
        use camel_api::CamelError;
        use camel_api::Exchange;
        use camel_api::security_policy::{
            AuthorizationDecision, SecurityPolicy, SecurityPolicyConfig,
        };

        struct StubPolicy;
        #[async_trait]
        impl SecurityPolicy for StubPolicy {
            async fn evaluate(
                &self,
                _exchange: &mut Exchange,
            ) -> Result<AuthorizationDecision, CamelError> {
                Ok(AuthorizationDecision::Granted {
                    principal: stub_principal(),
                })
            }
        }

        let without_policy = RouteDefinition::new("direct:test", vec![]).with_route_id("no-sp");
        assert!(without_policy.security_policy_config().is_none());

        let with_policy = RouteDefinition::new("direct:test", vec![])
            .with_route_id("sp-test")
            .with_security_policy(SecurityPolicyConfig::new(StubPolicy));
        assert!(with_policy.security_policy_config().is_some());
    }

    /// Token authenticator is absent by default and present once set.
    #[test]
    fn test_route_definition_security_authenticator_accessor() {
        struct TestAuth;
        #[async_trait::async_trait]
        impl TokenAuthenticator for TestAuth {
            async fn authenticate_bearer(
                &self,
                _token: &str,
            ) -> Result<Principal, camel_api::CamelError> {
                Ok(stub_principal())
            }
        }

        let without_auth = RouteDefinition::new("direct:test".to_string(), vec![]);
        assert!(without_auth.security_authenticator().is_none());

        let authenticator = Arc::new(TestAuth);
        let with_auth = RouteDefinition::new("direct:test".to_string(), vec![])
            .with_security_authenticator(authenticator);
        assert!(with_auth.security_authenticator().is_some());
    }
}
921}