Skip to main content

camel_builder/
lib.rs

1//! Fluent builder API for constructing Camel routes programmatically with EIP patterns.
2//!
3//! Main types: `RouteBuilder`, `StepAccumulator`, `SplitBuilder`, `ChoiceBuilder`, `MulticastBuilder`,
4//! `ThrottleBuilder`, `LoopBuilder`, `LoadBalancerBuilder`, `OnExceptionBuilder`.
5
6use camel_api::DelayConfig;
7use camel_api::aggregator::{
8    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24    LanguageExpressionDef, ProcessorFn, Value,
25    runtime::{
26        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28        CanonicalWhenSpec,
29    },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35    StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38// ── Module declarations ─────────────────────────────────────────────────────
39pub mod do_try;
40pub use do_try::{DoCatchBuilder, DoFinallyBuilder, DoTryBuilder};
41
42/// Shared step-accumulation methods for all builder types.
43///
44/// Implementors provide `steps_mut()` and get step-adding methods for free.
45/// `filter()` and other branching methods are NOT included — they return
46/// different types per builder and stay as per-builder methods.
47pub trait StepAccumulator: Sized {
48    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
49
50    fn to(mut self, endpoint: impl Into<String>) -> Self {
51        self.steps_mut().push(BuilderStep::To(endpoint.into()));
52        self
53    }
54
55    fn process<F, Fut>(mut self, f: F) -> Self
56    where
57        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
58        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
59    {
60        let svc = ProcessorFn::new(f);
61        self.steps_mut()
62            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
63        self
64    }
65
66    fn process_fn(mut self, processor: BoxProcessor) -> Self {
67        self.steps_mut().push(BuilderStep::Processor(processor));
68        self
69    }
70
71    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
72        let svc = SetHeader::new(IdentityProcessor, key, value);
73        self.steps_mut()
74            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
75        self
76    }
77
78    fn map_body<F>(mut self, mapper: F) -> Self
79    where
80        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
81    {
82        let svc = MapBody::new(IdentityProcessor, mapper);
83        self.steps_mut()
84            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
85        self
86    }
87
88    fn set_body<B>(mut self, body: B) -> Self
89    where
90        B: Into<Body> + Clone + Send + Sync + 'static,
91    {
92        let body: Body = body.into();
93        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
94        self.steps_mut()
95            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
96        self
97    }
98
99    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
100    ///
101    /// Transforms the message body using the given value. Semantically identical
102    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
103    fn transform<B>(self, body: B) -> Self
104    where
105        B: Into<Body> + Clone + Send + Sync + 'static,
106    {
107        self.set_body(body)
108    }
109
110    fn set_body_fn<F>(mut self, expr: F) -> Self
111    where
112        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
113    {
114        let svc = SetBody::new(IdentityProcessor, expr);
115        self.steps_mut()
116            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
117        self
118    }
119
120    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
121    where
122        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
123    {
124        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
125        self.steps_mut()
126            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
127        self
128    }
129
130    fn aggregate(mut self, config: AggregatorConfig) -> Self {
131        self.steps_mut().push(BuilderStep::Aggregate { config });
132        self
133    }
134
135    /// Stop processing this exchange immediately. No further steps in the
136    /// current pipeline will run.
137    ///
138    /// Can be used at any point in the route: directly on RouteBuilder,
139    /// inside `.filter()`, inside `.split()`, etc.
140    fn stop(mut self) -> Self {
141        self.steps_mut().push(BuilderStep::Stop);
142        self
143    }
144
145    fn delay(mut self, duration: std::time::Duration) -> Self {
146        self.steps_mut().push(BuilderStep::Delay {
147            config: DelayConfig::from_duration(duration),
148        });
149        self
150    }
151
152    fn delay_with_header(
153        mut self,
154        duration: std::time::Duration,
155        header: impl Into<String>,
156    ) -> Self {
157        self.steps_mut().push(BuilderStep::Delay {
158            config: DelayConfig::from_duration_with_header(duration, header),
159        });
160        self
161    }
162
163    /// Log a message at the specified level.
164    ///
165    /// The message will be logged when an exchange passes through this step.
166    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
167        self.steps_mut().push(BuilderStep::Log {
168            level,
169            message: message.into(),
170        });
171        self
172    }
173
174    /// Convert the message body to the target type.
175    ///
176    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
177    /// Returns `TypeConversionFailed` if conversion is not possible.
178    ///
179    /// # Example
180    /// ```ignore
181    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
182    ///      .convert_body_to(BodyType::Json)
183    ///      .to("direct:next")
184    /// ```
185    fn convert_body_to(mut self, target: BodyType) -> Self {
186        let svc = ConvertBodyTo::new(IdentityProcessor, target);
187        self.steps_mut()
188            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
189        self
190    }
191
192    fn stream_cache(mut self, threshold: usize) -> Self {
193        let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
194        let svc = StreamCacheService::new(IdentityProcessor, config);
195        self.steps_mut()
196            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
197        self
198    }
199
200    /// Materialize `Body::Stream` into `Body::Bytes` using the default threshold (128 KB).
201    ///
202    /// Equivalent to `.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)`.
203    fn stream_cache_default(self) -> Self {
204        self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
205    }
206
207    /// Marshal the message body using the specified data format.
208    ///
209    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
210    /// the format name is unknown.
211    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
212    ///
213    /// # Example
214    /// ```ignore
215    /// route.marshal("json")?.to("direct:next")
216    /// ```
217    fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
218        let name = format.into();
219        let df = builtin_data_format(&name)
220            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
221        let svc = MarshalService::new(IdentityProcessor, df);
222        self.steps_mut()
223            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
224        Ok(self)
225    }
226
227    /// Unmarshal the message body using the specified data format.
228    ///
229    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
230    /// the format name is unknown.
231    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
232    ///
233    /// # Example
234    /// ```ignore
235    /// route.unmarshal("json")?.to("direct:next")
236    /// ```
237    fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
238        let name = format.into();
239        let df = builtin_data_format(&name)
240            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
241        let svc = UnmarshalService::new(IdentityProcessor, df);
242        self.steps_mut()
243            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
244        Ok(self)
245    }
246
247    /// Validate the exchange using a predicate expression.
248    ///
249    /// If the expression evaluates to `true`, the exchange continues.
250    /// If `false`, a `CamelError::ValidationError` is returned into the route error handler.
251    ///
252    /// # Example
253    /// ```ignore
254    /// route.validate("${body.size()} > 0").to("direct:out")
255    /// ```
256    fn validate(mut self, expression: impl Into<String>) -> Self {
257        let source = expression.into();
258        let expression = LanguageExpressionDef {
259            language: "simple".into(),
260            source,
261        };
262        self.steps_mut().push(BuilderStep::Validate {
263            predicate: expression,
264        });
265        self
266    }
267
268    /// Execute a script that can modify the exchange (headers, properties, body).
269    ///
270    /// The script has access to `headers`, `properties`, and `body` variables
271    /// and can modify them with assignment syntax: `headers["k"] = v`.
272    ///
273    /// # Example
274    /// ```ignore
275    /// // ignore: requires full CamelContext setup with registered language
276    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
277    /// ```
278    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
279        self.steps_mut().push(BuilderStep::Script {
280            language: language.into(),
281            script: script.into(),
282        });
283        self
284    }
285
286    /// EIP-7 enrich: synchronous content enrichment via a resolved producer.
287    ///
288    /// Calls the given endpoint URI as a producer, then merges the response
289    /// back into the original exchange body using the default `UseEnrichedBody`
290    /// strategy (original headers/properties are preserved).
291    fn enrich(mut self, uri: impl Into<String>) -> Self {
292        self.steps_mut().push(BuilderStep::Enrich {
293            uri: uri.into(),
294            strategy: None,
295            timeout_ms: None,
296        });
297        self
298    }
299
300    /// EIP-7 pollEnrich: blocking poll of a PollingConsumer with timeout.
301    ///
302    /// Reads from a polling endpoint (e.g., file) and merges the result
303    /// into the exchange body using the default `UseEnrichedBody` strategy.
304    /// `timeout_ms` controls how long to wait for data.
305    fn poll_enrich(mut self, uri: impl Into<String>, timeout_ms: u64) -> Self {
306        self.steps_mut().push(BuilderStep::PollEnrich {
307            uri: uri.into(),
308            strategy: None,
309            timeout_ms: Some(timeout_ms),
310        });
311        self
312    }
313
314    fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
315        self.steps_mut().push(BuilderStep::Bean {
316            name: name.into(),
317            method: method.into(),
318        });
319        self
320    }
321}
322
323/// A fluent builder for constructing routes.
324///
325/// # Example
326///
327/// ```ignore
328/// let definition = RouteBuilder::from("timer:tick?period=1000")
329///     .set_header("source", Value::String("timer".into()))
330///     .filter(|ex| ex.input.body.as_text().is_some())
331///     .to("log:info?showHeaders=true")
332///     .build()?;
333/// ```
334// TODO(BUILDER-003): RouteBuilder does not support clone-and-reuse semantics.
335// Once built, the builder is consumed. Add `Clone` or a `fork()` method to allow
336// reusing a partially-built route as a template for multiple routes.
337pub struct RouteBuilder {
338    from_uri: String,
339    steps: Vec<BuilderStep>,
340    error_handler: Option<ErrorHandlerConfig>,
341    error_handler_mode: ErrorHandlerMode,
342    circuit_breaker_config: Option<CircuitBreakerConfig>,
343    security_policy_config: Option<camel_api::security_policy::SecurityPolicyConfig>,
344    security_authenticator: Option<std::sync::Arc<dyn camel_auth::TokenAuthenticator>>,
345    concurrency: Option<ConcurrencyModel>,
346    route_id: Option<String>,
347    auto_startup: Option<bool>,
348    startup_order: Option<i32>,
349}
350
351#[derive(Default)]
352enum ErrorHandlerMode {
353    #[default]
354    None,
355    ExplicitConfig,
356    Shorthand {
357        dlc_uri: Option<String>,
358        specs: Vec<OnExceptionSpec>,
359    },
360    Mixed,
361}
362
363#[derive(Clone)]
364struct OnExceptionSpec {
365    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
366    retry: Option<RedeliveryPolicy>,
367    handled_by: Option<String>,
368}
369
370impl RouteBuilder {
371    /// Start building a route from the given source endpoint URI.
372    pub fn from(endpoint: &str) -> Self {
373        Self {
374            from_uri: endpoint.to_string(),
375            steps: Vec::new(),
376            error_handler: None,
377            error_handler_mode: ErrorHandlerMode::None,
378            circuit_breaker_config: None,
379            security_policy_config: None,
380            security_authenticator: None,
381            concurrency: None,
382            route_id: None,
383            auto_startup: None,
384            startup_order: None,
385        }
386    }
387
388    /// Open a filter scope. Only exchanges matching `predicate` will be processed
389    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
390    /// and continue to steps after `.end_filter()`.
391    pub fn filter<F>(self, predicate: F) -> FilterBuilder
392    where
393        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
394    {
395        FilterBuilder {
396            parent: self,
397            predicate: std::sync::Arc::new(predicate),
398            steps: vec![],
399        }
400    }
401
402    /// Open a choice scope for content-based routing.
403    ///
404    /// Within the choice, you can define multiple `.when()` clauses and an
405    /// optional `.otherwise()` clause. The first matching `when` predicate
406    /// determines which sub-pipeline executes.
407    pub fn choice(self) -> ChoiceBuilder {
408        ChoiceBuilder {
409            parent: self,
410            whens: vec![],
411            _otherwise: None,
412        }
413    }
414
415    /// Add a WireTap step that sends a clone of the exchange to the given
416    /// endpoint URI (fire-and-forget). The original exchange continues
417    /// downstream unchanged.
418    pub fn wire_tap(mut self, endpoint: &str) -> Self {
419        self.steps.push(BuilderStep::WireTap {
420            uri: endpoint.to_string(),
421        });
422        self
423    }
424
425    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
426    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
427        self.error_handler_mode = match self.error_handler_mode {
428            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
429                ErrorHandlerMode::ExplicitConfig
430            }
431            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
432        };
433        self.error_handler = Some(config);
434        self
435    }
436
437    /// Set a dead letter channel URI for shorthand error handler mode.
438    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
439        let uri = uri.into();
440        self.error_handler_mode = match self.error_handler_mode {
441            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
442                dlc_uri: Some(uri),
443                specs: Vec::new(),
444            },
445            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
446                dlc_uri: Some(uri),
447                specs,
448            },
449            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
450        };
451        self
452    }
453
454    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
455    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
456    where
457        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
458    {
459        self.error_handler_mode = match self.error_handler_mode {
460            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
461                dlc_uri: None,
462                specs: Vec::new(),
463            },
464            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
465            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
466        };
467
468        OnExceptionBuilder {
469            parent: self,
470            policy: OnExceptionSpec {
471                matches: std::sync::Arc::new(matches),
472                retry: None,
473                handled_by: None,
474            },
475        }
476    }
477
478    /// Set a circuit breaker for this route.
479    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
480        self.circuit_breaker_config = Some(config);
481        self
482    }
483
484    pub fn security_policy(
485        mut self,
486        config: camel_api::security_policy::SecurityPolicyConfig,
487    ) -> Self {
488        self.security_policy_config = Some(config);
489        self
490    }
491
492    pub fn security_authenticator(
493        mut self,
494        auth: std::sync::Arc<dyn camel_auth::TokenAuthenticator>,
495    ) -> Self {
496        self.security_authenticator = Some(auth);
497        self
498    }
499
500    /// Override the consumer's default concurrency model.
501    ///
502    /// When set, the pipeline spawns a task per exchange, processing them
503    /// concurrently. `max` limits the number of simultaneously active
504    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
505    ///
506    /// # Example
507    /// ```ignore
508    /// RouteBuilder::from("http://0.0.0.0:8080/api")
509    ///     .concurrent(16)  // max 16 in-flight pipeline executions
510    ///     .process(handle_request)
511    ///     .build()
512    /// ```
513    pub fn concurrent(mut self, max: usize) -> Self {
514        let max = if max == 0 { None } else { Some(max) };
515        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
516        self
517    }
518
519    /// Force sequential processing, overriding a concurrent-capable consumer.
520    ///
521    /// Useful for HTTP routes that mutate shared state and need ordering
522    /// guarantees.
523    pub fn sequential(mut self) -> Self {
524        self.concurrency = Some(ConcurrencyModel::Sequential);
525        self
526    }
527
528    /// Set the route ID for this route.
529    ///
530    /// If not set, the route will be assigned an auto-generated ID.
531    pub fn route_id(mut self, id: impl Into<String>) -> Self {
532        self.route_id = Some(id.into());
533        self
534    }
535
536    /// Set whether this route should automatically start when the context starts.
537    ///
538    /// Default is `true`.
539    pub fn auto_startup(mut self, auto: bool) -> Self {
540        self.auto_startup = Some(auto);
541        self
542    }
543
544    /// Set the startup order for this route.
545    ///
546    /// Routes with lower values start first. Default is 1000.
547    pub fn startup_order(mut self, order: i32) -> Self {
548        self.startup_order = Some(order);
549        self
550    }
551
552    /// Begin a Splitter sub-pipeline. Steps added after this call (until
553    /// `.end_split()`) will be executed per-fragment.
554    ///
555    /// Returns a `SplitBuilder` — you cannot call `.build()` until
556    /// `.end_split()` closes the split scope (enforced by the type system).
557    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
558        SplitBuilder {
559            parent: self,
560            config,
561            steps: Vec::new(),
562        }
563    }
564
565    /// Begin a Multicast sub-pipeline. Steps added after this call (until
566    /// `.end_multicast()`) will each receive a copy of the exchange.
567    ///
568    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
569    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
570    pub fn multicast(self) -> MulticastBuilder {
571        MulticastBuilder {
572            parent: self,
573            steps: Vec::new(),
574            config: MulticastConfig::new(),
575        }
576    }
577
578    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
579    /// `max_requests` per `period`. Steps inside the throttle scope are only
580    /// executed when the rate limit allows.
581    ///
582    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
583    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
584    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
585        ThrottleBuilder {
586            parent: self,
587            config: ThrottlerConfig::new(max_requests, period),
588            steps: Vec::new(),
589        }
590    }
591
592    /// Begin a Loop sub-pipeline that iterates a fixed number of times.
593    pub fn loop_count(self, count: usize) -> LoopBuilder {
594        LoopBuilder {
595            parent: self,
596            config: LoopConfig {
597                mode: LoopMode::Count(count),
598            },
599            steps: vec![],
600        }
601    }
602
603    /// Begin a Loop sub-pipeline that iterates while a predicate is true.
604    pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
605    where
606        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
607    {
608        LoopBuilder {
609            parent: self,
610            config: LoopConfig {
611                mode: LoopMode::While(std::sync::Arc::new(predicate)),
612            },
613            steps: vec![],
614        }
615    }
616
617    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
618    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
619    ///
620    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
621    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
622    pub fn load_balance(self) -> LoadBalancerBuilder {
623        LoadBalancerBuilder {
624            parent: self,
625            config: LoadBalancerConfig::round_robin(),
626            steps: Vec::new(),
627        }
628    }
629
630    /// Add a dynamic router step that routes exchanges dynamically based on
631    /// expression evaluation at runtime.
632    ///
633    /// The expression receives the exchange and returns `Some(uri)` to route to
634    /// the next endpoint, or `None` to stop routing.
635    ///
636    /// # Example
637    /// ```ignore
638    /// RouteBuilder::from("timer:tick")
639    ///     .route_id("test-route")
640    ///     .dynamic_router(|ex| {
641    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
642    ///     })
643    ///     .build()
644    /// ```
645    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
646        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
647    }
648
649    /// Add a dynamic router step with full configuration.
650    ///
651    /// Allows customization of URI delimiter, cache size, timeout, and other options.
652    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
653        self.steps.push(BuilderStep::DynamicRouter { config });
654        self
655    }
656
657    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
658        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
659    }
660
661    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
662        self.steps.push(BuilderStep::RoutingSlip { config });
663        self
664    }
665
666    pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
667        self.recipient_list_with_config(RecipientListConfig::new(expression))
668    }
669
670    pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
671        self.steps.push(BuilderStep::RecipientList { config });
672        self
673    }
674
675    /// Consume the builder and produce a [`RouteDefinition`].
676    // TODO(BUILDER-006): Validate duplicate route IDs. When a route with the same
677    // ID is already registered in the context, return `Err(CamelError::RouteError)`.
678    // Currently, duplicate IDs are silently accepted; detection should happen at
679    // `CamelContext::add_route_definition` time.
680    pub fn build(self) -> Result<RouteDefinition, CamelError> {
681        validate_uri(&self.from_uri)?;
682        let route_id = self
683            .route_id
684            .filter(|s| !s.trim().is_empty())
685            .ok_or_else(|| {
686                CamelError::RouteError(
687                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
688                        .to_string(),
689                )
690            })?;
691        let resolved_error_handler = match self.error_handler_mode {
692            ErrorHandlerMode::None => self.error_handler,
693            ErrorHandlerMode::ExplicitConfig => self.error_handler,
694            ErrorHandlerMode::Mixed => {
695                return Err(CamelError::RouteError(
696                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
697                ));
698            }
699            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
700                let mut config = if let Some(uri) = dlc_uri {
701                    ErrorHandlerConfig::dead_letter_channel(uri)
702                } else {
703                    ErrorHandlerConfig::log_only()
704                };
705
706                for spec in specs {
707                    let matcher = spec.matches.clone();
708                    let mut builder = config.on_exception(move |e| matcher(e));
709
710                    if let Some(retry) = spec.retry {
711                        builder = builder.retry(retry.max_attempts).with_backoff(
712                            retry.initial_delay,
713                            retry.multiplier,
714                            retry.max_delay,
715                        );
716                        if retry.jitter_factor > 0.0 {
717                            builder = builder.with_jitter(retry.jitter_factor);
718                        }
719                    }
720
721                    if let Some(uri) = spec.handled_by {
722                        builder = builder.handled_by(uri);
723                    }
724
725                    config = builder.build();
726                }
727
728                Some(config)
729            }
730        };
731
732        let definition = RouteDefinition::new(self.from_uri, self.steps);
733        let definition = if let Some(eh) = resolved_error_handler {
734            definition.with_error_handler(eh)
735        } else {
736            definition
737        };
738        let definition = if let Some(cb) = self.circuit_breaker_config {
739            definition.with_circuit_breaker(cb)
740        } else {
741            definition
742        };
743        let definition = if let Some(sp) = self.security_policy_config {
744            definition.with_security_policy(sp)
745        } else {
746            definition
747        };
748        let definition = if let Some(auth) = self.security_authenticator {
749            definition.with_security_authenticator(auth)
750        } else {
751            definition
752        };
753        let definition = if let Some(concurrency) = self.concurrency {
754            definition.with_concurrency(concurrency)
755        } else {
756            definition
757        };
758        let definition = definition.with_route_id(route_id);
759        let definition = if let Some(auto) = self.auto_startup {
760            definition.with_auto_startup(auto)
761        } else {
762            definition
763        };
764        let definition = if let Some(order) = self.startup_order {
765            definition.with_startup_order(order)
766        } else {
767            definition
768        };
769        Ok(definition)
770    }
771
772    /// Compile this builder route into canonical spec.
773    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
774        validate_uri(&self.from_uri)?;
775        let route_id = self
776            .route_id
777            .filter(|s| !s.trim().is_empty())
778            .ok_or_else(|| {
779                CamelError::RouteError(
780                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
781                        .to_string(),
782                )
783            })?;
784
785        let steps = canonicalize_steps(self.steps)?;
786        let circuit_breaker = self
787            .circuit_breaker_config
788            .map(canonicalize_circuit_breaker);
789
790        if self.security_policy_config.is_some() {
791            return Err(CamelError::RouteError(
792                "routes with security_policy cannot use the canonical/hot-reload path (not yet supported)"
793                    .into(),
794            ));
795        }
796
797        let spec = CanonicalRouteSpec {
798            route_id,
799            from: self.from_uri,
800            steps,
801            circuit_breaker,
802            auto_startup: None,
803            startup_order: None,
804            concurrency: None,
805            version: camel_api::CANONICAL_CONTRACT_VERSION,
806        };
807        spec.validate_contract()?;
808        Ok(spec)
809    }
810}
811
812pub struct OnExceptionBuilder {
813    parent: RouteBuilder,
814    policy: OnExceptionSpec,
815}
816
817impl OnExceptionBuilder {
818    pub fn retry(mut self, max_attempts: u32) -> Self {
819        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
820        self
821    }
822
823    pub fn with_backoff(
824        mut self,
825        initial: std::time::Duration,
826        multiplier: f64,
827        max: std::time::Duration,
828    ) -> Self {
829        if let Some(ref mut retry) = self.policy.retry {
830            retry.initial_delay = initial;
831            retry.multiplier = multiplier;
832            retry.max_delay = max;
833        } else {
834            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
835        }
836        self
837    }
838
839    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
840        if let Some(ref mut retry) = self.policy.retry {
841            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
842        } else {
843            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
844        }
845        self
846    }
847
848    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
849        self.policy.handled_by = Some(uri.into());
850        self
851    }
852
853    pub fn end_on_exception(mut self) -> RouteBuilder {
854        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
855            specs.push(self.policy);
856        }
857        self.parent
858    }
859}
860
861/// Validate that a URI is non-empty and contains a scheme component.
862fn validate_uri(uri: &str) -> Result<(), CamelError> {
863    let trimmed = uri.trim();
864    if trimmed.is_empty() {
865        return Err(CamelError::RouteError(
866            "route must have a 'from' URI".to_string(),
867        ));
868    }
869    if !trimmed.contains(':') {
870        return Err(CamelError::RouteError(
871            "URI must have a scheme (e.g. 'timer:tick')".to_string(),
872        ));
873    }
874    let scheme = trimmed.split(':').next().unwrap_or("");
875    if scheme.trim().is_empty() {
876        return Err(CamelError::RouteError(
877            "URI scheme must not be empty".to_string(),
878        ));
879    }
880    Ok(())
881}
882
883fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
884    let mut canonical = Vec::with_capacity(steps.len());
885    for step in steps {
886        canonical.push(canonicalize_step(step)?);
887    }
888    Ok(canonical)
889}
890
891fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
892    match step {
893        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
894        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
895        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
896        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
897        BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
898            delay_ms: config.delay_ms,
899            dynamic_header: config.dynamic_header,
900        }),
901        BuilderStep::DeclarativeScript { expression } => {
902            Ok(CanonicalStepSpec::Script { expression })
903        }
904        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
905            predicate,
906            steps: canonicalize_steps(steps)?,
907        }),
908        BuilderStep::DeclarativeChoice { whens, otherwise } => {
909            let mut canonical_whens = Vec::with_capacity(whens.len());
910            for DeclarativeWhenStep { predicate, steps } in whens {
911                canonical_whens.push(CanonicalWhenSpec {
912                    predicate,
913                    steps: canonicalize_steps(steps)?,
914                });
915            }
916            let otherwise = match otherwise {
917                Some(steps) => Some(canonicalize_steps(steps)?),
918                None => None,
919            };
920            Ok(CanonicalStepSpec::Choice {
921                whens: canonical_whens,
922                otherwise,
923            })
924        }
925        BuilderStep::DeclarativeSplit {
926            expression,
927            aggregation,
928            parallel,
929            parallel_limit,
930            stop_on_exception,
931            steps,
932        } => Ok(CanonicalStepSpec::Split {
933            expression: CanonicalSplitExpressionSpec::Language(expression),
934            aggregation: canonicalize_split_aggregation(aggregation)?,
935            parallel,
936            parallel_limit,
937            stop_on_exception,
938            steps: canonicalize_steps(steps)?,
939        }),
940        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
941            canonicalize_aggregate(config)?,
942        )),
943        other => {
944            let step_name = canonical_step_name(&other);
945            let detail = camel_api::canonical_contract_rejection_reason(step_name)
946                .unwrap_or("not included in canonical v1");
947            Err(CamelError::RouteError(format!(
948                "canonical v1 does not support step `{step_name}`: {detail}"
949            )))
950        }
951    }
952}
953
954fn canonicalize_split_aggregation(
955    strategy: camel_api::splitter::AggregationStrategy,
956) -> Result<CanonicalSplitAggregationSpec, CamelError> {
957    match strategy {
958        camel_api::splitter::AggregationStrategy::LastWins => {
959            Ok(CanonicalSplitAggregationSpec::LastWins)
960        }
961        camel_api::splitter::AggregationStrategy::CollectAll => {
962            Ok(CanonicalSplitAggregationSpec::CollectAll)
963        }
964        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
965            "canonical v1 does not support custom split aggregation".to_string(),
966        )),
967        camel_api::splitter::AggregationStrategy::Original => {
968            Ok(CanonicalSplitAggregationSpec::Original)
969        }
970    }
971}
972
973fn extract_completion_fields(
974    mode: &CompletionMode,
975) -> Result<(Option<usize>, Option<u64>), CamelError> {
976    match mode {
977        CompletionMode::Single(cond) => match cond {
978            CompletionCondition::Size(n) => Ok((Some(*n), None)),
979            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
980            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
981                "canonical v1 does not support aggregate predicate completion".to_string(),
982            )),
983        },
984        CompletionMode::Any(conds) => {
985            let mut size = None;
986            let mut timeout_ms = None;
987            for cond in conds {
988                match cond {
989                    CompletionCondition::Size(n) => size = Some(*n),
990                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
991                    CompletionCondition::Predicate(_) => {
992                        return Err(CamelError::RouteError(
993                            "canonical v1 does not support aggregate predicate completion"
994                                .to_string(),
995                        ));
996                    }
997                }
998            }
999            Ok((size, timeout_ms))
1000        }
1001    }
1002}
1003
1004fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
1005    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
1006
1007    let header = match &config.correlation {
1008        CorrelationStrategy::HeaderName(h) => h.clone(),
1009        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
1010        CorrelationStrategy::Fn(_) => {
1011            return Err(CamelError::RouteError(
1012                "canonical v1 does not support Fn correlation strategy".to_string(),
1013            ));
1014        }
1015    };
1016
1017    let correlation_key = match &config.correlation {
1018        CorrelationStrategy::HeaderName(_) => None,
1019        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
1020        CorrelationStrategy::Fn(_) => unreachable!(),
1021    };
1022
1023    let strategy = match config.strategy {
1024        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
1025        AggregationStrategy::Custom(_) => {
1026            return Err(CamelError::RouteError(
1027                "canonical v1 does not support custom aggregate strategy".to_string(),
1028            ));
1029        }
1030    };
1031    let bucket_ttl_ms = config
1032        .bucket_ttl
1033        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
1034
1035    Ok(CanonicalAggregateSpec {
1036        header,
1037        completion_size,
1038        completion_timeout_ms,
1039        correlation_key,
1040        force_completion_on_stop: if config.force_completion_on_stop {
1041            Some(true)
1042        } else {
1043            None
1044        },
1045        discard_on_timeout: if config.discard_on_timeout {
1046            Some(true)
1047        } else {
1048            None
1049        },
1050        strategy,
1051        max_buckets: config.max_buckets,
1052        bucket_ttl_ms,
1053    })
1054}
1055
1056fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
1057    CanonicalCircuitBreakerSpec {
1058        failure_threshold: config.failure_threshold,
1059        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
1060    }
1061}
1062
1063fn canonical_step_name(step: &BuilderStep) -> &'static str {
1064    match step {
1065        BuilderStep::Processor(_) => "processor",
1066        BuilderStep::To(_) => "to",
1067        BuilderStep::Stop => "stop",
1068        BuilderStep::Log { .. } => "log",
1069        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
1070        BuilderStep::DeclarativeSetBody { .. } => "set_body",
1071        BuilderStep::DeclarativeFilter { .. } => "filter",
1072        BuilderStep::DeclarativeChoice { .. } => "choice",
1073        BuilderStep::DeclarativeScript { .. } => "script",
1074        BuilderStep::DeclarativeFunction { .. } => "function",
1075        BuilderStep::DeclarativeSplit { .. } => "split",
1076        BuilderStep::Split { .. } => "split",
1077        BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1078        BuilderStep::Aggregate { .. } => "aggregate",
1079        BuilderStep::Filter { .. } => "filter",
1080        BuilderStep::Choice { .. } => "choice",
1081        BuilderStep::WireTap { .. } => "wire_tap",
1082        BuilderStep::Delay { .. } => "delay",
1083        BuilderStep::Multicast { .. } => "multicast",
1084        BuilderStep::DeclarativeLog { .. } => "log",
1085        BuilderStep::Bean { .. } => "bean",
1086        BuilderStep::Script { .. } => "script",
1087        BuilderStep::Throttle { .. } => "throttle",
1088        BuilderStep::LoadBalance { .. } => "load_balancer",
1089        BuilderStep::DynamicRouter { .. } => "dynamic_router",
1090        BuilderStep::RoutingSlip { .. } => "routing_slip",
1091        BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1092        BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1093        BuilderStep::RecipientList { .. } => "recipient_list",
1094        BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1095        BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1096        BuilderStep::DeclarativeStreamSplit { .. } => "stream_split",
1097        BuilderStep::Enrich { .. } => "enrich",
1098        BuilderStep::PollEnrich { .. } => "poll_enrich",
1099        BuilderStep::Validate { .. } => "validate",
1100        BuilderStep::IdempotentConsumer { .. } => "idempotent_consumer",
1101        BuilderStep::ClaimCheck { .. } => "claim_check",
1102        BuilderStep::Sampling { .. } => "sampling",
1103        BuilderStep::Sort { .. } => "sort",
1104        BuilderStep::DeclarativeDoTry { .. } => "do_try",
1105        BuilderStep::Resequence { .. } => "resequence",
1106    }
1107}
1108
1109impl StepAccumulator for RouteBuilder {
1110    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1111        &mut self.steps
1112    }
1113}
1114
1115/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
1116///
1117/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1118/// but NOT `.build()` and NOT `.split()` (no nested splits).
1119///
1120/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
1121/// and returns the parent `RouteBuilder`.
1122pub struct SplitBuilder {
1123    parent: RouteBuilder,
1124    config: SplitterConfig,
1125    steps: Vec<BuilderStep>,
1126}
1127
1128impl SplitBuilder {
1129    /// Open a filter scope within the split sub-pipeline.
1130    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1131    where
1132        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1133    {
1134        FilterInSplitBuilder {
1135            parent: self,
1136            predicate: std::sync::Arc::new(predicate),
1137            steps: vec![],
1138        }
1139    }
1140
1141    /// Close the split scope. Packages the accumulated sub-steps into a
1142    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
1143    pub fn end_split(mut self) -> RouteBuilder {
1144        let split_step = BuilderStep::Split {
1145            config: self.config,
1146            steps: self.steps,
1147        };
1148        self.parent.steps.push(split_step);
1149        self.parent
1150    }
1151}
1152
1153impl StepAccumulator for SplitBuilder {
1154    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1155        &mut self.steps
1156    }
1157}
1158
1159/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
1160pub struct FilterBuilder {
1161    parent: RouteBuilder,
1162    predicate: FilterPredicate,
1163    steps: Vec<BuilderStep>,
1164}
1165
1166impl FilterBuilder {
1167    /// Close the filter scope. Packages the accumulated sub-steps into a
1168    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
1169    pub fn end_filter(mut self) -> RouteBuilder {
1170        let step = BuilderStep::Filter {
1171            predicate: self.predicate,
1172            steps: self.steps,
1173        };
1174        self.parent.steps.push(step);
1175        self.parent
1176    }
1177}
1178
1179impl StepAccumulator for FilterBuilder {
1180    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1181        &mut self.steps
1182    }
1183}
1184
1185/// Builder for a filter scope nested inside a `.split()` block.
1186pub struct FilterInSplitBuilder {
1187    parent: SplitBuilder,
1188    predicate: FilterPredicate,
1189    steps: Vec<BuilderStep>,
1190}
1191
1192impl FilterInSplitBuilder {
1193    /// Close the filter scope and return the parent `SplitBuilder`.
1194    pub fn end_filter(mut self) -> SplitBuilder {
1195        let step = BuilderStep::Filter {
1196            predicate: self.predicate,
1197            steps: self.steps,
1198        };
1199        self.parent.steps.push(step);
1200        self.parent
1201    }
1202}
1203
1204impl StepAccumulator for FilterInSplitBuilder {
1205    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1206        &mut self.steps
1207    }
1208}
1209
1210// ── Choice/When/Otherwise builders ─────────────────────────────────────────
1211
1212/// Builder for a `.choice()` ... `.end_choice()` block.
1213///
1214/// Accumulates `when` clauses and an optional `otherwise` clause.
1215/// Cannot call `.build()` until `.end_choice()` is called.
1216pub struct ChoiceBuilder {
1217    parent: RouteBuilder,
1218    whens: Vec<WhenStep>,
1219    _otherwise: Option<Vec<BuilderStep>>,
1220}
1221
1222impl ChoiceBuilder {
1223    /// Open a `when` clause. Only exchanges matching `predicate` will be
1224    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
1225    pub fn when<F>(self, predicate: F) -> WhenBuilder
1226    where
1227        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1228    {
1229        WhenBuilder {
1230            parent: self,
1231            predicate: std::sync::Arc::new(predicate),
1232            steps: vec![],
1233        }
1234    }
1235
1236    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1237    ///
1238    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1239    pub fn otherwise(self) -> OtherwiseBuilder {
1240        OtherwiseBuilder {
1241            parent: self,
1242            steps: vec![],
1243        }
1244    }
1245
1246    /// Close the choice scope. Packages all accumulated `when` clauses and
1247    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1248    /// parent `RouteBuilder`.
1249    pub fn end_choice(mut self) -> RouteBuilder {
1250        let step = BuilderStep::Choice {
1251            whens: self.whens,
1252            otherwise: self._otherwise,
1253        };
1254        self.parent.steps.push(step);
1255        self.parent
1256    }
1257}
1258
1259/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1260pub struct WhenBuilder {
1261    parent: ChoiceBuilder,
1262    predicate: camel_api::FilterPredicate,
1263    steps: Vec<BuilderStep>,
1264}
1265
1266impl WhenBuilder {
1267    /// Close the when scope. Packages the accumulated sub-steps into a
1268    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1269    pub fn end_when(mut self) -> ChoiceBuilder {
1270        self.parent.whens.push(WhenStep {
1271            predicate: self.predicate,
1272            steps: self.steps,
1273        });
1274        self.parent
1275    }
1276}
1277
1278impl StepAccumulator for WhenBuilder {
1279    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1280        &mut self.steps
1281    }
1282}
1283
1284/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1285pub struct OtherwiseBuilder {
1286    parent: ChoiceBuilder,
1287    steps: Vec<BuilderStep>,
1288}
1289
1290impl OtherwiseBuilder {
1291    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1292    pub fn end_otherwise(self) -> ChoiceBuilder {
1293        let OtherwiseBuilder { mut parent, steps } = self;
1294        parent._otherwise = Some(steps);
1295        parent
1296    }
1297}
1298
1299impl StepAccumulator for OtherwiseBuilder {
1300    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1301        &mut self.steps
1302    }
1303}
1304
1305/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1306///
1307/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1308/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1309///
1310/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1311/// and returns the parent `RouteBuilder`.
1312pub struct MulticastBuilder {
1313    parent: RouteBuilder,
1314    steps: Vec<BuilderStep>,
1315    config: MulticastConfig,
1316}
1317
1318impl MulticastBuilder {
1319    pub fn parallel(mut self, parallel: bool) -> Self {
1320        self.config = self.config.parallel(parallel);
1321        self
1322    }
1323
1324    pub fn parallel_limit(mut self, limit: usize) -> Self {
1325        self.config = self.config.parallel_limit(limit);
1326        self
1327    }
1328
1329    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1330        self.config = self.config.stop_on_exception(stop);
1331        self
1332    }
1333
1334    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1335        self.config = self.config.timeout(duration);
1336        self
1337    }
1338
1339    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1340        self.config = self.config.aggregation(strategy);
1341        self
1342    }
1343
1344    pub fn end_multicast(mut self) -> RouteBuilder {
1345        let step = BuilderStep::Multicast {
1346            steps: self.steps,
1347            config: self.config,
1348        };
1349        self.parent.steps.push(step);
1350        self.parent
1351    }
1352}
1353
1354impl StepAccumulator for MulticastBuilder {
1355    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1356        &mut self.steps
1357    }
1358}
1359
1360/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1361///
1362/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1363/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1364///
1365/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1366/// and returns the parent `RouteBuilder`.
1367pub struct ThrottleBuilder {
1368    parent: RouteBuilder,
1369    config: ThrottlerConfig,
1370    steps: Vec<BuilderStep>,
1371}
1372
1373impl ThrottleBuilder {
1374    /// Set the throttle strategy. Default is `Delay`.
1375    ///
1376    /// - `Delay`: Queue messages until capacity available
1377    /// - `Reject`: Return error immediately when throttled
1378    /// - `Drop`: Silently discard excess messages
1379    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1380        self.config = self.config.strategy(strategy);
1381        self
1382    }
1383
1384    /// Close the throttle scope. Packages the accumulated sub-steps into a
1385    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1386    pub fn end_throttle(mut self) -> RouteBuilder {
1387        let step = BuilderStep::Throttle {
1388            config: self.config,
1389            steps: self.steps,
1390        };
1391        self.parent.steps.push(step);
1392        self.parent
1393    }
1394}
1395
1396impl StepAccumulator for ThrottleBuilder {
1397    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1398        &mut self.steps
1399    }
1400}
1401
1402/// Builder for the sub-pipeline within a `.loop_count()` / `.loop_while()` ... `.end_loop()` block.
1403pub struct LoopBuilder {
1404    parent: RouteBuilder,
1405    config: LoopConfig,
1406    steps: Vec<BuilderStep>,
1407}
1408
1409impl LoopBuilder {
1410    pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1411        LoopInLoopBuilder {
1412            parent: self,
1413            config: LoopConfig {
1414                mode: LoopMode::Count(count),
1415            },
1416            steps: vec![],
1417        }
1418    }
1419
1420    pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1421    where
1422        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1423    {
1424        LoopInLoopBuilder {
1425            parent: self,
1426            config: LoopConfig {
1427                mode: LoopMode::While(std::sync::Arc::new(predicate)),
1428            },
1429            steps: vec![],
1430        }
1431    }
1432
1433    pub fn end_loop(mut self) -> RouteBuilder {
1434        let step = BuilderStep::Loop {
1435            config: self.config,
1436            steps: self.steps,
1437        };
1438        self.parent.steps.push(step);
1439        self.parent
1440    }
1441}
1442
1443impl StepAccumulator for LoopBuilder {
1444    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1445        &mut self.steps
1446    }
1447}
1448
1449pub struct LoopInLoopBuilder {
1450    parent: LoopBuilder,
1451    config: LoopConfig,
1452    steps: Vec<BuilderStep>,
1453}
1454
1455impl LoopInLoopBuilder {
1456    pub fn end_loop(mut self) -> LoopBuilder {
1457        let step = BuilderStep::Loop {
1458            config: self.config,
1459            steps: self.steps,
1460        };
1461        self.parent.steps.push(step);
1462        self.parent
1463    }
1464}
1465
1466impl StepAccumulator for LoopInLoopBuilder {
1467    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1468        &mut self.steps
1469    }
1470}
1471
1472/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1473///
1474/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1475/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1476///
1477/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1478/// and returns the parent `RouteBuilder`.
1479pub struct LoadBalancerBuilder {
1480    parent: RouteBuilder,
1481    config: LoadBalancerConfig,
1482    steps: Vec<BuilderStep>,
1483}
1484
1485impl LoadBalancerBuilder {
1486    /// Set the load balance strategy to round-robin (default).
1487    pub fn round_robin(mut self) -> Self {
1488        self.config = LoadBalancerConfig::round_robin();
1489        self
1490    }
1491
1492    /// Set the load balance strategy to random selection.
1493    pub fn random(mut self) -> Self {
1494        self.config = LoadBalancerConfig::random();
1495        self
1496    }
1497
1498    /// Set the load balance strategy to weighted selection.
1499    ///
1500    /// Each endpoint is assigned a weight that determines its probability
1501    /// of being selected.
1502    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1503        self.config = LoadBalancerConfig::weighted(weights);
1504        self
1505    }
1506
1507    /// Set the load balance strategy to failover.
1508    ///
1509    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1510    /// is tried.
1511    pub fn failover(mut self) -> Self {
1512        self.config = LoadBalancerConfig::failover();
1513        self
1514    }
1515
1516    /// Close the load balance scope. Packages the accumulated sub-steps into a
1517    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1518    pub fn end_load_balance(mut self) -> RouteBuilder {
1519        let step = BuilderStep::LoadBalance {
1520            config: self.config,
1521            steps: self.steps,
1522        };
1523        self.parent.steps.push(step);
1524        self.parent
1525    }
1526}
1527
1528impl StepAccumulator for LoadBalancerBuilder {
1529    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1530        &mut self.steps
1531    }
1532}
1533
1534// ---------------------------------------------------------------------------
1535// Tests
1536// ---------------------------------------------------------------------------
1537
1538#[cfg(test)]
1539mod tests {
1540    use super::*;
1541    use camel_api::error_handler::ErrorHandlerConfig;
1542    use camel_api::load_balancer::LoadBalanceStrategy;
1543    use camel_api::{Exchange, Message};
1544    use camel_core::route::BuilderStep;
1545    use std::sync::Arc;
1546    use std::time::Duration;
1547    use tower::{Service, ServiceExt};
1548
1549    #[test]
1550    fn test_builder_from_creates_definition() {
1551        let definition = RouteBuilder::from("timer:tick")
1552            .route_id("test-route")
1553            .build()
1554            .unwrap();
1555        assert_eq!(definition.from_uri(), "timer:tick");
1556    }
1557
1558    #[test]
1559    fn test_builder_empty_from_uri_errors() {
1560        let result = RouteBuilder::from("").route_id("test-route").build();
1561        assert!(result.is_err());
1562    }
1563
1564    #[test]
1565    fn test_build_rejects_schemeless_uri() {
1566        let result = RouteBuilder::from("no-scheme-here")
1567            .route_id("test-route")
1568            .build();
1569        match result {
1570            Err(err) => {
1571                let err_msg = format!("{err}");
1572                assert!(
1573                    err_msg.contains("scheme"),
1574                    "expected scheme-related error, got: {err_msg}"
1575                );
1576            }
1577            Ok(_) => panic!("schemeless URI should fail"),
1578        }
1579    }
1580
1581    #[test]
1582    fn test_build_rejects_empty_scheme_uri() {
1583        let result = RouteBuilder::from(":missing-scheme")
1584            .route_id("test-route")
1585            .build();
1586        match result {
1587            Err(err) => {
1588                let err_msg = format!("{err}");
1589                assert!(
1590                    err_msg.contains("scheme"),
1591                    "expected scheme-related error, got: {err_msg}"
1592                );
1593            }
1594            Ok(_) => panic!("empty-scheme URI should fail"),
1595        }
1596    }
1597
1598    #[test]
1599    fn test_build_accepts_valid_uri() {
1600        let result = RouteBuilder::from("timer:tick")
1601            .route_id("test-route")
1602            .build();
1603        assert!(result.is_ok());
1604    }
1605
1606    #[test]
1607    fn test_build_canonical_rejects_schemeless_uri() {
1608        let result = RouteBuilder::from("no-scheme-here")
1609            .route_id("test-route")
1610            .build_canonical();
1611        assert!(result.is_err());
1612    }
1613
1614    #[test]
1615    fn test_builder_to_adds_step() {
1616        let definition = RouteBuilder::from("timer:tick")
1617            .route_id("test-route")
1618            .to("log:info")
1619            .build()
1620            .unwrap();
1621
1622        assert_eq!(definition.from_uri(), "timer:tick");
1623        // We can verify steps were added by checking the structure
1624        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1625    }
1626
1627    #[test]
1628    fn test_builder_filter_adds_filter_step() {
1629        let definition = RouteBuilder::from("timer:tick")
1630            .route_id("test-route")
1631            .filter(|_ex| true)
1632            .to("mock:result")
1633            .end_filter()
1634            .build()
1635            .unwrap();
1636
1637        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1638    }
1639
1640    #[test]
1641    fn test_builder_set_header_adds_processor_step() {
1642        let definition = RouteBuilder::from("timer:tick")
1643            .route_id("test-route")
1644            .set_header("key", Value::String("value".into()))
1645            .build()
1646            .unwrap();
1647
1648        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1649    }
1650
1651    #[test]
1652    fn test_builder_map_body_adds_processor_step() {
1653        let definition = RouteBuilder::from("timer:tick")
1654            .route_id("test-route")
1655            .map_body(|body| body)
1656            .build()
1657            .unwrap();
1658
1659        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1660    }
1661
1662    #[test]
1663    fn test_builder_process_adds_processor_step() {
1664        let definition = RouteBuilder::from("timer:tick")
1665            .route_id("test-route")
1666            .process(|ex| async move { Ok(ex) })
1667            .build()
1668            .unwrap();
1669
1670        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1671    }
1672
1673    #[test]
1674    fn test_builder_chain_multiple_steps() {
1675        let definition = RouteBuilder::from("timer:tick")
1676            .route_id("test-route")
1677            .set_header("source", Value::String("timer".into()))
1678            .filter(|ex| ex.input.header("source").is_some())
1679            .to("log:info")
1680            .end_filter()
1681            .to("mock:result")
1682            .build()
1683            .unwrap();
1684
1685        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1686        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1687        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1688        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1689    }
1690
1691    #[test]
1692    fn test_loop_count_builder() {
1693        use camel_api::loop_eip::LoopMode;
1694
1695        let def = RouteBuilder::from("direct:start")
1696            .route_id("loop-test")
1697            .loop_count(3)
1698            .to("mock:inside")
1699            .end_loop()
1700            .to("mock:after")
1701            .build()
1702            .unwrap();
1703
1704        assert_eq!(def.steps().len(), 2);
1705        match &def.steps()[0] {
1706            BuilderStep::Loop { config, steps } => {
1707                assert!(matches!(config.mode, LoopMode::Count(3)));
1708                assert_eq!(steps.len(), 1);
1709            }
1710            other => panic!("Expected Loop, got {:?}", other),
1711        }
1712        assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1713    }
1714
1715    #[test]
1716    fn test_loop_while_builder() {
1717        use camel_api::loop_eip::LoopMode;
1718
1719        let def = RouteBuilder::from("direct:start")
1720            .route_id("loop-while-test")
1721            .loop_while(|_ex| true)
1722            .to("mock:retry")
1723            .end_loop()
1724            .build()
1725            .unwrap();
1726
1727        assert_eq!(def.steps().len(), 1);
1728        match &def.steps()[0] {
1729            BuilderStep::Loop { config, steps } => {
1730                assert!(matches!(config.mode, LoopMode::While(_)));
1731                assert_eq!(steps.len(), 1);
1732            }
1733            other => panic!("Expected Loop, got {:?}", other),
1734        }
1735    }
1736
1737    #[test]
1738    fn test_nested_loop_builder() {
1739        use camel_api::loop_eip::LoopMode;
1740
1741        let def = RouteBuilder::from("direct:start")
1742            .route_id("nested-loop-test")
1743            .loop_count(2)
1744            .to("mock:outer")
1745            .loop_count(3)
1746            .to("mock:inner")
1747            .end_loop()
1748            .end_loop()
1749            .to("mock:after")
1750            .build()
1751            .unwrap();
1752
1753        assert_eq!(def.steps().len(), 2);
1754        match &def.steps()[0] {
1755            BuilderStep::Loop { steps, .. } => {
1756                assert_eq!(steps.len(), 2);
1757                match &steps[1] {
1758                    BuilderStep::Loop {
1759                        config,
1760                        steps: inner_steps,
1761                    } => {
1762                        assert!(matches!(config.mode, LoopMode::Count(3)));
1763                        assert_eq!(inner_steps.len(), 1);
1764                    }
1765                    other => panic!("Expected nested Loop, got {:?}", other),
1766                }
1767            }
1768            other => panic!("Expected outer Loop, got {:?}", other),
1769        }
1770    }
1771
1772    // -----------------------------------------------------------------------
1773    // Processor behavior tests — exercise the real Tower services directly
1774    // -----------------------------------------------------------------------
1775
1776    #[tokio::test]
1777    async fn test_set_header_processor_works() {
1778        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1779        let exchange = Exchange::new(Message::new("test"));
1780        let result = svc.call(exchange).await.unwrap();
1781        assert_eq!(
1782            result.input.header("greeting"),
1783            Some(&Value::String("hello".into()))
1784        );
1785    }
1786
1787    #[tokio::test]
1788    async fn test_filter_processor_passes() {
1789        use camel_api::BoxProcessorExt;
1790        use camel_processor::FilterService;
1791
1792        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1793        let mut svc =
1794            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1795        let exchange = Exchange::new(Message::new("pass"));
1796        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1797        assert_eq!(result.input.body.as_text(), Some("pass"));
1798    }
1799
1800    #[tokio::test]
1801    async fn test_filter_processor_blocks() {
1802        use camel_api::BoxProcessorExt;
1803        use camel_processor::FilterService;
1804
1805        let sub = BoxProcessor::from_fn(|_ex| {
1806            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1807        });
1808        let mut svc =
1809            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1810        let exchange = Exchange::new(Message::new("reject"));
1811        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1812        assert_eq!(result.input.body.as_text(), Some("reject"));
1813    }
1814
1815    #[tokio::test]
1816    async fn test_map_body_processor_works() {
1817        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1818            if let Some(text) = body.as_text() {
1819                Body::Text(text.to_uppercase())
1820            } else {
1821                body
1822            }
1823        });
1824        let exchange = Exchange::new(Message::new("hello"));
1825        let result = mapper.oneshot(exchange).await.unwrap();
1826        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1827    }
1828
1829    #[tokio::test]
1830    async fn test_process_custom_processor_works() {
1831        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1832            ex.set_property("custom", Value::Bool(true));
1833            Ok(ex)
1834        });
1835        let exchange = Exchange::new(Message::default());
1836        let result = processor.oneshot(exchange).await.unwrap();
1837        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1838    }
1839
1840    // -----------------------------------------------------------------------
1841    // Sequential pipeline test
1842    // -----------------------------------------------------------------------
1843
1844    #[tokio::test]
1845    async fn test_compose_pipeline_runs_steps_in_order() {
1846        use camel_core::route::{CompiledStep, compose_pipeline};
1847
1848        let processors = vec![
1849            CompiledStep::Process {
1850                processor: BoxProcessor::new(SetHeader::new(
1851                    IdentityProcessor,
1852                    "step",
1853                    Value::String("one".into()),
1854                )),
1855                body_contract: None,
1856                lifecycle: None,
1857            },
1858            CompiledStep::Process {
1859                processor: BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1860                    if let Some(text) = body.as_text() {
1861                        Body::Text(format!("{}-processed", text))
1862                    } else {
1863                        body
1864                    }
1865                })),
1866                body_contract: None,
1867                lifecycle: None,
1868            },
1869        ];
1870
1871        let pipeline = compose_pipeline(processors);
1872        let exchange = Exchange::new(Message::new("hello"));
1873        let result = pipeline.oneshot(exchange).await.unwrap();
1874
1875        assert_eq!(
1876            result.input.header("step"),
1877            Some(&Value::String("one".into()))
1878        );
1879        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1880    }
1881
1882    #[tokio::test]
1883    async fn test_compose_pipeline_empty_is_identity() {
1884        use camel_core::route::compose_pipeline;
1885
1886        let pipeline = compose_pipeline(vec![]);
1887        let exchange = Exchange::new(Message::new("unchanged"));
1888        let result = pipeline.oneshot(exchange).await.unwrap();
1889        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1890    }
1891
1892    // -----------------------------------------------------------------------
1893    // Circuit breaker builder tests
1894    // -----------------------------------------------------------------------
1895
1896    #[test]
1897    fn test_builder_circuit_breaker_sets_config() {
1898        use camel_api::circuit_breaker::CircuitBreakerConfig;
1899
1900        let config = CircuitBreakerConfig::new().failure_threshold(5);
1901        let definition = RouteBuilder::from("timer:tick")
1902            .route_id("test-route")
1903            .circuit_breaker(config)
1904            .build()
1905            .unwrap();
1906
1907        let cb = definition
1908            .circuit_breaker_config()
1909            .expect("circuit breaker should be set");
1910        assert_eq!(cb.failure_threshold, 5);
1911    }
1912
1913    #[test]
1914    fn test_builder_circuit_breaker_with_error_handler() {
1915        use camel_api::circuit_breaker::CircuitBreakerConfig;
1916        use camel_api::error_handler::ErrorHandlerConfig;
1917
1918        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1919        let eh_config = ErrorHandlerConfig::log_only();
1920
1921        let definition = RouteBuilder::from("timer:tick")
1922            .route_id("test-route")
1923            .to("log:info")
1924            .circuit_breaker(cb_config)
1925            .error_handler(eh_config)
1926            .build()
1927            .unwrap();
1928
1929        assert!(
1930            definition.circuit_breaker_config().is_some(),
1931            "circuit breaker config should be set"
1932        );
1933        // Route definition was built successfully with both configs.
1934    }
1935
1936    #[test]
1937    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1938        let definition = RouteBuilder::from("direct:start")
1939            .route_id("test-route")
1940            .dead_letter_channel("log:dlc")
1941            .on_exception(|e| matches!(e, CamelError::Io(_)))
1942            .retry(3)
1943            .handled_by("log:io")
1944            .end_on_exception()
1945            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1946            .retry(1)
1947            .end_on_exception()
1948            .to("mock:out")
1949            .build()
1950            .expect("route should build");
1951
1952        let cfg = definition
1953            .error_handler_config()
1954            .expect("error handler should be set");
1955        assert_eq!(cfg.policies.len(), 2);
1956        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1957        assert_eq!(
1958            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1959            Some(3)
1960        );
1961        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1962        assert_eq!(
1963            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1964            Some(1)
1965        );
1966    }
1967
1968    #[test]
1969    fn test_builder_on_exception_mixed_mode_rejected() {
1970        let result = RouteBuilder::from("direct:start")
1971            .route_id("test-route")
1972            .error_handler(ErrorHandlerConfig::log_only())
1973            .on_exception(|_e| true)
1974            .end_on_exception()
1975            .to("mock:out")
1976            .build();
1977
1978        let err = result.err().expect("mixed mode should fail with an error");
1979
1980        assert!(
1981            format!("{err}").contains("mixed error handler modes"),
1982            "unexpected error: {err}"
1983        );
1984    }
1985
1986    #[test]
1987    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1988        let definition = RouteBuilder::from("direct:start")
1989            .route_id("test-route")
1990            .on_exception(|_e| true)
1991            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1992            .with_jitter(0.5)
1993            .end_on_exception()
1994            .to("mock:out")
1995            .build()
1996            .expect("route should build");
1997
1998        let cfg = definition
1999            .error_handler_config()
2000            .expect("error handler should be set");
2001        assert_eq!(cfg.policies.len(), 1);
2002        assert!(cfg.policies[0].retry.is_none());
2003    }
2004
2005    #[test]
2006    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
2007        let definition = RouteBuilder::from("direct:start")
2008            .route_id("test-route")
2009            .dead_letter_channel("log:dlc")
2010            .to("mock:out")
2011            .build()
2012            .expect("route should build");
2013
2014        let cfg = definition
2015            .error_handler_config()
2016            .expect("error handler should be set");
2017        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
2018        assert!(cfg.policies.is_empty());
2019    }
2020
2021    #[test]
2022    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
2023        let definition = RouteBuilder::from("direct:start")
2024            .route_id("test-route")
2025            .dead_letter_channel("log:first")
2026            .on_exception(|e| matches!(e, CamelError::Io(_)))
2027            .retry(2)
2028            .end_on_exception()
2029            .dead_letter_channel("log:second")
2030            .to("mock:out")
2031            .build()
2032            .expect("route should build");
2033
2034        let cfg = definition
2035            .error_handler_config()
2036            .expect("error handler should be set");
2037        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2038        assert_eq!(cfg.policies.len(), 1);
2039        assert_eq!(
2040            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
2041            Some(2)
2042        );
2043    }
2044
2045    #[test]
2046    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
2047        let definition = RouteBuilder::from("direct:start")
2048            .route_id("test-route")
2049            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
2050            .retry(1)
2051            .end_on_exception()
2052            .to("mock:out")
2053            .build()
2054            .expect("route should build");
2055
2056        let cfg = definition
2057            .error_handler_config()
2058            .expect("error handler should be set");
2059        assert!(cfg.dlc_uri.is_none());
2060        assert_eq!(cfg.policies.len(), 1);
2061    }
2062
2063    #[test]
2064    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
2065        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
2066        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
2067
2068        let definition = RouteBuilder::from("direct:start")
2069            .route_id("test-route")
2070            .error_handler(first)
2071            .error_handler(second)
2072            .to("mock:out")
2073            .build()
2074            .expect("route should build");
2075
2076        let cfg = definition
2077            .error_handler_config()
2078            .expect("error handler should be set");
2079        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2080    }
2081
2082    // --- Splitter builder tests ---
2083
2084    #[test]
2085    fn test_split_builder_typestate() {
2086        use camel_api::splitter::{SplitterConfig, split_body_lines};
2087
2088        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
2089        let definition = RouteBuilder::from("timer:test?period=1000")
2090            .route_id("test-route")
2091            .split(SplitterConfig::new(split_body_lines()))
2092            .to("mock:per-fragment")
2093            .end_split()
2094            .to("mock:final")
2095            .build()
2096            .unwrap();
2097
2098        // Should have 2 top-level steps: Split + To("mock:final")
2099        assert_eq!(definition.steps().len(), 2);
2100    }
2101
2102    #[test]
2103    fn test_split_builder_steps_collected() {
2104        use camel_api::splitter::{SplitterConfig, split_body_lines};
2105
2106        let definition = RouteBuilder::from("timer:test?period=1000")
2107            .route_id("test-route")
2108            .split(SplitterConfig::new(split_body_lines()))
2109            .set_header("fragment", Value::String("yes".into()))
2110            .to("mock:per-fragment")
2111            .end_split()
2112            .build()
2113            .unwrap();
2114
2115        // Should have 1 top-level step: Split (containing 2 sub-steps)
2116        assert_eq!(definition.steps().len(), 1);
2117        match &definition.steps()[0] {
2118            BuilderStep::Split { steps, .. } => {
2119                assert_eq!(steps.len(), 2); // SetHeader + To
2120            }
2121            other => panic!("Expected Split, got {:?}", other),
2122        }
2123    }
2124
2125    #[test]
2126    fn test_split_builder_config_propagated() {
2127        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2128
2129        let definition = RouteBuilder::from("timer:test?period=1000")
2130            .route_id("test-route")
2131            .split(
2132                SplitterConfig::new(split_body_lines())
2133                    .parallel(true)
2134                    .parallel_limit(4)
2135                    .aggregation(AggregationStrategy::CollectAll),
2136            )
2137            .to("mock:per-fragment")
2138            .end_split()
2139            .build()
2140            .unwrap();
2141
2142        match &definition.steps()[0] {
2143            BuilderStep::Split { config, .. } => {
2144                assert!(config.parallel);
2145                assert_eq!(config.parallel_limit, Some(4));
2146                assert!(matches!(
2147                    config.aggregation,
2148                    AggregationStrategy::CollectAll
2149                ));
2150            }
2151            other => panic!("Expected Split, got {:?}", other),
2152        }
2153    }
2154
2155    #[test]
2156    fn test_aggregate_builder_adds_step() {
2157        use camel_api::aggregator::AggregatorConfig;
2158        use camel_core::route::BuilderStep;
2159
2160        let definition = RouteBuilder::from("timer:tick")
2161            .route_id("test-route")
2162            .aggregate(
2163                AggregatorConfig::correlate_by("key")
2164                    .complete_when_size(2)
2165                    .build()
2166                    .unwrap(),
2167            )
2168            .build()
2169            .unwrap();
2170
2171        assert_eq!(definition.steps().len(), 1);
2172        assert!(matches!(
2173            definition.steps()[0],
2174            BuilderStep::Aggregate { .. }
2175        ));
2176    }
2177
2178    #[test]
2179    fn test_aggregate_in_split_builder() {
2180        use camel_api::aggregator::AggregatorConfig;
2181        use camel_api::splitter::{SplitterConfig, split_body_lines};
2182        use camel_core::route::BuilderStep;
2183
2184        let definition = RouteBuilder::from("timer:tick")
2185            .route_id("test-route")
2186            .split(SplitterConfig::new(split_body_lines()))
2187            .aggregate(
2188                AggregatorConfig::correlate_by("key")
2189                    .complete_when_size(1)
2190                    .build()
2191                    .unwrap(),
2192            )
2193            .end_split()
2194            .build()
2195            .unwrap();
2196
2197        assert_eq!(definition.steps().len(), 1);
2198        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2199            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2200        } else {
2201            panic!("expected Split step");
2202        }
2203    }
2204
2205    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
2206
2207    #[test]
2208    fn test_builder_set_body_static_adds_processor() {
2209        let definition = RouteBuilder::from("timer:tick")
2210            .route_id("test-route")
2211            .set_body("fixed")
2212            .build()
2213            .unwrap();
2214        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2215    }
2216
2217    #[test]
2218    fn test_builder_set_body_fn_adds_processor() {
2219        let definition = RouteBuilder::from("timer:tick")
2220            .route_id("test-route")
2221            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2222            .build()
2223            .unwrap();
2224        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2225    }
2226
2227    #[test]
2228    fn transform_alias_produces_same_as_set_body() {
2229        let route_transform = RouteBuilder::from("timer:tick")
2230            .route_id("test-route")
2231            .transform("hello")
2232            .build()
2233            .unwrap();
2234
2235        let route_set_body = RouteBuilder::from("timer:tick")
2236            .route_id("test-route")
2237            .set_body("hello")
2238            .build()
2239            .unwrap();
2240
2241        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2242    }
2243
2244    #[test]
2245    fn test_builder_set_header_fn_adds_processor() {
2246        let definition = RouteBuilder::from("timer:tick")
2247            .route_id("test-route")
2248            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2249            .build()
2250            .unwrap();
2251        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2252    }
2253
2254    #[tokio::test]
2255    async fn test_set_body_static_processor_works() {
2256        use camel_core::route::{CompiledStep, compose_pipeline};
2257        let def = RouteBuilder::from("t:t")
2258            .route_id("test-route")
2259            .set_body("replaced")
2260            .build()
2261            .unwrap();
2262        let pipeline = compose_pipeline(
2263            def.steps()
2264                .iter()
2265                .filter_map(|s| {
2266                    if let BuilderStep::Processor(p) = s {
2267                        Some(p.clone())
2268                    } else {
2269                        None
2270                    }
2271                })
2272                .map(|p| CompiledStep::Process {
2273                    processor: p,
2274                    body_contract: None,
2275                    lifecycle: None,
2276                })
2277                .collect(),
2278        );
2279        let exchange = Exchange::new(Message::new("original"));
2280        let result = pipeline.oneshot(exchange).await.unwrap();
2281        assert_eq!(result.input.body.as_text(), Some("replaced"));
2282    }
2283
2284    #[tokio::test]
2285    async fn test_set_body_fn_processor_works() {
2286        use camel_core::route::{CompiledStep, compose_pipeline};
2287        let def = RouteBuilder::from("t:t")
2288            .route_id("test-route")
2289            .set_body_fn(|ex: &Exchange| {
2290                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2291            })
2292            .build()
2293            .unwrap();
2294        let pipeline = compose_pipeline(
2295            def.steps()
2296                .iter()
2297                .filter_map(|s| {
2298                    if let BuilderStep::Processor(p) = s {
2299                        Some(p.clone())
2300                    } else {
2301                        None
2302                    }
2303                })
2304                .map(|p| CompiledStep::Process {
2305                    processor: p,
2306                    body_contract: None,
2307                    lifecycle: None,
2308                })
2309                .collect(),
2310        );
2311        let exchange = Exchange::new(Message::new("hello"));
2312        let result = pipeline.oneshot(exchange).await.unwrap();
2313        assert_eq!(result.input.body.as_text(), Some("HELLO"));
2314    }
2315
2316    #[tokio::test]
2317    async fn test_set_header_fn_processor_works() {
2318        use camel_core::route::{CompiledStep, compose_pipeline};
2319        let def = RouteBuilder::from("t:t")
2320            .route_id("test-route")
2321            .set_header_fn("echo", |ex: &Exchange| {
2322                ex.input
2323                    .body
2324                    .as_text()
2325                    .map(|t| Value::String(t.into()))
2326                    .unwrap_or(Value::Null)
2327            })
2328            .build()
2329            .unwrap();
2330        let pipeline = compose_pipeline(
2331            def.steps()
2332                .iter()
2333                .filter_map(|s| {
2334                    if let BuilderStep::Processor(p) = s {
2335                        Some(p.clone())
2336                    } else {
2337                        None
2338                    }
2339                })
2340                .map(|p| CompiledStep::Process {
2341                    processor: p,
2342                    body_contract: None,
2343                    lifecycle: None,
2344                })
2345                .collect(),
2346        );
2347        let exchange = Exchange::new(Message::new("ping"));
2348        let result = pipeline.oneshot(exchange).await.unwrap();
2349        assert_eq!(
2350            result.input.header("echo"),
2351            Some(&Value::String("ping".into()))
2352        );
2353    }
2354
2355    // ── FilterBuilder typestate tests ─────────────────────────────────────
2356
2357    #[test]
2358    fn test_filter_builder_typestate() {
2359        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2360            .route_id("test-route")
2361            .filter(|_ex| true)
2362            .to("mock:inner")
2363            .end_filter()
2364            .to("mock:outer")
2365            .build();
2366        assert!(result.is_ok());
2367    }
2368
2369    #[test]
2370    fn test_filter_builder_steps_collected() {
2371        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2372            .route_id("test-route")
2373            .filter(|_ex| true)
2374            .to("mock:inner")
2375            .end_filter()
2376            .build()
2377            .unwrap();
2378
2379        assert_eq!(definition.steps().len(), 1);
2380        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2381    }
2382
2383    #[test]
2384    fn test_wire_tap_builder_adds_step() {
2385        let definition = RouteBuilder::from("timer:tick")
2386            .route_id("test-route")
2387            .wire_tap("mock:tap")
2388            .to("mock:result")
2389            .build()
2390            .unwrap();
2391
2392        assert_eq!(definition.steps().len(), 2);
2393        assert!(
2394            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2395        );
2396        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2397    }
2398
2399    // ── MulticastBuilder typestate tests ─────────────────────────────────────
2400
2401    #[test]
2402    fn test_multicast_builder_typestate() {
2403        let definition = RouteBuilder::from("timer:tick")
2404            .route_id("test-route")
2405            .multicast()
2406            .to("direct:a")
2407            .to("direct:b")
2408            .end_multicast()
2409            .to("mock:result")
2410            .build()
2411            .unwrap();
2412
2413        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
2414    }
2415
2416    #[test]
2417    fn test_multicast_builder_steps_collected() {
2418        let definition = RouteBuilder::from("timer:tick")
2419            .route_id("test-route")
2420            .multicast()
2421            .to("direct:a")
2422            .to("direct:b")
2423            .end_multicast()
2424            .build()
2425            .unwrap();
2426
2427        match &definition.steps()[0] {
2428            BuilderStep::Multicast { steps, .. } => {
2429                assert_eq!(steps.len(), 2);
2430            }
2431            other => panic!("Expected Multicast, got {:?}", other),
2432        }
2433    }
2434
2435    // ── Concurrency builder tests ─────────────────────────────────────
2436
2437    #[test]
2438    fn test_builder_concurrent_sets_concurrency() {
2439        use camel_component_api::ConcurrencyModel;
2440
2441        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2442            .route_id("test-route")
2443            .concurrent(16)
2444            .to("log:info")
2445            .build()
2446            .unwrap();
2447
2448        assert_eq!(
2449            definition.concurrency_override(),
2450            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2451        );
2452    }
2453
2454    #[test]
2455    fn test_builder_concurrent_zero_means_unbounded() {
2456        use camel_component_api::ConcurrencyModel;
2457
2458        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2459            .route_id("test-route")
2460            .concurrent(0)
2461            .to("log:info")
2462            .build()
2463            .unwrap();
2464
2465        assert_eq!(
2466            definition.concurrency_override(),
2467            Some(&ConcurrencyModel::Concurrent { max: None })
2468        );
2469    }
2470
2471    #[test]
2472    fn test_builder_sequential_sets_concurrency() {
2473        use camel_component_api::ConcurrencyModel;
2474
2475        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2476            .route_id("test-route")
2477            .sequential()
2478            .to("log:info")
2479            .build()
2480            .unwrap();
2481
2482        assert_eq!(
2483            definition.concurrency_override(),
2484            Some(&ConcurrencyModel::Sequential)
2485        );
2486    }
2487
2488    #[test]
2489    fn test_builder_default_concurrency_is_none() {
2490        let definition = RouteBuilder::from("timer:tick")
2491            .route_id("test-route")
2492            .to("log:info")
2493            .build()
2494            .unwrap();
2495
2496        assert_eq!(definition.concurrency_override(), None);
2497    }
2498
2499    // ── Route lifecycle builder tests ─────────────────────────────────────
2500
2501    #[test]
2502    fn test_builder_route_id_sets_id() {
2503        let definition = RouteBuilder::from("timer:tick")
2504            .route_id("my-route")
2505            .build()
2506            .unwrap();
2507
2508        assert_eq!(definition.route_id(), "my-route");
2509    }
2510
2511    #[test]
2512    fn test_build_without_route_id_fails() {
2513        let result = RouteBuilder::from("timer:tick?period=1000")
2514            .to("log:info")
2515            .build();
2516        let err = match result {
2517            Err(e) => e.to_string(),
2518            Ok(_) => panic!("build() should fail without route_id"),
2519        };
2520        assert!(
2521            err.contains("route_id"),
2522            "error should mention route_id, got: {}",
2523            err
2524        );
2525    }
2526
2527    #[test]
2528    fn test_builder_empty_route_id_rejected() {
2529        let result = RouteBuilder::from("timer:tick").route_id("").build();
2530        let err = result.err().expect("empty route_id should be rejected");
2531        assert!(matches!(err, CamelError::RouteError(_)));
2532    }
2533
2534    #[test]
2535    fn test_builder_whitespace_route_id_rejected() {
2536        let result = RouteBuilder::from("timer:tick").route_id("   ").build();
2537        assert!(result.is_err());
2538    }
2539
2540    #[test]
2541    fn test_builder_auto_startup_false() {
2542        let definition = RouteBuilder::from("timer:tick")
2543            .route_id("test-route")
2544            .auto_startup(false)
2545            .build()
2546            .unwrap();
2547
2548        assert!(!definition.auto_startup());
2549    }
2550
2551    #[test]
2552    fn test_builder_startup_order_custom() {
2553        let definition = RouteBuilder::from("timer:tick")
2554            .route_id("test-route")
2555            .startup_order(50)
2556            .build()
2557            .unwrap();
2558
2559        assert_eq!(definition.startup_order(), 50);
2560    }
2561
2562    #[test]
2563    fn test_builder_defaults() {
2564        let definition = RouteBuilder::from("timer:tick")
2565            .route_id("test-route")
2566            .build()
2567            .unwrap();
2568
2569        assert_eq!(definition.route_id(), "test-route");
2570        assert!(definition.auto_startup());
2571        assert_eq!(definition.startup_order(), 1000);
2572    }
2573
2574    // ── Choice typestate tests ──────────────────────────────────────────────────
2575
2576    #[test]
2577    fn test_choice_builder_single_when() {
2578        let definition = RouteBuilder::from("timer:tick")
2579            .route_id("test-route")
2580            .choice()
2581            .when(|ex: &Exchange| ex.input.header("type").is_some())
2582            .to("mock:typed")
2583            .end_when()
2584            .end_choice()
2585            .build()
2586            .unwrap();
2587        assert_eq!(definition.steps().len(), 1);
2588        assert!(
2589            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2590            if whens.len() == 1 && otherwise.is_none())
2591        );
2592    }
2593
2594    #[test]
2595    fn test_choice_builder_when_otherwise() {
2596        let definition = RouteBuilder::from("timer:tick")
2597            .route_id("test-route")
2598            .choice()
2599            .when(|ex: &Exchange| ex.input.header("a").is_some())
2600            .to("mock:a")
2601            .end_when()
2602            .otherwise()
2603            .to("mock:fallback")
2604            .end_otherwise()
2605            .end_choice()
2606            .build()
2607            .unwrap();
2608        assert!(
2609            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2610            if whens.len() == 1 && otherwise.is_some())
2611        );
2612    }
2613
2614    #[test]
2615    fn test_choice_builder_multiple_whens() {
2616        let definition = RouteBuilder::from("timer:tick")
2617            .route_id("test-route")
2618            .choice()
2619            .when(|ex: &Exchange| ex.input.header("a").is_some())
2620            .to("mock:a")
2621            .end_when()
2622            .when(|ex: &Exchange| ex.input.header("b").is_some())
2623            .to("mock:b")
2624            .end_when()
2625            .end_choice()
2626            .build()
2627            .unwrap();
2628        assert!(
2629            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2630            if whens.len() == 2)
2631        );
2632    }
2633
2634    #[test]
2635    fn test_choice_step_after_choice() {
2636        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2637        let definition = RouteBuilder::from("timer:tick")
2638            .route_id("test-route")
2639            .choice()
2640            .when(|_ex: &Exchange| true)
2641            .to("mock:inner")
2642            .end_when()
2643            .end_choice()
2644            .to("mock:outer") // must be step[1], not inside choice
2645            .build()
2646            .unwrap();
2647        assert_eq!(definition.steps().len(), 2);
2648        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2649    }
2650
2651    // ── Throttle typestate tests ──────────────────────────────────────────────────
2652
2653    #[test]
2654    fn test_throttle_builder_typestate() {
2655        let definition = RouteBuilder::from("timer:tick")
2656            .route_id("test-route")
2657            .throttle(10, std::time::Duration::from_secs(1))
2658            .to("mock:result")
2659            .end_throttle()
2660            .build()
2661            .unwrap();
2662
2663        assert_eq!(definition.steps().len(), 1);
2664        assert!(matches!(
2665            &definition.steps()[0],
2666            BuilderStep::Throttle { .. }
2667        ));
2668    }
2669
2670    #[test]
2671    fn test_throttle_builder_with_strategy() {
2672        let definition = RouteBuilder::from("timer:tick")
2673            .route_id("test-route")
2674            .throttle(10, std::time::Duration::from_secs(1))
2675            .strategy(ThrottleStrategy::Reject)
2676            .to("mock:result")
2677            .end_throttle()
2678            .build()
2679            .unwrap();
2680
2681        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2682            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2683        } else {
2684            panic!("Expected Throttle step");
2685        }
2686    }
2687
2688    #[test]
2689    fn test_throttle_builder_steps_collected() {
2690        let definition = RouteBuilder::from("timer:tick")
2691            .route_id("test-route")
2692            .throttle(5, std::time::Duration::from_secs(1))
2693            .set_header("throttled", Value::Bool(true))
2694            .to("mock:throttled")
2695            .end_throttle()
2696            .build()
2697            .unwrap();
2698
2699        match &definition.steps()[0] {
2700            BuilderStep::Throttle { steps, .. } => {
2701                assert_eq!(steps.len(), 2); // SetHeader + To
2702            }
2703            other => panic!("Expected Throttle, got {:?}", other),
2704        }
2705    }
2706
2707    #[test]
2708    fn test_throttle_step_after_throttle() {
2709        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2710        let definition = RouteBuilder::from("timer:tick")
2711            .route_id("test-route")
2712            .throttle(10, std::time::Duration::from_secs(1))
2713            .to("mock:inner")
2714            .end_throttle()
2715            .to("mock:outer")
2716            .build()
2717            .unwrap();
2718
2719        assert_eq!(definition.steps().len(), 2);
2720        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2721    }
2722
2723    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2724
2725    #[test]
2726    fn test_load_balance_builder_typestate() {
2727        let definition = RouteBuilder::from("timer:tick")
2728            .route_id("test-route")
2729            .load_balance()
2730            .round_robin()
2731            .to("mock:a")
2732            .to("mock:b")
2733            .end_load_balance()
2734            .build()
2735            .unwrap();
2736
2737        assert_eq!(definition.steps().len(), 1);
2738        assert!(matches!(
2739            &definition.steps()[0],
2740            BuilderStep::LoadBalance { .. }
2741        ));
2742    }
2743
2744    #[test]
2745    fn test_load_balance_builder_with_strategy() {
2746        let definition = RouteBuilder::from("timer:tick")
2747            .route_id("test-route")
2748            .load_balance()
2749            .random()
2750            .to("mock:result")
2751            .end_load_balance()
2752            .build()
2753            .unwrap();
2754
2755        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2756            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2757        } else {
2758            panic!("Expected LoadBalance step");
2759        }
2760    }
2761
2762    #[test]
2763    fn test_load_balance_builder_steps_collected() {
2764        let definition = RouteBuilder::from("timer:tick")
2765            .route_id("test-route")
2766            .load_balance()
2767            .set_header("lb", Value::Bool(true))
2768            .to("mock:a")
2769            .end_load_balance()
2770            .build()
2771            .unwrap();
2772
2773        match &definition.steps()[0] {
2774            BuilderStep::LoadBalance { steps, .. } => {
2775                assert_eq!(steps.len(), 2); // SetHeader + To
2776            }
2777            other => panic!("Expected LoadBalance, got {:?}", other),
2778        }
2779    }
2780
2781    #[test]
2782    fn test_load_balance_step_after_load_balance() {
2783        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
2784        let definition = RouteBuilder::from("timer:tick")
2785            .route_id("test-route")
2786            .load_balance()
2787            .to("mock:inner")
2788            .end_load_balance()
2789            .to("mock:outer")
2790            .build()
2791            .unwrap();
2792
2793        assert_eq!(definition.steps().len(), 2);
2794        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2795    }
2796
2797    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2798
2799    #[test]
2800    fn test_dynamic_router_builder() {
2801        let definition = RouteBuilder::from("timer:tick")
2802            .route_id("test-route")
2803            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2804            .build()
2805            .unwrap();
2806
2807        assert_eq!(definition.steps().len(), 1);
2808        assert!(matches!(
2809            &definition.steps()[0],
2810            BuilderStep::DynamicRouter { .. }
2811        ));
2812    }
2813
2814    #[test]
2815    fn test_dynamic_router_builder_with_config() {
2816        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2817            .max_iterations(100)
2818            .cache_size(500);
2819
2820        let definition = RouteBuilder::from("timer:tick")
2821            .route_id("test-route")
2822            .dynamic_router_with_config(config)
2823            .build()
2824            .unwrap();
2825
2826        assert_eq!(definition.steps().len(), 1);
2827        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2828            assert_eq!(config.max_iterations, 100);
2829            assert_eq!(config.cache_size, 500);
2830        } else {
2831            panic!("Expected DynamicRouter step");
2832        }
2833    }
2834
2835    #[test]
2836    fn test_dynamic_router_step_after_router() {
2837        // Steps after dynamic_router() are added to the outer pipeline.
2838        let definition = RouteBuilder::from("timer:tick")
2839            .route_id("test-route")
2840            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2841            .to("mock:outer")
2842            .build()
2843            .unwrap();
2844
2845        assert_eq!(definition.steps().len(), 2);
2846        assert!(matches!(
2847            &definition.steps()[0],
2848            BuilderStep::DynamicRouter { .. }
2849        ));
2850        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2851    }
2852
2853    #[test]
2854    fn routing_slip_builder_creates_step() {
2855        use camel_api::RoutingSlipExpression;
2856
2857        let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2858
2859        let route = RouteBuilder::from("direct:start")
2860            .route_id("routing-slip-test")
2861            .routing_slip(expression)
2862            .build()
2863            .unwrap();
2864
2865        assert!(
2866            matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2867            "Expected RoutingSlip step"
2868        );
2869    }
2870
2871    #[test]
2872    fn routing_slip_with_config_builder_creates_step() {
2873        use camel_api::RoutingSlipConfig;
2874
2875        let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2876            .uri_delimiter("|")
2877            .cache_size(50)
2878            .ignore_invalid_endpoints(true);
2879
2880        let route = RouteBuilder::from("direct:start")
2881            .route_id("routing-slip-config-test")
2882            .routing_slip_with_config(config)
2883            .build()
2884            .unwrap();
2885
2886        if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2887            assert_eq!(config.uri_delimiter, "|");
2888            assert_eq!(config.cache_size, 50);
2889            assert!(config.ignore_invalid_endpoints);
2890        } else {
2891            panic!("Expected RoutingSlip step");
2892        }
2893    }
2894
2895    #[test]
2896    fn test_builder_marshal_adds_processor_step() {
2897        let definition = RouteBuilder::from("timer:tick")
2898            .route_id("test-route")
2899            .marshal("json")
2900            .unwrap()
2901            .build()
2902            .unwrap();
2903        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2904    }
2905
2906    #[test]
2907    fn test_builder_unmarshal_adds_processor_step() {
2908        let definition = RouteBuilder::from("timer:tick")
2909            .route_id("test-route")
2910            .unmarshal("json")
2911            .unwrap()
2912            .build()
2913            .unwrap();
2914        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2915    }
2916
2917    #[test]
2918    fn test_builder_stream_cache_adds_processor_step() {
2919        let definition = RouteBuilder::from("timer:tick")
2920            .route_id("test-route")
2921            .stream_cache(1024)
2922            .build()
2923            .unwrap();
2924        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2925    }
2926
2927    #[test]
2928    fn validate_adds_validate_step() {
2929        let def = RouteBuilder::from("direct:in")
2930            .route_id("test")
2931            .validate("schemas/order.xsd")
2932            .build()
2933            .unwrap();
2934        let steps = def.steps();
2935        assert_eq!(steps.len(), 1);
2936        assert!(
2937            matches!(&steps[0], BuilderStep::Validate { predicate } if predicate.language == "simple" && predicate.source == "schemas/order.xsd"),
2938            "got: {:?}",
2939            steps[0]
2940        );
2941    }
2942
2943    #[test]
2944    fn test_builder_marshal_returns_err_for_unknown_format() {
2945        let result = RouteBuilder::from("timer:tick")
2946            .route_id("test-route")
2947            .marshal("csv");
2948        let err = match result {
2949            Err(e) => e,
2950            Ok(_) => panic!("marshal with unknown format should return Err"),
2951        };
2952        let msg = err.to_string();
2953        assert!(
2954            msg.contains("unknown data format"),
2955            "error should mention unknown format, got: {msg}"
2956        );
2957        assert!(
2958            msg.contains("csv"),
2959            "error should mention format name, got: {msg}"
2960        );
2961    }
2962
2963    #[test]
2964    fn test_builder_unmarshal_returns_err_for_unknown_format() {
2965        let result = RouteBuilder::from("timer:tick")
2966            .route_id("test-route")
2967            .unmarshal("csv");
2968        let err = match result {
2969            Err(e) => e,
2970            Ok(_) => panic!("unmarshal with unknown format should return Err"),
2971        };
2972        let msg = err.to_string();
2973        assert!(
2974            msg.contains("unknown data format"),
2975            "error should mention unknown format, got: {msg}"
2976        );
2977        assert!(
2978            msg.contains("csv"),
2979            "error should mention format name, got: {msg}"
2980        );
2981    }
2982
2983    #[test]
2984    fn test_builder_recipient_list_creates_step() {
2985        let route = RouteBuilder::from("direct:start")
2986            .route_id("recipient-list-test")
2987            .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
2988            .build()
2989            .unwrap();
2990
2991        assert!(matches!(
2992            &route.steps()[0],
2993            BuilderStep::RecipientList { .. }
2994        ));
2995    }
2996
2997    #[test]
2998    fn test_builder_recipient_list_with_config_creates_step() {
2999        let config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));
3000
3001        let route = RouteBuilder::from("direct:start")
3002            .route_id("recipient-list-config-test")
3003            .recipient_list_with_config(config)
3004            .build()
3005            .unwrap();
3006
3007        assert!(matches!(
3008            &route.steps()[0],
3009            BuilderStep::RecipientList { .. }
3010        ));
3011    }
3012
3013    #[test]
3014    fn test_builder_script_adds_script_step() {
3015        let route = RouteBuilder::from("direct:start")
3016            .route_id("script-test")
3017            .script("rhai", "headers[\"x\"] = \"y\"")
3018            .build()
3019            .unwrap();
3020
3021        assert!(matches!(
3022            &route.steps()[0],
3023            BuilderStep::Script { language, script }
3024            if language == "rhai" && script == "headers[\"x\"] = \"y\""
3025        ));
3026    }
3027
3028    #[test]
3029    fn test_builder_delay_and_delay_with_header_add_steps() {
3030        let route = RouteBuilder::from("direct:start")
3031            .route_id("delay-test")
3032            .delay(Duration::from_millis(250))
3033            .delay_with_header(Duration::from_millis(500), "x-delay")
3034            .build()
3035            .unwrap();
3036
3037        assert_eq!(route.steps().len(), 2);
3038        assert!(matches!(&route.steps()[0], BuilderStep::Delay { .. }));
3039        assert!(matches!(&route.steps()[1], BuilderStep::Delay { .. }));
3040    }
3041
3042    #[test]
3043    fn test_builder_log_and_stop_add_steps_in_order() {
3044        let route = RouteBuilder::from("direct:start")
3045            .route_id("log-stop-test")
3046            .log("hello", LogLevel::Info)
3047            .stop()
3048            .to("mock:after")
3049            .build()
3050            .unwrap();
3051
3052        assert_eq!(route.steps().len(), 3);
3053        assert!(matches!(
3054            &route.steps()[0],
3055            BuilderStep::Log { message, .. } if message == "hello"
3056        ));
3057        assert!(matches!(&route.steps()[1], BuilderStep::Stop));
3058        assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
3059    }
3060
3061    #[test]
3062    fn test_builder_stream_cache_default_adds_processor_step() {
3063        let route = RouteBuilder::from("direct:start")
3064            .route_id("stream-cache-default-test")
3065            .stream_cache_default()
3066            .build()
3067            .unwrap();
3068
3069        assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
3070    }
3071
3072    #[test]
3073    fn test_validate_creates_validate_step_with_expression() {
3074        let route = RouteBuilder::from("direct:in")
3075            .route_id("validate-prefix-test")
3076            .validate("${body.size()} > 0")
3077            .build()
3078            .unwrap();
3079
3080        assert!(matches!(
3081            &route.steps()[0],
3082            BuilderStep::Validate { predicate } if predicate.language == "simple" && predicate.source == "${body.size()} > 0"
3083        ));
3084    }
3085
3086    #[test]
3087    fn test_load_balance_builder_weighted_failover_config() {
3088        let route = RouteBuilder::from("direct:start")
3089            .route_id("lb-weighted-failover")
3090            .load_balance()
3091            .weighted(vec![
3092                ("direct:a".to_string(), 3),
3093                ("direct:b".to_string(), 1),
3094            ])
3095            .failover()
3096            .to("mock:result")
3097            .end_load_balance()
3098            .build()
3099            .unwrap();
3100
3101        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3102            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3103        } else {
3104            panic!("Expected LoadBalance step");
3105        }
3106    }
3107
3108    #[test]
3109    fn test_multicast_builder_all_config_setters() {
3110        let route = RouteBuilder::from("direct:start")
3111            .route_id("multicast-config-test")
3112            .multicast()
3113            .parallel(true)
3114            .parallel_limit(4)
3115            .stop_on_exception(true)
3116            .timeout(Duration::from_millis(300))
3117            .aggregation(MulticastStrategy::Original)
3118            .to("mock:a")
3119            .end_multicast()
3120            .build()
3121            .unwrap();
3122
3123        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3124            assert!(config.parallel);
3125            assert_eq!(config.parallel_limit, Some(4));
3126            assert!(config.stop_on_exception);
3127            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
3128            assert!(matches!(config.aggregation, MulticastStrategy::Original));
3129        } else {
3130            panic!("Expected Multicast step");
3131        }
3132    }
3133
3134    #[test]
3135    fn test_build_canonical_rejects_unsupported_processor_step() {
3136        let err = RouteBuilder::from("direct:start")
3137            .route_id("canonical-reject")
3138            .set_header("k", Value::String("v".into()))
3139            .build_canonical()
3140            .unwrap_err();
3141
3142        assert!(format!("{err}").contains("does not support step `processor`"));
3143    }
3144
3145    // ── LoadBalance strategy-specific tests ─────────────────────────────────────
3146
3147    #[test]
3148    fn test_load_balance_builder_weighted_strategy() {
3149        let route = RouteBuilder::from("direct:start")
3150            .route_id("lb-weighted")
3151            .load_balance()
3152            .weighted(vec![
3153                ("direct:a".to_string(), 5),
3154                ("direct:b".to_string(), 2),
3155                ("direct:c".to_string(), 1),
3156            ])
3157            .to("mock:result")
3158            .end_load_balance()
3159            .build()
3160            .unwrap();
3161
3162        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3163            assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
3164        } else {
3165            panic!("Expected LoadBalance step");
3166        }
3167    }
3168
3169    #[test]
3170    fn test_load_balance_builder_failover_strategy() {
3171        let route = RouteBuilder::from("direct:start")
3172            .route_id("lb-failover")
3173            .load_balance()
3174            .failover()
3175            .to("mock:primary")
3176            .end_load_balance()
3177            .build()
3178            .unwrap();
3179
3180        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3181            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3182        } else {
3183            panic!("Expected LoadBalance step");
3184        }
3185    }
3186
3187    // ── FilterInSplitBuilder tests ──────────────────────────────────────────────
3188
3189    #[test]
3190    fn test_filter_in_split_builder_typestate() {
3191        use camel_api::splitter::{SplitterConfig, split_body_lines};
3192
3193        let definition = RouteBuilder::from("timer:test")
3194            .route_id("filter-in-split")
3195            .split(SplitterConfig::new(split_body_lines()))
3196            .filter(|_ex| true)
3197            .to("mock:filtered")
3198            .end_filter()
3199            .end_split()
3200            .build()
3201            .unwrap();
3202
3203        assert_eq!(definition.steps().len(), 1);
3204        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3205            assert_eq!(steps.len(), 1);
3206            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3207        } else {
3208            panic!("Expected Split step");
3209        }
3210    }
3211
3212    #[test]
3213    fn test_filter_in_split_builder_multiple_steps() {
3214        use camel_api::splitter::{SplitterConfig, split_body_lines};
3215
3216        let definition = RouteBuilder::from("timer:test")
3217            .route_id("filter-in-split-multi")
3218            .split(SplitterConfig::new(split_body_lines()))
3219            .to("mock:before-filter")
3220            .filter(|_ex| true)
3221            .to("mock:inside-filter")
3222            .end_filter()
3223            .to("mock:after-filter")
3224            .end_split()
3225            .build()
3226            .unwrap();
3227
3228        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3229            // To("before-filter") + Filter{...} + To("after-filter") = 3
3230            assert_eq!(steps.len(), 3);
3231        } else {
3232            panic!("Expected Split step");
3233        }
3234    }
3235
3236    // ── build_canonical tests ───────────────────────────────────────────────────
3237
3238    #[test]
3239    fn test_build_canonical_with_circuit_breaker() {
3240        use camel_api::circuit_breaker::CircuitBreakerConfig;
3241
3242        let spec = RouteBuilder::from("direct:start")
3243            .route_id("canonical-cb")
3244            .circuit_breaker(CircuitBreakerConfig::new().failure_threshold(10))
3245            .to("mock:result")
3246            .build_canonical()
3247            .unwrap();
3248
3249        let cb = spec.circuit_breaker.expect("circuit breaker should be set");
3250        assert_eq!(cb.failure_threshold, 10);
3251    }
3252
3253    #[test]
3254    fn test_build_canonical_rejects_custom_split_aggregation() {
3255        use camel_api::splitter::{SplitterConfig, split_body_lines};
3256
3257        let err = RouteBuilder::from("direct:start")
3258            .route_id("canonical-custom-split")
3259            .split(SplitterConfig::new(split_body_lines()).aggregation(
3260                camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
3261            ))
3262            .to("mock:frag")
3263            .end_split()
3264            .build_canonical()
3265            .unwrap_err();
3266
3267        // Split with closure-based expression is rejected in canonical v1.
3268        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3269    }
3270
3271    #[test]
3272    fn test_build_canonical_rejects_custom_aggregate_strategy() {
3273        let err = RouteBuilder::from("direct:start")
3274            .route_id("canonical-custom-agg")
3275            .aggregate(
3276                AggregatorConfig::correlate_by("key")
3277                    .complete_when_size(2)
3278                    .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
3279                    .build()
3280                    .unwrap(),
3281            )
3282            .build_canonical()
3283            .unwrap_err();
3284
3285        assert!(format!("{err}").contains("custom aggregate strategy"));
3286    }
3287
3288    #[test]
3289    fn test_build_canonical_rejects_fn_correlation_strategy() {
3290        let err = RouteBuilder::from("direct:start")
3291            .route_id("canonical-fn-corr")
3292            .aggregate(AggregatorConfig {
3293                header_name: "key".to_string(),
3294                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3295                correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
3296                strategy: AggregationStrategy::CollectAll,
3297                max_buckets: None,
3298                bucket_ttl: None,
3299                force_completion_on_stop: false,
3300                discard_on_timeout: false,
3301            })
3302            .build_canonical()
3303            .unwrap_err();
3304
3305        assert!(format!("{err}").contains("Fn correlation strategy"));
3306    }
3307
3308    #[test]
3309    fn test_build_canonical_rejects_predicate_completion() {
3310        let err = RouteBuilder::from("direct:start")
3311            .route_id("canonical-pred-completion")
3312            .aggregate(AggregatorConfig {
3313                header_name: "key".to_string(),
3314                completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
3315                    |_| false,
3316                ))),
3317                correlation: CorrelationStrategy::HeaderName("key".to_string()),
3318                strategy: AggregationStrategy::CollectAll,
3319                max_buckets: None,
3320                bucket_ttl: None,
3321                force_completion_on_stop: false,
3322                discard_on_timeout: false,
3323            })
3324            .build_canonical()
3325            .unwrap_err();
3326
3327        assert!(format!("{err}").contains("predicate completion"));
3328    }
3329
3330    #[test]
3331    fn test_build_canonical_with_expression_correlation() {
3332        let spec = RouteBuilder::from("direct:start")
3333            .route_id("canonical-expr-corr")
3334            .aggregate(AggregatorConfig {
3335                header_name: "key".to_string(),
3336                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3337                correlation: CorrelationStrategy::Expression {
3338                    expr: "header.key".to_string(),
3339                    language: "simple".to_string(),
3340                },
3341                strategy: AggregationStrategy::CollectAll,
3342                max_buckets: None,
3343                bucket_ttl: None,
3344                force_completion_on_stop: false,
3345                discard_on_timeout: false,
3346            })
3347            .build_canonical()
3348            .unwrap();
3349
3350        assert!(spec.steps.iter().any(|s| matches!(s, CanonicalStepSpec::Aggregate(a) if a.correlation_key == Some("header.key".to_string()))));
3351    }
3352
3353    #[test]
3354    fn test_build_canonical_split_rejected_with_closure_expression() {
3355        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
3356
3357        // Builder-based split uses closure expressions, which are not serializable.
3358        let err = RouteBuilder::from("direct:start")
3359            .route_id("canonical-split-last")
3360            .split(
3361                SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins),
3362            )
3363            .to("mock:frag")
3364            .end_split()
3365            .build_canonical()
3366            .unwrap_err();
3367
3368        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3369    }
3370
3371    // ── OnExceptionBuilder full chain tests ─────────────────────────────────────
3372
/// Drives the complete `on_exception` chain (retry, backoff, jitter and
/// `handled_by`) and verifies each value on the single resulting policy.
#[test]
fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
    let route = RouteBuilder::from("direct:start")
        .route_id("on-exception-full")
        .dead_letter_channel("log:dlc")
        .on_exception(|e| matches!(e, CamelError::Io(_)))
        .retry(5)
        .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
        .with_jitter(0.3)
        .handled_by("log:io-handler")
        .end_on_exception()
        .to("mock:out")
        .build()
        .unwrap();

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.policies.len(), 1);

    let io_policy = &handler.policies[0];
    let backoff = io_policy.retry.as_ref().expect("retry should be set");
    assert_eq!(backoff.max_attempts, 5);
    assert_eq!(backoff.initial_delay, Duration::from_millis(10));
    assert_eq!(backoff.multiplier, 2.0);
    assert_eq!(backoff.max_delay, Duration::from_millis(500));
    // The jitter factor is compared within epsilon rather than for exact equality.
    assert!((backoff.jitter_factor - 0.3).abs() < f64::EPSILON);
    assert_eq!(io_policy.handled_by.as_deref(), Some("log:io-handler"));
}
3401
/// Requesting a jitter of `5.0` should leave the stored jitter factor at `1.0`.
#[test]
fn test_on_exception_jitter_clamped_to_valid_range() {
    let route = RouteBuilder::from("direct:start")
        .route_id("jitter-clamp")
        .on_exception(|_e| true)
        .retry(1)
        .with_jitter(5.0)
        .end_on_exception()
        .to("mock:out")
        .build()
        .unwrap();

    let handler = route.error_handler_config().unwrap();
    let backoff = handler.policies[0].retry.as_ref().unwrap();
    let distance_from_one = (backoff.jitter_factor - 1.0).abs();
    assert!(distance_from_one < f64::EPSILON);
}
3418
3419    // ── StepAccumulator: process_fn, convert_body_to, bean ──────────────────────
3420
/// `process_fn` should register a `BuilderStep::Processor` as the first step.
#[test]
fn test_builder_process_fn_adds_processor_step() {
    use camel_api::BoxProcessorExt;

    // Pass-through processor: returns the exchange it was given.
    let passthrough = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));

    let route = RouteBuilder::from("timer:tick")
        .route_id("process-fn-test")
        .process_fn(passthrough)
        .build()
        .unwrap();

    let steps = route.steps();
    assert!(matches!(&steps[0], BuilderStep::Processor(_)));
}
3433
/// `convert_body_to` should register a `BuilderStep::Processor` as the first step.
#[test]
fn test_builder_convert_body_to_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("convert-body-test")
        .convert_body_to(BodyType::Json)
        .build()
        .unwrap();

    let steps = route.steps();
    assert!(matches!(&steps[0], BuilderStep::Processor(_)));
}
3444
/// `bean` should record a `BuilderStep::Bean` carrying the given name and method.
#[test]
fn test_builder_bean_adds_bean_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("bean-test")
        .bean("myBean", "process")
        .build()
        .unwrap();

    let steps = route.steps();
    let is_expected_bean = matches!(
        &steps[0],
        BuilderStep::Bean { name, method } if name == "myBean" && method == "process"
    );
    assert!(is_expected_bean);
}
3458
3459    // ── Throttle strategy-specific tests ────────────────────────────────────────
3460
/// Selecting `ThrottleStrategy::Delay` must be reflected in the throttle step's config.
#[test]
fn test_throttle_builder_delay_strategy() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("throttle-delay")
        .throttle(10, Duration::from_secs(1))
        .strategy(ThrottleStrategy::Delay)
        .to("mock:result")
        .end_throttle()
        .build()
        .unwrap();

    let steps = route.steps();
    let BuilderStep::Throttle { config, .. } = &steps[0] else {
        panic!("Expected Throttle step");
    };
    assert_eq!(config.strategy, ThrottleStrategy::Delay);
}
3478
/// Selecting `ThrottleStrategy::Drop` must be reflected in the throttle step's config.
#[test]
fn test_throttle_builder_drop_strategy() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("throttle-drop")
        .throttle(10, Duration::from_secs(1))
        .strategy(ThrottleStrategy::Drop)
        .to("mock:result")
        .end_throttle()
        .build()
        .unwrap();

    let steps = route.steps();
    let BuilderStep::Throttle { config, .. } = &steps[0] else {
        panic!("Expected Throttle step");
    };
    assert_eq!(config.strategy, ThrottleStrategy::Drop);
}
3496
3497    // ── LoopInLoopBuilder with loop_while ───────────────────────────────────────
3498
/// A `loop_while` opened inside a `loop_count` scope should end up as the
/// second step of the outer loop, with its mode set to `LoopMode::While`.
#[test]
fn test_nested_loop_while_builder() {
    use camel_api::loop_eip::LoopMode;

    let route = RouteBuilder::from("direct:start")
        .route_id("nested-loop-while")
        .loop_count(2)
        .to("mock:outer")
        .loop_while(|_ex| true)
        .to("mock:inner")
        .end_loop()
        .end_loop()
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 1);

    let BuilderStep::Loop {
        steps: outer_body, ..
    } = &top_level[0]
    else {
        panic!("Expected outer Loop step");
    };
    assert_eq!(outer_body.len(), 2);

    let BuilderStep::Loop {
        config: inner_config,
        ..
    } = &outer_body[1]
    else {
        panic!("Expected inner Loop step");
    };
    assert!(matches!(inner_config.mode, LoopMode::While(_)));
}
3526
3527    // ── Choice with multiple whens + otherwise ──────────────────────────────────
3528
/// Three `when` branches plus an `otherwise` should yield a choice step with
/// three whens and a one-step otherwise branch.
#[test]
fn test_choice_builder_multiple_whens_with_otherwise() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("choice-multi-otherwise")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("b").is_some())
        .to("mock:b")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("c").is_some())
        .to("mock:c")
        .end_when()
        .otherwise()
        .to("mock:fallback")
        .end_otherwise()
        .end_choice()
        .build()
        .unwrap();

    let steps = route.steps();
    let BuilderStep::Choice { whens, otherwise } = &steps[0] else {
        panic!("Expected Choice step");
    };
    assert_eq!(whens.len(), 3);
    assert!(otherwise.is_some());
    assert_eq!(otherwise.as_ref().unwrap().len(), 1);
}
3558
3559    // ── Multicast individual config tests ───────────────────────────────────────
3560
/// Setting only `parallel(true)` should enable parallelism and leave
/// `parallel_limit` unset.
#[test]
fn test_multicast_builder_parallel_only() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("multicast-parallel")
        .multicast()
        .parallel(true)
        .to("mock:a")
        .end_multicast()
        .build()
        .unwrap();

    let steps = definition.steps();
    let BuilderStep::Multicast { config, .. } = &steps[0] else {
        panic!("Expected Multicast step");
    };
    assert!(config.parallel);
    assert_eq!(config.parallel_limit, None);
}
3579
/// Setting only a timeout should store that duration on the multicast config.
#[test]
fn test_multicast_builder_timeout_only() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("multicast-timeout")
        .multicast()
        .timeout(Duration::from_secs(5))
        .to("mock:a")
        .end_multicast()
        .build()
        .unwrap();

    let steps = definition.steps();
    let BuilderStep::Multicast { config, .. } = &steps[0] else {
        panic!("Expected Multicast step");
    };
    assert_eq!(config.timeout, Some(Duration::from_secs(5)));
}
3597
/// Choosing `MulticastStrategy::CollectAll` should be stored as the
/// multicast aggregation strategy.
#[test]
fn test_multicast_builder_aggregation_collect_all() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("multicast-collect")
        .multicast()
        .aggregation(MulticastStrategy::CollectAll)
        .to("mock:a")
        .end_multicast()
        .build()
        .unwrap();

    let steps = definition.steps();
    let BuilderStep::Multicast { config, .. } = &steps[0] else {
        panic!("Expected Multicast step");
    };
    assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
}
3615
3616    // ── extract_completion_fields: Any mode with multiple conditions ────────────
3617
/// A size-or-timeout completion should surface both the size and the timeout
/// (in milliseconds) on the canonical aggregate step.
#[test]
fn test_build_canonical_aggregate_any_completion_mode() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_on_size_or_timeout(10, Duration::from_secs(30))
        .build()
        .unwrap();

    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-any-completion")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    let CanonicalStepSpec::Aggregate(agg) = &canonical.steps[0] else {
        panic!("Expected Aggregate step");
    };
    assert_eq!(agg.completion_size, Some(10));
    assert_eq!(agg.completion_timeout_ms, Some(30_000));
}
3638
/// A timeout-only completion should leave the canonical size empty and report
/// the timeout in milliseconds.
#[test]
fn test_build_canonical_aggregate_timeout_completion() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_on_timeout(Duration::from_millis(500))
        .build()
        .unwrap();

    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-timeout-completion")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    let CanonicalStepSpec::Aggregate(agg) = &canonical.steps[0] else {
        panic!("Expected Aggregate step");
    };
    assert_eq!(agg.completion_size, None);
    assert_eq!(agg.completion_timeout_ms, Some(500));
}
3659
3660    // ── canonicalize_aggregate: discard_on_timeout and force_completion_on_stop ─
3661
/// `discard_on_timeout(true)` should appear as `Some(true)` on the canonical
/// aggregate step.
#[test]
fn test_build_canonical_aggregate_discard_on_timeout() {
    use camel_api::aggregator::AggregatorConfig;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .discard_on_timeout(true)
        .build()
        .unwrap();

    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-discard-timeout")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    let CanonicalStepSpec::Aggregate(agg) = &canonical.steps[0] else {
        panic!("Expected Aggregate step");
    };
    assert_eq!(agg.discard_on_timeout, Some(true));
}
3684
/// `force_completion_on_stop(true)` should appear as `Some(true)` on the
/// canonical aggregate step.
#[test]
fn test_build_canonical_aggregate_force_completion_on_stop() {
    use camel_api::aggregator::AggregatorConfig;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .force_completion_on_stop(true)
        .build()
        .unwrap();

    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-force-stop")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    let CanonicalStepSpec::Aggregate(agg) = &canonical.steps[0] else {
        panic!("Expected Aggregate step");
    };
    assert_eq!(agg.force_completion_on_stop, Some(true));
}
3707
3708    // ── build_canonical: max_buckets and bucket_ttl ─────────────────────────────
3709
/// `max_buckets` and `bucket_ttl` should be carried into the canonical
/// aggregate step, with the TTL expressed in milliseconds.
#[test]
fn test_build_canonical_aggregate_max_buckets_and_ttl() {
    use camel_api::aggregator::AggregatorConfig;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .max_buckets(100)
        .bucket_ttl(Duration::from_secs(60))
        .build()
        .unwrap();

    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-buckets-ttl")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    let CanonicalStepSpec::Aggregate(agg) = &canonical.steps[0] else {
        panic!("Expected Aggregate step");
    };
    assert_eq!(agg.max_buckets, Some(100));
    assert_eq!(agg.bucket_ttl_ms, Some(60_000));
}
3734
3735    // ── SplitBuilder with filter inside ─────────────────────────────────────────
3736
/// A filter opened inside a split scope should be the split's only nested step.
#[test]
fn test_split_builder_with_filter_inside() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let route = RouteBuilder::from("timer:test")
        .route_id("split-with-filter")
        .split(SplitterConfig::new(split_body_lines()))
        .filter(|_ex| true)
        .to("mock:filtered-frag")
        .end_filter()
        .end_split()
        .build()
        .unwrap();

    let top_level = route.steps();
    let BuilderStep::Split { steps: nested, .. } = &top_level[0] else {
        panic!("Expected Split step");
    };
    assert_eq!(nested.len(), 1);
    assert!(matches!(&nested[0], BuilderStep::Filter { .. }));
}
3758
3759    // ── WireTap additional tests ────────────────────────────────────────────────
3760
/// Two consecutive wire taps followed by a `to` should give three steps, the
/// first two being wire taps with their respective URIs.
#[test]
fn test_wire_tap_multiple_taps() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("multi-wire-tap")
        .wire_tap("mock:tap1")
        .wire_tap("mock:tap2")
        .to("mock:result")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 3);

    let first_is_tap1 = matches!(&steps[0], BuilderStep::WireTap { uri } if uri == "mock:tap1");
    assert!(first_is_tap1);

    let second_is_tap2 = matches!(&steps[1], BuilderStep::WireTap { uri } if uri == "mock:tap2");
    assert!(second_is_tap2);
}
3779
3780    // ── Error handler: explicit config after shorthand → Mixed mode ─────────────
3781
/// Combining the `dead_letter_channel` shorthand with an explicit
/// `error_handler` config must fail the build with a "mixed error handler
/// modes" error.
#[test]
fn test_builder_shorthand_then_explicit_mixed_mode() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("mixed-mode-2")
        .dead_letter_channel("log:dlc")
        .error_handler(ErrorHandlerConfig::log_only())
        .to("mock:out")
        .build();

    let failure = outcome.err().expect("mixed mode should fail");
    let message = failure.to_string();
    assert!(message.contains("mixed error handler modes"));
}
3794
3795    // ── build_canonical: empty from_uri error ───────────────────────────────────
3796
/// `build_canonical` must return an error when the source URI is empty.
#[test]
fn test_build_canonical_empty_from_uri_errors() {
    let outcome = RouteBuilder::from("")
        .route_id("test")
        .build_canonical();

    assert!(outcome.is_err());
}
3802
/// `build_canonical` must fail when no route id was set, and the error text
/// must mention `route_id`.
#[test]
fn test_build_canonical_missing_route_id_errors() {
    let outcome = RouteBuilder::from("direct:start").build_canonical();
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("route_id"));
}
3810
3811    // ── SplitBuilder: aggregate inside split ────────────────────────────────────
3812
/// An aggregate added inside a split scope should be the split's only nested step.
#[test]
fn test_split_builder_with_aggregate_inside() {
    use camel_api::aggregator::AggregatorConfig;
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let route = RouteBuilder::from("timer:test")
        .route_id("split-agg")
        .split(SplitterConfig::new(split_body_lines()))
        .aggregate(
            AggregatorConfig::correlate_by("frag-key")
                .complete_when_size(3)
                .build()
                .unwrap(),
        )
        .end_split()
        .build()
        .unwrap();

    let top_level = route.steps();
    let BuilderStep::Split { steps: nested, .. } = &top_level[0] else {
        panic!("Expected Split step");
    };
    assert_eq!(nested.len(), 1);
    assert!(matches!(&nested[0], BuilderStep::Aggregate { .. }));
}
3838
3839    // ── Throttle: steps collected inside throttle scope ─────────────────────────
3840
/// Both steps added between `throttle` and `end_throttle` should be collected
/// on the throttle step.
#[test]
fn test_throttle_builder_with_steps_inside() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("throttle-steps")
        .throttle(10, Duration::from_secs(1))
        .set_header("throttled", Value::Bool(true))
        .to("mock:throttled")
        .end_throttle()
        .build()
        .unwrap();

    let top_level = route.steps();
    let BuilderStep::Throttle { steps: nested, .. } = &top_level[0] else {
        panic!("Expected Throttle step");
    };
    assert_eq!(nested.len(), 2);
}
3858
3859    // ── LoadBalance: steps collected inside scope ───────────────────────────────
3860
/// Both steps added between `load_balance` and `end_load_balance` should be
/// collected on the load-balance step.
#[test]
fn test_load_balance_builder_with_steps_inside() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("lb-steps")
        .load_balance()
        .round_robin()
        .set_header("lb", Value::Bool(true))
        .to("mock:lb")
        .end_load_balance()
        .build()
        .unwrap();

    let top_level = route.steps();
    let BuilderStep::LoadBalance { steps: nested, .. } = &top_level[0] else {
        panic!("Expected LoadBalance step");
    };
    assert_eq!(nested.len(), 2);
}
3879
3880    // ── Multicast: steps collected inside scope ─────────────────────────────────
3881
/// Both steps added between `multicast` and `end_multicast` should be
/// collected on the multicast step.
#[test]
fn test_multicast_builder_with_steps_inside() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("multicast-steps")
        .multicast()
        .set_header("mc", Value::Bool(true))
        .to("mock:multicast")
        .end_multicast()
        .build()
        .unwrap();

    let top_level = route.steps();
    let BuilderStep::Multicast { steps: nested, .. } = &top_level[0] else {
        panic!("Expected Multicast step");
    };
    assert_eq!(nested.len(), 2);
}
3899
3900    // ── LoopBuilder: steps collected inside loop scope ──────────────────────────
3901
/// Both steps added between `loop_count` and `end_loop` should be collected
/// on the loop step.
#[test]
fn test_loop_builder_with_steps_inside() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("loop-steps")
        .loop_count(3)
        .set_header("loop", Value::Bool(true))
        .to("mock:loop")
        .end_loop()
        .build()
        .unwrap();

    let top_level = route.steps();
    let BuilderStep::Loop { steps: nested, .. } = &top_level[0] else {
        panic!("Expected Loop step");
    };
    assert_eq!(nested.len(), 2);
}
3919
3920    // ── canonical_step_name coverage for remaining variants ─────────────────────
3921
/// `build_canonical` must refuse a route that contains a `loop` step.
#[test]
fn test_build_canonical_rejects_loop_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-loop")
        .loop_count(3)
        .to("mock:loop")
        .end_loop()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `loop`"));
}
3934
/// `build_canonical` must refuse a route that contains a `multicast` step.
#[test]
fn test_build_canonical_rejects_multicast_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-multicast")
        .multicast()
        .to("mock:a")
        .end_multicast()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `multicast`"));
}
3947
/// `build_canonical` must refuse a route that contains a `throttle` step.
#[test]
fn test_build_canonical_rejects_throttle_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-throttle")
        .throttle(10, Duration::from_secs(1))
        .to("mock:result")
        .end_throttle()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `throttle`"));
}
3960
/// `build_canonical` must refuse a route that contains a `load_balancer` step.
#[test]
fn test_build_canonical_rejects_load_balancer_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-lb")
        .load_balance()
        .round_robin()
        .to("mock:result")
        .end_load_balance()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `load_balancer`"));
}
3974
/// `build_canonical` must refuse a route that contains a `bean` step.
#[test]
fn test_build_canonical_rejects_bean_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-bean")
        .bean("myBean", "process")
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `bean`"));
}
3985
/// `build_canonical` must refuse a route that contains a `script` step.
#[test]
fn test_build_canonical_rejects_script_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-script")
        .script("rhai", "x = 1")
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `script`"));
}
3996
/// A `delay` step should survive canonicalization as a `Delay` spec with the
/// delay expressed in milliseconds.
#[test]
fn test_build_canonical_accepts_delay_step() {
    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-delay")
        .delay(Duration::from_millis(100))
        .build_canonical()
        .unwrap();

    let has_delay = canonical.steps.iter().any(|step| {
        matches!(step, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100)
    });
    assert!(has_delay);
}
4011
/// A `wire_tap` step should survive canonicalization as a `WireTap` spec
/// carrying the same URI.
#[test]
fn test_build_canonical_accepts_wire_tap_step() {
    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-wiretap")
        .wire_tap("mock:tap")
        .build_canonical()
        .unwrap();

    let has_tap = canonical
        .steps
        .iter()
        .any(|step| matches!(step, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"));
    assert!(has_tap);
}
4026
/// `build_canonical` must refuse a route that contains a `dynamic_router` step.
#[test]
fn test_build_canonical_rejects_dynamic_router_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-dyn-router")
        .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `dynamic_router`"));
}
4037
/// `build_canonical` must refuse a route that contains a `routing_slip` step.
#[test]
fn test_build_canonical_rejects_routing_slip_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-routing-slip")
        .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `routing_slip`"));
}
4048
/// `build_canonical` must refuse a route that contains a `recipient_list` step.
#[test]
fn test_build_canonical_rejects_recipient_list_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-recipient")
        .recipient_list(Arc::new(|_| "mock:a".to_string()))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `recipient_list`"));
}
4059
4060    // ── extract_completion_fields: Any mode with predicate → error ──────────────
4061
/// An `Any` completion mode that includes a predicate condition must make
/// `build_canonical` fail with a "predicate completion" error.
#[test]
fn test_build_canonical_rejects_any_mode_with_predicate() {
    // Built as a struct literal so a size condition and a closure predicate
    // can be combined in one `Any` list.
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Any(vec![
            CompletionCondition::Size(5),
            CompletionCondition::Predicate(Arc::new(|_| false)),
        ]),
        correlation: CorrelationStrategy::HeaderName("key".to_string()),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-any-pred")
        .aggregate(aggregator)
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("predicate completion"));
}
4084
4085    // ── BUILDER-004: Validation errors for missing required fields ────────────
4086
/// Building with an empty source URI must fail, and the error text must
/// mention either `'from'` or `URI`.
#[test]
fn test_builder_validation_missing_from_uri() {
    let outcome = RouteBuilder::from("")
        .route_id("missing-uri-route")
        .to("log:info")
        .build();
    assert!(outcome.is_err(), "empty from URI should fail validation");

    let err = outcome.err().unwrap().to_string();
    let mentions_source = err.contains("'from'") || err.contains("URI");
    assert!(
        mentions_source,
        "error should mention from/URI, got: {err}"
    );
}
4100
/// A step URI without a scheme is still accepted by `build`.
///
/// The builder accepts any URI string for a step; validation is deferred to
/// resolution time, so the build is expected to succeed here.
#[test]
fn test_builder_validation_invalid_step_uri_scheme() {
    // "not-a-valid-uri" deliberately carries no scheme.
    let outcome = RouteBuilder::from("timer:tick")
        .route_id("bad-step-route")
        .to("not-a-valid-uri")
        .build();

    assert!(
        outcome.is_ok(),
        "builder should accept opaque step URIs; resolution happens later"
    );
}
4114
4115    // ── BUILDER-006: Duplicate route IDs ──────────────────────────────────────
4116
/// Two routes built with the same route id both succeed and report equal ids.
///
/// The builder does not check for duplicates (that is a context-level
/// concern); detection is TODO(BUILDER-006).
#[test]
fn test_builder_duplicate_route_ids_produce_identical_definitions() {
    let first = RouteBuilder::from("direct:a")
        .route_id("dup-route")
        .to("mock:out")
        .build();
    let second = RouteBuilder::from("direct:b")
        .route_id("dup-route")
        .to("mock:out")
        .build();

    assert!(first.is_ok());
    assert!(second.is_ok());

    let first = first.unwrap();
    let second = second.unwrap();
    assert_eq!(first.route_id(), second.route_id());
}
4134}