Skip to main content

camel_builder/
lib.rs

1//! Fluent builder API for constructing Camel routes programmatically with EIP patterns.
2//!
3//! Main types: `RouteBuilder`, `StepAccumulator`, `SplitBuilder`, `ChoiceBuilder`, `MulticastBuilder`,
4//! `ThrottleBuilder`, `LoopBuilder`, `LoadBalancerBuilder`, `OnExceptionBuilder`.
5
6use camel_api::DelayConfig;
7use camel_api::aggregator::{
8    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24    ProcessorFn, Value,
25    runtime::{
26        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28        CanonicalWhenSpec,
29    },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35    StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38// ── Module declarations ─────────────────────────────────────────────────────
39pub mod do_try;
40pub use do_try::{DoCatchBuilder, DoFinallyBuilder, DoTryBuilder};
41
42/// Shared step-accumulation methods for all builder types.
43///
44/// Implementors provide `steps_mut()` and get step-adding methods for free.
45/// `filter()` and other branching methods are NOT included — they return
46/// different types per builder and stay as per-builder methods.
47pub trait StepAccumulator: Sized {
48    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
49
50    fn to(mut self, endpoint: impl Into<String>) -> Self {
51        self.steps_mut().push(BuilderStep::To(endpoint.into()));
52        self
53    }
54
55    fn process<F, Fut>(mut self, f: F) -> Self
56    where
57        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
58        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
59    {
60        let svc = ProcessorFn::new(f);
61        self.steps_mut()
62            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
63        self
64    }
65
66    fn process_fn(mut self, processor: BoxProcessor) -> Self {
67        self.steps_mut().push(BuilderStep::Processor(processor));
68        self
69    }
70
71    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
72        let svc = SetHeader::new(IdentityProcessor, key, value);
73        self.steps_mut()
74            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
75        self
76    }
77
78    fn map_body<F>(mut self, mapper: F) -> Self
79    where
80        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
81    {
82        let svc = MapBody::new(IdentityProcessor, mapper);
83        self.steps_mut()
84            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
85        self
86    }
87
88    fn set_body<B>(mut self, body: B) -> Self
89    where
90        B: Into<Body> + Clone + Send + Sync + 'static,
91    {
92        let body: Body = body.into();
93        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
94        self.steps_mut()
95            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
96        self
97    }
98
99    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
100    ///
101    /// Transforms the message body using the given value. Semantically identical
102    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
103    fn transform<B>(self, body: B) -> Self
104    where
105        B: Into<Body> + Clone + Send + Sync + 'static,
106    {
107        self.set_body(body)
108    }
109
110    fn set_body_fn<F>(mut self, expr: F) -> Self
111    where
112        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
113    {
114        let svc = SetBody::new(IdentityProcessor, expr);
115        self.steps_mut()
116            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
117        self
118    }
119
120    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
121    where
122        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
123    {
124        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
125        self.steps_mut()
126            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
127        self
128    }
129
130    fn aggregate(mut self, config: AggregatorConfig) -> Self {
131        self.steps_mut().push(BuilderStep::Aggregate { config });
132        self
133    }
134
135    /// Stop processing this exchange immediately. No further steps in the
136    /// current pipeline will run.
137    ///
138    /// Can be used at any point in the route: directly on RouteBuilder,
139    /// inside `.filter()`, inside `.split()`, etc.
140    fn stop(mut self) -> Self {
141        self.steps_mut().push(BuilderStep::Stop);
142        self
143    }
144
145    fn delay(mut self, duration: std::time::Duration) -> Self {
146        self.steps_mut().push(BuilderStep::Delay {
147            config: DelayConfig::from_duration(duration),
148        });
149        self
150    }
151
152    fn delay_with_header(
153        mut self,
154        duration: std::time::Duration,
155        header: impl Into<String>,
156    ) -> Self {
157        self.steps_mut().push(BuilderStep::Delay {
158            config: DelayConfig::from_duration_with_header(duration, header),
159        });
160        self
161    }
162
163    /// Log a message at the specified level.
164    ///
165    /// The message will be logged when an exchange passes through this step.
166    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
167        self.steps_mut().push(BuilderStep::Log {
168            level,
169            message: message.into(),
170        });
171        self
172    }
173
174    /// Convert the message body to the target type.
175    ///
176    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
177    /// Returns `TypeConversionFailed` if conversion is not possible.
178    ///
179    /// # Example
180    /// ```ignore
181    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
182    ///      .convert_body_to(BodyType::Json)
183    ///      .to("direct:next")
184    /// ```
185    fn convert_body_to(mut self, target: BodyType) -> Self {
186        let svc = ConvertBodyTo::new(IdentityProcessor, target);
187        self.steps_mut()
188            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
189        self
190    }
191
192    fn stream_cache(mut self, threshold: usize) -> Self {
193        let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
194        let svc = StreamCacheService::new(IdentityProcessor, config);
195        self.steps_mut()
196            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
197        self
198    }
199
200    /// Materialize `Body::Stream` into `Body::Bytes` using the default threshold (128 KB).
201    ///
202    /// Equivalent to `.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)`.
203    fn stream_cache_default(self) -> Self {
204        self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
205    }
206
207    /// Marshal the message body using the specified data format.
208    ///
209    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
210    /// the format name is unknown.
211    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
212    ///
213    /// # Example
214    /// ```ignore
215    /// route.marshal("json")?.to("direct:next")
216    /// ```
217    fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
218        let name = format.into();
219        let df = builtin_data_format(&name)
220            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
221        let svc = MarshalService::new(IdentityProcessor, df);
222        self.steps_mut()
223            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
224        Ok(self)
225    }
226
227    /// Unmarshal the message body using the specified data format.
228    ///
229    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
230    /// the format name is unknown.
231    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
232    ///
233    /// # Example
234    /// ```ignore
235    /// route.unmarshal("json")?.to("direct:next")
236    /// ```
237    fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
238        let name = format.into();
239        let df = builtin_data_format(&name)
240            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
241        let svc = UnmarshalService::new(IdentityProcessor, df);
242        self.steps_mut()
243            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
244        Ok(self)
245    }
246
247    /// Validate the exchange body against a schema file.
248    ///
249    /// Shorthand for `to("validator:path")`. Supports XSD, JSON Schema, and YAML.
250    ///
251    /// # Example
252    /// ```ignore
253    /// route.validate("schemas/order.xsd").to("direct:out")
254    /// ```
255    fn validate(mut self, schema_path: impl Into<String>) -> Self {
256        let path = schema_path.into();
257        let uri = if path.starts_with("validator:") {
258            path
259        } else {
260            format!("validator:{path}")
261        };
262        self.steps_mut().push(BuilderStep::To(uri));
263        self
264    }
265
266    /// Execute a script that can modify the exchange (headers, properties, body).
267    ///
268    /// The script has access to `headers`, `properties`, and `body` variables
269    /// and can modify them with assignment syntax: `headers["k"] = v`.
270    ///
271    /// # Example
272    /// ```ignore
273    /// // ignore: requires full CamelContext setup with registered language
274    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
275    /// ```
276    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
277        self.steps_mut().push(BuilderStep::Script {
278            language: language.into(),
279            script: script.into(),
280        });
281        self
282    }
283
284    /// EIP-7 enrich: synchronous content enrichment via a resolved producer.
285    ///
286    /// Calls the given endpoint URI as a producer, then merges the response
287    /// back into the original exchange body using the default `UseEnrichedBody`
288    /// strategy (original headers/properties are preserved).
289    fn enrich(mut self, uri: impl Into<String>) -> Self {
290        self.steps_mut().push(BuilderStep::Enrich {
291            uri: uri.into(),
292            strategy: None,
293            timeout_ms: None,
294        });
295        self
296    }
297
298    /// EIP-7 pollEnrich: blocking poll of a PollingConsumer with timeout.
299    ///
300    /// Reads from a polling endpoint (e.g., file) and merges the result
301    /// into the exchange body using the default `UseEnrichedBody` strategy.
302    /// `timeout_ms` controls how long to wait for data.
303    fn poll_enrich(mut self, uri: impl Into<String>, timeout_ms: u64) -> Self {
304        self.steps_mut().push(BuilderStep::PollEnrich {
305            uri: uri.into(),
306            strategy: None,
307            timeout_ms: Some(timeout_ms),
308        });
309        self
310    }
311
312    fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
313        self.steps_mut().push(BuilderStep::Bean {
314            name: name.into(),
315            method: method.into(),
316        });
317        self
318    }
319}
320
321/// A fluent builder for constructing routes.
322///
323/// # Example
324///
325/// ```ignore
326/// let definition = RouteBuilder::from("timer:tick?period=1000")
327///     .set_header("source", Value::String("timer".into()))
328///     .filter(|ex| ex.input.body.as_text().is_some())
329///     .to("log:info?showHeaders=true")
330///     .build()?;
331/// ```
332// TODO(BUILDER-003): RouteBuilder does not support clone-and-reuse semantics.
333// Once built, the builder is consumed. Add `Clone` or a `fork()` method to allow
334// reusing a partially-built route as a template for multiple routes.
335pub struct RouteBuilder {
336    from_uri: String,
337    steps: Vec<BuilderStep>,
338    error_handler: Option<ErrorHandlerConfig>,
339    error_handler_mode: ErrorHandlerMode,
340    circuit_breaker_config: Option<CircuitBreakerConfig>,
341    security_policy_config: Option<camel_api::security_policy::SecurityPolicyConfig>,
342    security_authenticator: Option<std::sync::Arc<dyn camel_auth::TokenAuthenticator>>,
343    concurrency: Option<ConcurrencyModel>,
344    route_id: Option<String>,
345    auto_startup: Option<bool>,
346    startup_order: Option<i32>,
347}
348
/// Records which error-handler API has been used on a `RouteBuilder`, so that
/// `build()` can reject routes mixing the two styles.
#[derive(Default)]
enum ErrorHandlerMode {
    /// No error-handler method has been called yet.
    #[default]
    None,
    /// Only `.error_handler(config)` has been used.
    ExplicitConfig,
    /// Only the shorthand methods (`.dead_letter_channel(..)`, `.on_exception(..)`)
    /// have been used; their accumulated settings are stored here.
    Shorthand {
        /// Dead letter channel URI, if `.dead_letter_channel(..)` was called.
        dlc_uri: Option<String>,
        /// Exception policies collected from closed `.on_exception(..)` scopes.
        specs: Vec<OnExceptionSpec>,
    },
    /// Both styles were used; `build()` turns this into a `RouteError`.
    Mixed,
}
360
/// One shorthand exception policy collected by `OnExceptionBuilder`.
#[derive(Clone)]
struct OnExceptionSpec {
    /// Predicate deciding whether this policy applies to a given error.
    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
    /// Redelivery settings; `None` until `.retry(..)` is called on the scope.
    retry: Option<RedeliveryPolicy>,
    /// URI passed to `handled_by(..)` when the error handler config is built.
    handled_by: Option<String>,
}
367
368impl RouteBuilder {
369    /// Start building a route from the given source endpoint URI.
370    pub fn from(endpoint: &str) -> Self {
371        Self {
372            from_uri: endpoint.to_string(),
373            steps: Vec::new(),
374            error_handler: None,
375            error_handler_mode: ErrorHandlerMode::None,
376            circuit_breaker_config: None,
377            security_policy_config: None,
378            security_authenticator: None,
379            concurrency: None,
380            route_id: None,
381            auto_startup: None,
382            startup_order: None,
383        }
384    }
385
386    /// Open a filter scope. Only exchanges matching `predicate` will be processed
387    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
388    /// and continue to steps after `.end_filter()`.
389    pub fn filter<F>(self, predicate: F) -> FilterBuilder
390    where
391        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
392    {
393        FilterBuilder {
394            parent: self,
395            predicate: std::sync::Arc::new(predicate),
396            steps: vec![],
397        }
398    }
399
400    /// Open a choice scope for content-based routing.
401    ///
402    /// Within the choice, you can define multiple `.when()` clauses and an
403    /// optional `.otherwise()` clause. The first matching `when` predicate
404    /// determines which sub-pipeline executes.
405    pub fn choice(self) -> ChoiceBuilder {
406        ChoiceBuilder {
407            parent: self,
408            whens: vec![],
409            _otherwise: None,
410        }
411    }
412
413    /// Add a WireTap step that sends a clone of the exchange to the given
414    /// endpoint URI (fire-and-forget). The original exchange continues
415    /// downstream unchanged.
416    pub fn wire_tap(mut self, endpoint: &str) -> Self {
417        self.steps.push(BuilderStep::WireTap {
418            uri: endpoint.to_string(),
419        });
420        self
421    }
422
423    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
424    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
425        self.error_handler_mode = match self.error_handler_mode {
426            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
427                ErrorHandlerMode::ExplicitConfig
428            }
429            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
430        };
431        self.error_handler = Some(config);
432        self
433    }
434
435    /// Set a dead letter channel URI for shorthand error handler mode.
436    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
437        let uri = uri.into();
438        self.error_handler_mode = match self.error_handler_mode {
439            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
440                dlc_uri: Some(uri),
441                specs: Vec::new(),
442            },
443            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
444                dlc_uri: Some(uri),
445                specs,
446            },
447            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
448        };
449        self
450    }
451
452    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
453    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
454    where
455        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
456    {
457        self.error_handler_mode = match self.error_handler_mode {
458            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
459                dlc_uri: None,
460                specs: Vec::new(),
461            },
462            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
463            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
464        };
465
466        OnExceptionBuilder {
467            parent: self,
468            policy: OnExceptionSpec {
469                matches: std::sync::Arc::new(matches),
470                retry: None,
471                handled_by: None,
472            },
473        }
474    }
475
476    /// Set a circuit breaker for this route.
477    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
478        self.circuit_breaker_config = Some(config);
479        self
480    }
481
482    pub fn security_policy(
483        mut self,
484        config: camel_api::security_policy::SecurityPolicyConfig,
485    ) -> Self {
486        self.security_policy_config = Some(config);
487        self
488    }
489
490    pub fn security_authenticator(
491        mut self,
492        auth: std::sync::Arc<dyn camel_auth::TokenAuthenticator>,
493    ) -> Self {
494        self.security_authenticator = Some(auth);
495        self
496    }
497
498    /// Override the consumer's default concurrency model.
499    ///
500    /// When set, the pipeline spawns a task per exchange, processing them
501    /// concurrently. `max` limits the number of simultaneously active
502    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
503    ///
504    /// # Example
505    /// ```ignore
506    /// RouteBuilder::from("http://0.0.0.0:8080/api")
507    ///     .concurrent(16)  // max 16 in-flight pipeline executions
508    ///     .process(handle_request)
509    ///     .build()
510    /// ```
511    pub fn concurrent(mut self, max: usize) -> Self {
512        let max = if max == 0 { None } else { Some(max) };
513        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
514        self
515    }
516
517    /// Force sequential processing, overriding a concurrent-capable consumer.
518    ///
519    /// Useful for HTTP routes that mutate shared state and need ordering
520    /// guarantees.
521    pub fn sequential(mut self) -> Self {
522        self.concurrency = Some(ConcurrencyModel::Sequential);
523        self
524    }
525
526    /// Set the route ID for this route.
527    ///
528    /// If not set, the route will be assigned an auto-generated ID.
529    pub fn route_id(mut self, id: impl Into<String>) -> Self {
530        self.route_id = Some(id.into());
531        self
532    }
533
534    /// Set whether this route should automatically start when the context starts.
535    ///
536    /// Default is `true`.
537    pub fn auto_startup(mut self, auto: bool) -> Self {
538        self.auto_startup = Some(auto);
539        self
540    }
541
542    /// Set the startup order for this route.
543    ///
544    /// Routes with lower values start first. Default is 1000.
545    pub fn startup_order(mut self, order: i32) -> Self {
546        self.startup_order = Some(order);
547        self
548    }
549
550    /// Begin a Splitter sub-pipeline. Steps added after this call (until
551    /// `.end_split()`) will be executed per-fragment.
552    ///
553    /// Returns a `SplitBuilder` — you cannot call `.build()` until
554    /// `.end_split()` closes the split scope (enforced by the type system).
555    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
556        SplitBuilder {
557            parent: self,
558            config,
559            steps: Vec::new(),
560        }
561    }
562
563    /// Begin a Multicast sub-pipeline. Steps added after this call (until
564    /// `.end_multicast()`) will each receive a copy of the exchange.
565    ///
566    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
567    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
568    pub fn multicast(self) -> MulticastBuilder {
569        MulticastBuilder {
570            parent: self,
571            steps: Vec::new(),
572            config: MulticastConfig::new(),
573        }
574    }
575
576    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
577    /// `max_requests` per `period`. Steps inside the throttle scope are only
578    /// executed when the rate limit allows.
579    ///
580    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
581    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
582    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
583        ThrottleBuilder {
584            parent: self,
585            config: ThrottlerConfig::new(max_requests, period),
586            steps: Vec::new(),
587        }
588    }
589
590    /// Begin a Loop sub-pipeline that iterates a fixed number of times.
591    pub fn loop_count(self, count: usize) -> LoopBuilder {
592        LoopBuilder {
593            parent: self,
594            config: LoopConfig {
595                mode: LoopMode::Count(count),
596            },
597            steps: vec![],
598        }
599    }
600
601    /// Begin a Loop sub-pipeline that iterates while a predicate is true.
602    pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
603    where
604        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
605    {
606        LoopBuilder {
607            parent: self,
608            config: LoopConfig {
609                mode: LoopMode::While(std::sync::Arc::new(predicate)),
610            },
611            steps: vec![],
612        }
613    }
614
615    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
616    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
617    ///
618    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
619    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
620    pub fn load_balance(self) -> LoadBalancerBuilder {
621        LoadBalancerBuilder {
622            parent: self,
623            config: LoadBalancerConfig::round_robin(),
624            steps: Vec::new(),
625        }
626    }
627
628    /// Add a dynamic router step that routes exchanges dynamically based on
629    /// expression evaluation at runtime.
630    ///
631    /// The expression receives the exchange and returns `Some(uri)` to route to
632    /// the next endpoint, or `None` to stop routing.
633    ///
634    /// # Example
635    /// ```ignore
636    /// RouteBuilder::from("timer:tick")
637    ///     .route_id("test-route")
638    ///     .dynamic_router(|ex| {
639    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
640    ///     })
641    ///     .build()
642    /// ```
643    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
644        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
645    }
646
647    /// Add a dynamic router step with full configuration.
648    ///
649    /// Allows customization of URI delimiter, cache size, timeout, and other options.
650    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
651        self.steps.push(BuilderStep::DynamicRouter { config });
652        self
653    }
654
655    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
656        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
657    }
658
659    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
660        self.steps.push(BuilderStep::RoutingSlip { config });
661        self
662    }
663
664    pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
665        self.recipient_list_with_config(RecipientListConfig::new(expression))
666    }
667
668    pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
669        self.steps.push(BuilderStep::RecipientList { config });
670        self
671    }
672
673    /// Consume the builder and produce a [`RouteDefinition`].
674    // TODO(BUILDER-006): Validate duplicate route IDs. When a route with the same
675    // ID is already registered in the context, return `Err(CamelError::RouteError)`.
676    // Currently, duplicate IDs are silently accepted; detection should happen at
677    // `CamelContext::add_route_definition` time.
678    pub fn build(self) -> Result<RouteDefinition, CamelError> {
679        validate_uri(&self.from_uri)?;
680        let route_id = self
681            .route_id
682            .filter(|s| !s.trim().is_empty())
683            .ok_or_else(|| {
684                CamelError::RouteError(
685                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
686                        .to_string(),
687                )
688            })?;
689        let resolved_error_handler = match self.error_handler_mode {
690            ErrorHandlerMode::None => self.error_handler,
691            ErrorHandlerMode::ExplicitConfig => self.error_handler,
692            ErrorHandlerMode::Mixed => {
693                return Err(CamelError::RouteError(
694                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
695                ));
696            }
697            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
698                let mut config = if let Some(uri) = dlc_uri {
699                    ErrorHandlerConfig::dead_letter_channel(uri)
700                } else {
701                    ErrorHandlerConfig::log_only()
702                };
703
704                for spec in specs {
705                    let matcher = spec.matches.clone();
706                    let mut builder = config.on_exception(move |e| matcher(e));
707
708                    if let Some(retry) = spec.retry {
709                        builder = builder.retry(retry.max_attempts).with_backoff(
710                            retry.initial_delay,
711                            retry.multiplier,
712                            retry.max_delay,
713                        );
714                        if retry.jitter_factor > 0.0 {
715                            builder = builder.with_jitter(retry.jitter_factor);
716                        }
717                    }
718
719                    if let Some(uri) = spec.handled_by {
720                        builder = builder.handled_by(uri);
721                    }
722
723                    config = builder.build();
724                }
725
726                Some(config)
727            }
728        };
729
730        let definition = RouteDefinition::new(self.from_uri, self.steps);
731        let definition = if let Some(eh) = resolved_error_handler {
732            definition.with_error_handler(eh)
733        } else {
734            definition
735        };
736        let definition = if let Some(cb) = self.circuit_breaker_config {
737            definition.with_circuit_breaker(cb)
738        } else {
739            definition
740        };
741        let definition = if let Some(sp) = self.security_policy_config {
742            definition.with_security_policy(sp)
743        } else {
744            definition
745        };
746        let definition = if let Some(auth) = self.security_authenticator {
747            definition.with_security_authenticator(auth)
748        } else {
749            definition
750        };
751        let definition = if let Some(concurrency) = self.concurrency {
752            definition.with_concurrency(concurrency)
753        } else {
754            definition
755        };
756        let definition = definition.with_route_id(route_id);
757        let definition = if let Some(auto) = self.auto_startup {
758            definition.with_auto_startup(auto)
759        } else {
760            definition
761        };
762        let definition = if let Some(order) = self.startup_order {
763            definition.with_startup_order(order)
764        } else {
765            definition
766        };
767        Ok(definition)
768    }
769
770    /// Compile this builder route into canonical spec.
771    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
772        validate_uri(&self.from_uri)?;
773        let route_id = self
774            .route_id
775            .filter(|s| !s.trim().is_empty())
776            .ok_or_else(|| {
777                CamelError::RouteError(
778                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
779                        .to_string(),
780                )
781            })?;
782
783        let steps = canonicalize_steps(self.steps)?;
784        let circuit_breaker = self
785            .circuit_breaker_config
786            .map(canonicalize_circuit_breaker);
787
788        if self.security_policy_config.is_some() {
789            return Err(CamelError::RouteError(
790                "routes with security_policy cannot use the canonical/hot-reload path (not yet supported)"
791                    .into(),
792            ));
793        }
794
795        let spec = CanonicalRouteSpec {
796            route_id,
797            from: self.from_uri,
798            steps,
799            circuit_breaker,
800            auto_startup: None,
801            startup_order: None,
802            concurrency: None,
803            version: camel_api::CANONICAL_CONTRACT_VERSION,
804        };
805        spec.validate_contract()?;
806        Ok(spec)
807    }
808}
809
/// Scope builder returned by `RouteBuilder::on_exception`.
///
/// Collects the settings of one shorthand exception policy and hands the
/// route builder back through `end_on_exception()`.
pub struct OnExceptionBuilder {
    /// Route builder that opened this scope; returned by `end_on_exception()`.
    parent: RouteBuilder,
    /// Policy being configured in this scope.
    policy: OnExceptionSpec,
}
814
815impl OnExceptionBuilder {
816    pub fn retry(mut self, max_attempts: u32) -> Self {
817        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
818        self
819    }
820
821    pub fn with_backoff(
822        mut self,
823        initial: std::time::Duration,
824        multiplier: f64,
825        max: std::time::Duration,
826    ) -> Self {
827        if let Some(ref mut retry) = self.policy.retry {
828            retry.initial_delay = initial;
829            retry.multiplier = multiplier;
830            retry.max_delay = max;
831        } else {
832            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
833        }
834        self
835    }
836
837    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
838        if let Some(ref mut retry) = self.policy.retry {
839            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
840        } else {
841            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
842        }
843        self
844    }
845
846    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
847        self.policy.handled_by = Some(uri.into());
848        self
849    }
850
851    pub fn end_on_exception(mut self) -> RouteBuilder {
852        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
853            specs.push(self.policy);
854        }
855        self.parent
856    }
857}
858
859/// Validate that a URI is non-empty and contains a scheme component.
860fn validate_uri(uri: &str) -> Result<(), CamelError> {
861    let trimmed = uri.trim();
862    if trimmed.is_empty() {
863        return Err(CamelError::RouteError(
864            "route must have a 'from' URI".to_string(),
865        ));
866    }
867    if !trimmed.contains(':') {
868        return Err(CamelError::RouteError(
869            "URI must have a scheme (e.g. 'timer:tick')".to_string(),
870        ));
871    }
872    let scheme = trimmed.split(':').next().unwrap_or("");
873    if scheme.trim().is_empty() {
874        return Err(CamelError::RouteError(
875            "URI scheme must not be empty".to_string(),
876        ));
877    }
878    Ok(())
879}
880
881fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
882    let mut canonical = Vec::with_capacity(steps.len());
883    for step in steps {
884        canonical.push(canonicalize_step(step)?);
885    }
886    Ok(canonical)
887}
888
889fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
890    match step {
891        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
892        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
893        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
894        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
895        BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
896            delay_ms: config.delay_ms,
897            dynamic_header: config.dynamic_header,
898        }),
899        BuilderStep::DeclarativeScript { expression } => {
900            Ok(CanonicalStepSpec::Script { expression })
901        }
902        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
903            predicate,
904            steps: canonicalize_steps(steps)?,
905        }),
906        BuilderStep::DeclarativeChoice { whens, otherwise } => {
907            let mut canonical_whens = Vec::with_capacity(whens.len());
908            for DeclarativeWhenStep { predicate, steps } in whens {
909                canonical_whens.push(CanonicalWhenSpec {
910                    predicate,
911                    steps: canonicalize_steps(steps)?,
912                });
913            }
914            let otherwise = match otherwise {
915                Some(steps) => Some(canonicalize_steps(steps)?),
916                None => None,
917            };
918            Ok(CanonicalStepSpec::Choice {
919                whens: canonical_whens,
920                otherwise,
921            })
922        }
923        BuilderStep::DeclarativeSplit {
924            expression,
925            aggregation,
926            parallel,
927            parallel_limit,
928            stop_on_exception,
929            steps,
930        } => Ok(CanonicalStepSpec::Split {
931            expression: CanonicalSplitExpressionSpec::Language(expression),
932            aggregation: canonicalize_split_aggregation(aggregation)?,
933            parallel,
934            parallel_limit,
935            stop_on_exception,
936            steps: canonicalize_steps(steps)?,
937        }),
938        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
939            canonicalize_aggregate(config)?,
940        )),
941        other => {
942            let step_name = canonical_step_name(&other);
943            let detail = camel_api::canonical_contract_rejection_reason(step_name)
944                .unwrap_or("not included in canonical v1");
945            Err(CamelError::RouteError(format!(
946                "canonical v1 does not support step `{step_name}`: {detail}"
947            )))
948        }
949    }
950}
951
952fn canonicalize_split_aggregation(
953    strategy: camel_api::splitter::AggregationStrategy,
954) -> Result<CanonicalSplitAggregationSpec, CamelError> {
955    match strategy {
956        camel_api::splitter::AggregationStrategy::LastWins => {
957            Ok(CanonicalSplitAggregationSpec::LastWins)
958        }
959        camel_api::splitter::AggregationStrategy::CollectAll => {
960            Ok(CanonicalSplitAggregationSpec::CollectAll)
961        }
962        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
963            "canonical v1 does not support custom split aggregation".to_string(),
964        )),
965        camel_api::splitter::AggregationStrategy::Original => {
966            Ok(CanonicalSplitAggregationSpec::Original)
967        }
968    }
969}
970
971fn extract_completion_fields(
972    mode: &CompletionMode,
973) -> Result<(Option<usize>, Option<u64>), CamelError> {
974    match mode {
975        CompletionMode::Single(cond) => match cond {
976            CompletionCondition::Size(n) => Ok((Some(*n), None)),
977            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
978            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
979                "canonical v1 does not support aggregate predicate completion".to_string(),
980            )),
981        },
982        CompletionMode::Any(conds) => {
983            let mut size = None;
984            let mut timeout_ms = None;
985            for cond in conds {
986                match cond {
987                    CompletionCondition::Size(n) => size = Some(*n),
988                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
989                    CompletionCondition::Predicate(_) => {
990                        return Err(CamelError::RouteError(
991                            "canonical v1 does not support aggregate predicate completion"
992                                .to_string(),
993                        ));
994                    }
995                }
996            }
997            Ok((size, timeout_ms))
998        }
999    }
1000}
1001
1002fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
1003    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
1004
1005    let header = match &config.correlation {
1006        CorrelationStrategy::HeaderName(h) => h.clone(),
1007        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
1008        CorrelationStrategy::Fn(_) => {
1009            return Err(CamelError::RouteError(
1010                "canonical v1 does not support Fn correlation strategy".to_string(),
1011            ));
1012        }
1013    };
1014
1015    let correlation_key = match &config.correlation {
1016        CorrelationStrategy::HeaderName(_) => None,
1017        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
1018        CorrelationStrategy::Fn(_) => unreachable!(),
1019    };
1020
1021    let strategy = match config.strategy {
1022        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
1023        AggregationStrategy::Custom(_) => {
1024            return Err(CamelError::RouteError(
1025                "canonical v1 does not support custom aggregate strategy".to_string(),
1026            ));
1027        }
1028    };
1029    let bucket_ttl_ms = config
1030        .bucket_ttl
1031        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
1032
1033    Ok(CanonicalAggregateSpec {
1034        header,
1035        completion_size,
1036        completion_timeout_ms,
1037        correlation_key,
1038        force_completion_on_stop: if config.force_completion_on_stop {
1039            Some(true)
1040        } else {
1041            None
1042        },
1043        discard_on_timeout: if config.discard_on_timeout {
1044            Some(true)
1045        } else {
1046            None
1047        },
1048        strategy,
1049        max_buckets: config.max_buckets,
1050        bucket_ttl_ms,
1051    })
1052}
1053
1054fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
1055    CanonicalCircuitBreakerSpec {
1056        failure_threshold: config.failure_threshold,
1057        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
1058    }
1059}
1060
1061fn canonical_step_name(step: &BuilderStep) -> &'static str {
1062    match step {
1063        BuilderStep::Processor(_) => "processor",
1064        BuilderStep::To(_) => "to",
1065        BuilderStep::Stop => "stop",
1066        BuilderStep::Log { .. } => "log",
1067        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
1068        BuilderStep::DeclarativeSetBody { .. } => "set_body",
1069        BuilderStep::DeclarativeFilter { .. } => "filter",
1070        BuilderStep::DeclarativeChoice { .. } => "choice",
1071        BuilderStep::DeclarativeScript { .. } => "script",
1072        BuilderStep::DeclarativeFunction { .. } => "function",
1073        BuilderStep::DeclarativeSplit { .. } => "split",
1074        BuilderStep::Split { .. } => "split",
1075        BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1076        BuilderStep::Aggregate { .. } => "aggregate",
1077        BuilderStep::Filter { .. } => "filter",
1078        BuilderStep::Choice { .. } => "choice",
1079        BuilderStep::WireTap { .. } => "wire_tap",
1080        BuilderStep::Delay { .. } => "delay",
1081        BuilderStep::Multicast { .. } => "multicast",
1082        BuilderStep::DeclarativeLog { .. } => "log",
1083        BuilderStep::Bean { .. } => "bean",
1084        BuilderStep::Script { .. } => "script",
1085        BuilderStep::Throttle { .. } => "throttle",
1086        BuilderStep::LoadBalance { .. } => "load_balancer",
1087        BuilderStep::DynamicRouter { .. } => "dynamic_router",
1088        BuilderStep::RoutingSlip { .. } => "routing_slip",
1089        BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1090        BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1091        BuilderStep::RecipientList { .. } => "recipient_list",
1092        BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1093        BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1094        BuilderStep::DeclarativeStreamSplit { .. } => "stream_split",
1095        BuilderStep::Enrich { .. } => "enrich",
1096        BuilderStep::PollEnrich { .. } => "poll_enrich",
1097        BuilderStep::DeclarativeDoTry { .. } => "do_try",
1098    }
1099}
1100
1101impl StepAccumulator for RouteBuilder {
1102    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1103        &mut self.steps
1104    }
1105}
1106
1107/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
1108///
1109/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1110/// but NOT `.build()` and NOT `.split()` (no nested splits).
1111///
1112/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
1113/// and returns the parent `RouteBuilder`.
1114pub struct SplitBuilder {
1115    parent: RouteBuilder,
1116    config: SplitterConfig,
1117    steps: Vec<BuilderStep>,
1118}
1119
1120impl SplitBuilder {
1121    /// Open a filter scope within the split sub-pipeline.
1122    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1123    where
1124        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1125    {
1126        FilterInSplitBuilder {
1127            parent: self,
1128            predicate: std::sync::Arc::new(predicate),
1129            steps: vec![],
1130        }
1131    }
1132
1133    /// Close the split scope. Packages the accumulated sub-steps into a
1134    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
1135    pub fn end_split(mut self) -> RouteBuilder {
1136        let split_step = BuilderStep::Split {
1137            config: self.config,
1138            steps: self.steps,
1139        };
1140        self.parent.steps.push(split_step);
1141        self.parent
1142    }
1143}
1144
1145impl StepAccumulator for SplitBuilder {
1146    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1147        &mut self.steps
1148    }
1149}
1150
1151/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
1152pub struct FilterBuilder {
1153    parent: RouteBuilder,
1154    predicate: FilterPredicate,
1155    steps: Vec<BuilderStep>,
1156}
1157
1158impl FilterBuilder {
1159    /// Close the filter scope. Packages the accumulated sub-steps into a
1160    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
1161    pub fn end_filter(mut self) -> RouteBuilder {
1162        let step = BuilderStep::Filter {
1163            predicate: self.predicate,
1164            steps: self.steps,
1165        };
1166        self.parent.steps.push(step);
1167        self.parent
1168    }
1169}
1170
1171impl StepAccumulator for FilterBuilder {
1172    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1173        &mut self.steps
1174    }
1175}
1176
1177/// Builder for a filter scope nested inside a `.split()` block.
1178pub struct FilterInSplitBuilder {
1179    parent: SplitBuilder,
1180    predicate: FilterPredicate,
1181    steps: Vec<BuilderStep>,
1182}
1183
1184impl FilterInSplitBuilder {
1185    /// Close the filter scope and return the parent `SplitBuilder`.
1186    pub fn end_filter(mut self) -> SplitBuilder {
1187        let step = BuilderStep::Filter {
1188            predicate: self.predicate,
1189            steps: self.steps,
1190        };
1191        self.parent.steps.push(step);
1192        self.parent
1193    }
1194}
1195
1196impl StepAccumulator for FilterInSplitBuilder {
1197    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1198        &mut self.steps
1199    }
1200}
1201
1202// ── Choice/When/Otherwise builders ─────────────────────────────────────────
1203
1204/// Builder for a `.choice()` ... `.end_choice()` block.
1205///
1206/// Accumulates `when` clauses and an optional `otherwise` clause.
1207/// Cannot call `.build()` until `.end_choice()` is called.
1208pub struct ChoiceBuilder {
1209    parent: RouteBuilder,
1210    whens: Vec<WhenStep>,
1211    _otherwise: Option<Vec<BuilderStep>>,
1212}
1213
1214impl ChoiceBuilder {
1215    /// Open a `when` clause. Only exchanges matching `predicate` will be
1216    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
1217    pub fn when<F>(self, predicate: F) -> WhenBuilder
1218    where
1219        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1220    {
1221        WhenBuilder {
1222            parent: self,
1223            predicate: std::sync::Arc::new(predicate),
1224            steps: vec![],
1225        }
1226    }
1227
1228    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1229    ///
1230    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1231    pub fn otherwise(self) -> OtherwiseBuilder {
1232        OtherwiseBuilder {
1233            parent: self,
1234            steps: vec![],
1235        }
1236    }
1237
1238    /// Close the choice scope. Packages all accumulated `when` clauses and
1239    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1240    /// parent `RouteBuilder`.
1241    pub fn end_choice(mut self) -> RouteBuilder {
1242        let step = BuilderStep::Choice {
1243            whens: self.whens,
1244            otherwise: self._otherwise,
1245        };
1246        self.parent.steps.push(step);
1247        self.parent
1248    }
1249}
1250
1251/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1252pub struct WhenBuilder {
1253    parent: ChoiceBuilder,
1254    predicate: camel_api::FilterPredicate,
1255    steps: Vec<BuilderStep>,
1256}
1257
1258impl WhenBuilder {
1259    /// Close the when scope. Packages the accumulated sub-steps into a
1260    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1261    pub fn end_when(mut self) -> ChoiceBuilder {
1262        self.parent.whens.push(WhenStep {
1263            predicate: self.predicate,
1264            steps: self.steps,
1265        });
1266        self.parent
1267    }
1268}
1269
1270impl StepAccumulator for WhenBuilder {
1271    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1272        &mut self.steps
1273    }
1274}
1275
1276/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1277pub struct OtherwiseBuilder {
1278    parent: ChoiceBuilder,
1279    steps: Vec<BuilderStep>,
1280}
1281
1282impl OtherwiseBuilder {
1283    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1284    pub fn end_otherwise(self) -> ChoiceBuilder {
1285        let OtherwiseBuilder { mut parent, steps } = self;
1286        parent._otherwise = Some(steps);
1287        parent
1288    }
1289}
1290
1291impl StepAccumulator for OtherwiseBuilder {
1292    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1293        &mut self.steps
1294    }
1295}
1296
1297/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1298///
1299/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1300/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1301///
1302/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1303/// and returns the parent `RouteBuilder`.
1304pub struct MulticastBuilder {
1305    parent: RouteBuilder,
1306    steps: Vec<BuilderStep>,
1307    config: MulticastConfig,
1308}
1309
1310impl MulticastBuilder {
1311    pub fn parallel(mut self, parallel: bool) -> Self {
1312        self.config = self.config.parallel(parallel);
1313        self
1314    }
1315
1316    pub fn parallel_limit(mut self, limit: usize) -> Self {
1317        self.config = self.config.parallel_limit(limit);
1318        self
1319    }
1320
1321    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1322        self.config = self.config.stop_on_exception(stop);
1323        self
1324    }
1325
1326    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1327        self.config = self.config.timeout(duration);
1328        self
1329    }
1330
1331    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1332        self.config = self.config.aggregation(strategy);
1333        self
1334    }
1335
1336    pub fn end_multicast(mut self) -> RouteBuilder {
1337        let step = BuilderStep::Multicast {
1338            steps: self.steps,
1339            config: self.config,
1340        };
1341        self.parent.steps.push(step);
1342        self.parent
1343    }
1344}
1345
1346impl StepAccumulator for MulticastBuilder {
1347    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1348        &mut self.steps
1349    }
1350}
1351
1352/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1353///
1354/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1355/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1356///
1357/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1358/// and returns the parent `RouteBuilder`.
1359pub struct ThrottleBuilder {
1360    parent: RouteBuilder,
1361    config: ThrottlerConfig,
1362    steps: Vec<BuilderStep>,
1363}
1364
1365impl ThrottleBuilder {
1366    /// Set the throttle strategy. Default is `Delay`.
1367    ///
1368    /// - `Delay`: Queue messages until capacity available
1369    /// - `Reject`: Return error immediately when throttled
1370    /// - `Drop`: Silently discard excess messages
1371    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1372        self.config = self.config.strategy(strategy);
1373        self
1374    }
1375
1376    /// Close the throttle scope. Packages the accumulated sub-steps into a
1377    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1378    pub fn end_throttle(mut self) -> RouteBuilder {
1379        let step = BuilderStep::Throttle {
1380            config: self.config,
1381            steps: self.steps,
1382        };
1383        self.parent.steps.push(step);
1384        self.parent
1385    }
1386}
1387
1388impl StepAccumulator for ThrottleBuilder {
1389    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1390        &mut self.steps
1391    }
1392}
1393
1394/// Builder for the sub-pipeline within a `.loop_count()` / `.loop_while()` ... `.end_loop()` block.
1395pub struct LoopBuilder {
1396    parent: RouteBuilder,
1397    config: LoopConfig,
1398    steps: Vec<BuilderStep>,
1399}
1400
1401impl LoopBuilder {
1402    pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1403        LoopInLoopBuilder {
1404            parent: self,
1405            config: LoopConfig {
1406                mode: LoopMode::Count(count),
1407            },
1408            steps: vec![],
1409        }
1410    }
1411
1412    pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1413    where
1414        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1415    {
1416        LoopInLoopBuilder {
1417            parent: self,
1418            config: LoopConfig {
1419                mode: LoopMode::While(std::sync::Arc::new(predicate)),
1420            },
1421            steps: vec![],
1422        }
1423    }
1424
1425    pub fn end_loop(mut self) -> RouteBuilder {
1426        let step = BuilderStep::Loop {
1427            config: self.config,
1428            steps: self.steps,
1429        };
1430        self.parent.steps.push(step);
1431        self.parent
1432    }
1433}
1434
1435impl StepAccumulator for LoopBuilder {
1436    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1437        &mut self.steps
1438    }
1439}
1440
1441pub struct LoopInLoopBuilder {
1442    parent: LoopBuilder,
1443    config: LoopConfig,
1444    steps: Vec<BuilderStep>,
1445}
1446
1447impl LoopInLoopBuilder {
1448    pub fn end_loop(mut self) -> LoopBuilder {
1449        let step = BuilderStep::Loop {
1450            config: self.config,
1451            steps: self.steps,
1452        };
1453        self.parent.steps.push(step);
1454        self.parent
1455    }
1456}
1457
1458impl StepAccumulator for LoopInLoopBuilder {
1459    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1460        &mut self.steps
1461    }
1462}
1463
1464/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1465///
1466/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1467/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1468///
1469/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1470/// and returns the parent `RouteBuilder`.
1471pub struct LoadBalancerBuilder {
1472    parent: RouteBuilder,
1473    config: LoadBalancerConfig,
1474    steps: Vec<BuilderStep>,
1475}
1476
1477impl LoadBalancerBuilder {
1478    /// Set the load balance strategy to round-robin (default).
1479    pub fn round_robin(mut self) -> Self {
1480        self.config = LoadBalancerConfig::round_robin();
1481        self
1482    }
1483
1484    /// Set the load balance strategy to random selection.
1485    pub fn random(mut self) -> Self {
1486        self.config = LoadBalancerConfig::random();
1487        self
1488    }
1489
1490    /// Set the load balance strategy to weighted selection.
1491    ///
1492    /// Each endpoint is assigned a weight that determines its probability
1493    /// of being selected.
1494    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1495        self.config = LoadBalancerConfig::weighted(weights);
1496        self
1497    }
1498
1499    /// Set the load balance strategy to failover.
1500    ///
1501    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1502    /// is tried.
1503    pub fn failover(mut self) -> Self {
1504        self.config = LoadBalancerConfig::failover();
1505        self
1506    }
1507
1508    /// Enable or disable parallel execution of endpoints.
1509    ///
1510    /// When enabled, all endpoints receive the exchange simultaneously.
1511    /// When disabled (default), only one endpoint is selected per exchange.
1512    pub fn parallel(mut self, parallel: bool) -> Self {
1513        self.config = self.config.parallel(parallel);
1514        self
1515    }
1516
1517    /// Close the load balance scope. Packages the accumulated sub-steps into a
1518    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1519    pub fn end_load_balance(mut self) -> RouteBuilder {
1520        let step = BuilderStep::LoadBalance {
1521            config: self.config,
1522            steps: self.steps,
1523        };
1524        self.parent.steps.push(step);
1525        self.parent
1526    }
1527}
1528
1529impl StepAccumulator for LoadBalancerBuilder {
1530    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1531        &mut self.steps
1532    }
1533}
1534
1535// ---------------------------------------------------------------------------
1536// Tests
1537// ---------------------------------------------------------------------------
1538
1539#[cfg(test)]
1540mod tests {
1541    use super::*;
1542    use camel_api::error_handler::ErrorHandlerConfig;
1543    use camel_api::load_balancer::LoadBalanceStrategy;
1544    use camel_api::{Exchange, Message};
1545    use camel_core::route::BuilderStep;
1546    use std::sync::Arc;
1547    use std::time::Duration;
1548    use tower::{Service, ServiceExt};
1549
1550    #[test]
1551    fn test_builder_from_creates_definition() {
1552        let definition = RouteBuilder::from("timer:tick")
1553            .route_id("test-route")
1554            .build()
1555            .unwrap();
1556        assert_eq!(definition.from_uri(), "timer:tick");
1557    }
1558
1559    #[test]
1560    fn test_builder_empty_from_uri_errors() {
1561        let result = RouteBuilder::from("").route_id("test-route").build();
1562        assert!(result.is_err());
1563    }
1564
1565    #[test]
1566    fn test_build_rejects_schemeless_uri() {
1567        let result = RouteBuilder::from("no-scheme-here")
1568            .route_id("test-route")
1569            .build();
1570        match result {
1571            Err(err) => {
1572                let err_msg = format!("{err}");
1573                assert!(
1574                    err_msg.contains("scheme"),
1575                    "expected scheme-related error, got: {err_msg}"
1576                );
1577            }
1578            Ok(_) => panic!("schemeless URI should fail"),
1579        }
1580    }
1581
1582    #[test]
1583    fn test_build_rejects_empty_scheme_uri() {
1584        let result = RouteBuilder::from(":missing-scheme")
1585            .route_id("test-route")
1586            .build();
1587        match result {
1588            Err(err) => {
1589                let err_msg = format!("{err}");
1590                assert!(
1591                    err_msg.contains("scheme"),
1592                    "expected scheme-related error, got: {err_msg}"
1593                );
1594            }
1595            Ok(_) => panic!("empty-scheme URI should fail"),
1596        }
1597    }
1598
1599    #[test]
1600    fn test_build_accepts_valid_uri() {
1601        let result = RouteBuilder::from("timer:tick")
1602            .route_id("test-route")
1603            .build();
1604        assert!(result.is_ok());
1605    }
1606
1607    #[test]
1608    fn test_build_canonical_rejects_schemeless_uri() {
1609        let result = RouteBuilder::from("no-scheme-here")
1610            .route_id("test-route")
1611            .build_canonical();
1612        assert!(result.is_err());
1613    }
1614
1615    #[test]
1616    fn test_builder_to_adds_step() {
1617        let definition = RouteBuilder::from("timer:tick")
1618            .route_id("test-route")
1619            .to("log:info")
1620            .build()
1621            .unwrap();
1622
1623        assert_eq!(definition.from_uri(), "timer:tick");
1624        // We can verify steps were added by checking the structure
1625        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1626    }
1627
1628    #[test]
1629    fn test_builder_filter_adds_filter_step() {
1630        let definition = RouteBuilder::from("timer:tick")
1631            .route_id("test-route")
1632            .filter(|_ex| true)
1633            .to("mock:result")
1634            .end_filter()
1635            .build()
1636            .unwrap();
1637
1638        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1639    }
1640
1641    #[test]
1642    fn test_builder_set_header_adds_processor_step() {
1643        let definition = RouteBuilder::from("timer:tick")
1644            .route_id("test-route")
1645            .set_header("key", Value::String("value".into()))
1646            .build()
1647            .unwrap();
1648
1649        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1650    }
1651
1652    #[test]
1653    fn test_builder_map_body_adds_processor_step() {
1654        let definition = RouteBuilder::from("timer:tick")
1655            .route_id("test-route")
1656            .map_body(|body| body)
1657            .build()
1658            .unwrap();
1659
1660        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1661    }
1662
1663    #[test]
1664    fn test_builder_process_adds_processor_step() {
1665        let definition = RouteBuilder::from("timer:tick")
1666            .route_id("test-route")
1667            .process(|ex| async move { Ok(ex) })
1668            .build()
1669            .unwrap();
1670
1671        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1672    }
1673
1674    #[test]
1675    fn test_builder_chain_multiple_steps() {
1676        let definition = RouteBuilder::from("timer:tick")
1677            .route_id("test-route")
1678            .set_header("source", Value::String("timer".into()))
1679            .filter(|ex| ex.input.header("source").is_some())
1680            .to("log:info")
1681            .end_filter()
1682            .to("mock:result")
1683            .build()
1684            .unwrap();
1685
1686        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1687        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1688        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1689        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1690    }
1691
1692    #[test]
1693    fn test_loop_count_builder() {
1694        use camel_api::loop_eip::LoopMode;
1695
1696        let def = RouteBuilder::from("direct:start")
1697            .route_id("loop-test")
1698            .loop_count(3)
1699            .to("mock:inside")
1700            .end_loop()
1701            .to("mock:after")
1702            .build()
1703            .unwrap();
1704
1705        assert_eq!(def.steps().len(), 2);
1706        match &def.steps()[0] {
1707            BuilderStep::Loop { config, steps } => {
1708                assert!(matches!(config.mode, LoopMode::Count(3)));
1709                assert_eq!(steps.len(), 1);
1710            }
1711            other => panic!("Expected Loop, got {:?}", other),
1712        }
1713        assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1714    }
1715
1716    #[test]
1717    fn test_loop_while_builder() {
1718        use camel_api::loop_eip::LoopMode;
1719
1720        let def = RouteBuilder::from("direct:start")
1721            .route_id("loop-while-test")
1722            .loop_while(|_ex| true)
1723            .to("mock:retry")
1724            .end_loop()
1725            .build()
1726            .unwrap();
1727
1728        assert_eq!(def.steps().len(), 1);
1729        match &def.steps()[0] {
1730            BuilderStep::Loop { config, steps } => {
1731                assert!(matches!(config.mode, LoopMode::While(_)));
1732                assert_eq!(steps.len(), 1);
1733            }
1734            other => panic!("Expected Loop, got {:?}", other),
1735        }
1736    }
1737
1738    #[test]
1739    fn test_nested_loop_builder() {
1740        use camel_api::loop_eip::LoopMode;
1741
1742        let def = RouteBuilder::from("direct:start")
1743            .route_id("nested-loop-test")
1744            .loop_count(2)
1745            .to("mock:outer")
1746            .loop_count(3)
1747            .to("mock:inner")
1748            .end_loop()
1749            .end_loop()
1750            .to("mock:after")
1751            .build()
1752            .unwrap();
1753
1754        assert_eq!(def.steps().len(), 2);
1755        match &def.steps()[0] {
1756            BuilderStep::Loop { steps, .. } => {
1757                assert_eq!(steps.len(), 2);
1758                match &steps[1] {
1759                    BuilderStep::Loop {
1760                        config,
1761                        steps: inner_steps,
1762                    } => {
1763                        assert!(matches!(config.mode, LoopMode::Count(3)));
1764                        assert_eq!(inner_steps.len(), 1);
1765                    }
1766                    other => panic!("Expected nested Loop, got {:?}", other),
1767                }
1768            }
1769            other => panic!("Expected outer Loop, got {:?}", other),
1770        }
1771    }
1772
1773    // -----------------------------------------------------------------------
1774    // Processor behavior tests — exercise the real Tower services directly
1775    // -----------------------------------------------------------------------
1776
1777    #[tokio::test]
1778    async fn test_set_header_processor_works() {
1779        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1780        let exchange = Exchange::new(Message::new("test"));
1781        let result = svc.call(exchange).await.unwrap();
1782        assert_eq!(
1783            result.input.header("greeting"),
1784            Some(&Value::String("hello".into()))
1785        );
1786    }
1787
1788    #[tokio::test]
1789    async fn test_filter_processor_passes() {
1790        use camel_api::BoxProcessorExt;
1791        use camel_processor::FilterService;
1792
1793        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1794        let mut svc =
1795            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1796        let exchange = Exchange::new(Message::new("pass"));
1797        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1798        assert_eq!(result.input.body.as_text(), Some("pass"));
1799    }
1800
1801    #[tokio::test]
1802    async fn test_filter_processor_blocks() {
1803        use camel_api::BoxProcessorExt;
1804        use camel_processor::FilterService;
1805
1806        let sub = BoxProcessor::from_fn(|_ex| {
1807            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1808        });
1809        let mut svc =
1810            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1811        let exchange = Exchange::new(Message::new("reject"));
1812        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1813        assert_eq!(result.input.body.as_text(), Some("reject"));
1814    }
1815
1816    #[tokio::test]
1817    async fn test_map_body_processor_works() {
1818        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1819            if let Some(text) = body.as_text() {
1820                Body::Text(text.to_uppercase())
1821            } else {
1822                body
1823            }
1824        });
1825        let exchange = Exchange::new(Message::new("hello"));
1826        let result = mapper.oneshot(exchange).await.unwrap();
1827        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1828    }
1829
1830    #[tokio::test]
1831    async fn test_process_custom_processor_works() {
1832        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1833            ex.set_property("custom", Value::Bool(true));
1834            Ok(ex)
1835        });
1836        let exchange = Exchange::new(Message::default());
1837        let result = processor.oneshot(exchange).await.unwrap();
1838        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1839    }
1840
1841    // -----------------------------------------------------------------------
1842    // Sequential pipeline test
1843    // -----------------------------------------------------------------------
1844
1845    #[tokio::test]
1846    async fn test_compose_pipeline_runs_steps_in_order() {
1847        use camel_core::route::{CompiledStep, compose_pipeline};
1848
1849        let processors = vec![
1850            CompiledStep::Process {
1851                processor: BoxProcessor::new(SetHeader::new(
1852                    IdentityProcessor,
1853                    "step",
1854                    Value::String("one".into()),
1855                )),
1856                body_contract: None,
1857            },
1858            CompiledStep::Process {
1859                processor: BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1860                    if let Some(text) = body.as_text() {
1861                        Body::Text(format!("{}-processed", text))
1862                    } else {
1863                        body
1864                    }
1865                })),
1866                body_contract: None,
1867            },
1868        ];
1869
1870        let pipeline = compose_pipeline(processors);
1871        let exchange = Exchange::new(Message::new("hello"));
1872        let result = pipeline.oneshot(exchange).await.unwrap();
1873
1874        assert_eq!(
1875            result.input.header("step"),
1876            Some(&Value::String("one".into()))
1877        );
1878        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1879    }
1880
1881    #[tokio::test]
1882    async fn test_compose_pipeline_empty_is_identity() {
1883        use camel_core::route::compose_pipeline;
1884
1885        let pipeline = compose_pipeline(vec![]);
1886        let exchange = Exchange::new(Message::new("unchanged"));
1887        let result = pipeline.oneshot(exchange).await.unwrap();
1888        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1889    }
1890
1891    // -----------------------------------------------------------------------
1892    // Circuit breaker builder tests
1893    // -----------------------------------------------------------------------
1894
1895    #[test]
1896    fn test_builder_circuit_breaker_sets_config() {
1897        use camel_api::circuit_breaker::CircuitBreakerConfig;
1898
1899        let config = CircuitBreakerConfig::new().failure_threshold(5);
1900        let definition = RouteBuilder::from("timer:tick")
1901            .route_id("test-route")
1902            .circuit_breaker(config)
1903            .build()
1904            .unwrap();
1905
1906        let cb = definition
1907            .circuit_breaker_config()
1908            .expect("circuit breaker should be set");
1909        assert_eq!(cb.failure_threshold, 5);
1910    }
1911
1912    #[test]
1913    fn test_builder_circuit_breaker_with_error_handler() {
1914        use camel_api::circuit_breaker::CircuitBreakerConfig;
1915        use camel_api::error_handler::ErrorHandlerConfig;
1916
1917        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1918        let eh_config = ErrorHandlerConfig::log_only();
1919
1920        let definition = RouteBuilder::from("timer:tick")
1921            .route_id("test-route")
1922            .to("log:info")
1923            .circuit_breaker(cb_config)
1924            .error_handler(eh_config)
1925            .build()
1926            .unwrap();
1927
1928        assert!(
1929            definition.circuit_breaker_config().is_some(),
1930            "circuit breaker config should be set"
1931        );
1932        // Route definition was built successfully with both configs.
1933    }
1934
1935    #[test]
1936    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1937        let definition = RouteBuilder::from("direct:start")
1938            .route_id("test-route")
1939            .dead_letter_channel("log:dlc")
1940            .on_exception(|e| matches!(e, CamelError::Io(_)))
1941            .retry(3)
1942            .handled_by("log:io")
1943            .end_on_exception()
1944            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1945            .retry(1)
1946            .end_on_exception()
1947            .to("mock:out")
1948            .build()
1949            .expect("route should build");
1950
1951        let cfg = definition
1952            .error_handler_config()
1953            .expect("error handler should be set");
1954        assert_eq!(cfg.policies.len(), 2);
1955        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1956        assert_eq!(
1957            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1958            Some(3)
1959        );
1960        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1961        assert_eq!(
1962            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1963            Some(1)
1964        );
1965    }
1966
1967    #[test]
1968    fn test_builder_on_exception_mixed_mode_rejected() {
1969        let result = RouteBuilder::from("direct:start")
1970            .route_id("test-route")
1971            .error_handler(ErrorHandlerConfig::log_only())
1972            .on_exception(|_e| true)
1973            .end_on_exception()
1974            .to("mock:out")
1975            .build();
1976
1977        let err = result.err().expect("mixed mode should fail with an error");
1978
1979        assert!(
1980            format!("{err}").contains("mixed error handler modes"),
1981            "unexpected error: {err}"
1982        );
1983    }
1984
1985    #[test]
1986    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1987        let definition = RouteBuilder::from("direct:start")
1988            .route_id("test-route")
1989            .on_exception(|_e| true)
1990            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1991            .with_jitter(0.5)
1992            .end_on_exception()
1993            .to("mock:out")
1994            .build()
1995            .expect("route should build");
1996
1997        let cfg = definition
1998            .error_handler_config()
1999            .expect("error handler should be set");
2000        assert_eq!(cfg.policies.len(), 1);
2001        assert!(cfg.policies[0].retry.is_none());
2002    }
2003
2004    #[test]
2005    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
2006        let definition = RouteBuilder::from("direct:start")
2007            .route_id("test-route")
2008            .dead_letter_channel("log:dlc")
2009            .to("mock:out")
2010            .build()
2011            .expect("route should build");
2012
2013        let cfg = definition
2014            .error_handler_config()
2015            .expect("error handler should be set");
2016        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
2017        assert!(cfg.policies.is_empty());
2018    }
2019
2020    #[test]
2021    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
2022        let definition = RouteBuilder::from("direct:start")
2023            .route_id("test-route")
2024            .dead_letter_channel("log:first")
2025            .on_exception(|e| matches!(e, CamelError::Io(_)))
2026            .retry(2)
2027            .end_on_exception()
2028            .dead_letter_channel("log:second")
2029            .to("mock:out")
2030            .build()
2031            .expect("route should build");
2032
2033        let cfg = definition
2034            .error_handler_config()
2035            .expect("error handler should be set");
2036        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2037        assert_eq!(cfg.policies.len(), 1);
2038        assert_eq!(
2039            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
2040            Some(2)
2041        );
2042    }
2043
2044    #[test]
2045    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
2046        let definition = RouteBuilder::from("direct:start")
2047            .route_id("test-route")
2048            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
2049            .retry(1)
2050            .end_on_exception()
2051            .to("mock:out")
2052            .build()
2053            .expect("route should build");
2054
2055        let cfg = definition
2056            .error_handler_config()
2057            .expect("error handler should be set");
2058        assert!(cfg.dlc_uri.is_none());
2059        assert_eq!(cfg.policies.len(), 1);
2060    }
2061
2062    #[test]
2063    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
2064        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
2065        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
2066
2067        let definition = RouteBuilder::from("direct:start")
2068            .route_id("test-route")
2069            .error_handler(first)
2070            .error_handler(second)
2071            .to("mock:out")
2072            .build()
2073            .expect("route should build");
2074
2075        let cfg = definition
2076            .error_handler_config()
2077            .expect("error handler should be set");
2078        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2079    }
2080
2081    // --- Splitter builder tests ---
2082
2083    #[test]
2084    fn test_split_builder_typestate() {
2085        use camel_api::splitter::{SplitterConfig, split_body_lines};
2086
2087        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
2088        let definition = RouteBuilder::from("timer:test?period=1000")
2089            .route_id("test-route")
2090            .split(SplitterConfig::new(split_body_lines()))
2091            .to("mock:per-fragment")
2092            .end_split()
2093            .to("mock:final")
2094            .build()
2095            .unwrap();
2096
2097        // Should have 2 top-level steps: Split + To("mock:final")
2098        assert_eq!(definition.steps().len(), 2);
2099    }
2100
2101    #[test]
2102    fn test_split_builder_steps_collected() {
2103        use camel_api::splitter::{SplitterConfig, split_body_lines};
2104
2105        let definition = RouteBuilder::from("timer:test?period=1000")
2106            .route_id("test-route")
2107            .split(SplitterConfig::new(split_body_lines()))
2108            .set_header("fragment", Value::String("yes".into()))
2109            .to("mock:per-fragment")
2110            .end_split()
2111            .build()
2112            .unwrap();
2113
2114        // Should have 1 top-level step: Split (containing 2 sub-steps)
2115        assert_eq!(definition.steps().len(), 1);
2116        match &definition.steps()[0] {
2117            BuilderStep::Split { steps, .. } => {
2118                assert_eq!(steps.len(), 2); // SetHeader + To
2119            }
2120            other => panic!("Expected Split, got {:?}", other),
2121        }
2122    }
2123
2124    #[test]
2125    fn test_split_builder_config_propagated() {
2126        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2127
2128        let definition = RouteBuilder::from("timer:test?period=1000")
2129            .route_id("test-route")
2130            .split(
2131                SplitterConfig::new(split_body_lines())
2132                    .parallel(true)
2133                    .parallel_limit(4)
2134                    .aggregation(AggregationStrategy::CollectAll),
2135            )
2136            .to("mock:per-fragment")
2137            .end_split()
2138            .build()
2139            .unwrap();
2140
2141        match &definition.steps()[0] {
2142            BuilderStep::Split { config, .. } => {
2143                assert!(config.parallel);
2144                assert_eq!(config.parallel_limit, Some(4));
2145                assert!(matches!(
2146                    config.aggregation,
2147                    AggregationStrategy::CollectAll
2148                ));
2149            }
2150            other => panic!("Expected Split, got {:?}", other),
2151        }
2152    }
2153
2154    #[test]
2155    fn test_aggregate_builder_adds_step() {
2156        use camel_api::aggregator::AggregatorConfig;
2157        use camel_core::route::BuilderStep;
2158
2159        let definition = RouteBuilder::from("timer:tick")
2160            .route_id("test-route")
2161            .aggregate(
2162                AggregatorConfig::correlate_by("key")
2163                    .complete_when_size(2)
2164                    .build()
2165                    .unwrap(),
2166            )
2167            .build()
2168            .unwrap();
2169
2170        assert_eq!(definition.steps().len(), 1);
2171        assert!(matches!(
2172            definition.steps()[0],
2173            BuilderStep::Aggregate { .. }
2174        ));
2175    }
2176
2177    #[test]
2178    fn test_aggregate_in_split_builder() {
2179        use camel_api::aggregator::AggregatorConfig;
2180        use camel_api::splitter::{SplitterConfig, split_body_lines};
2181        use camel_core::route::BuilderStep;
2182
2183        let definition = RouteBuilder::from("timer:tick")
2184            .route_id("test-route")
2185            .split(SplitterConfig::new(split_body_lines()))
2186            .aggregate(
2187                AggregatorConfig::correlate_by("key")
2188                    .complete_when_size(1)
2189                    .build()
2190                    .unwrap(),
2191            )
2192            .end_split()
2193            .build()
2194            .unwrap();
2195
2196        assert_eq!(definition.steps().len(), 1);
2197        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2198            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2199        } else {
2200            panic!("expected Split step");
2201        }
2202    }
2203
2204    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
2205
2206    #[test]
2207    fn test_builder_set_body_static_adds_processor() {
2208        let definition = RouteBuilder::from("timer:tick")
2209            .route_id("test-route")
2210            .set_body("fixed")
2211            .build()
2212            .unwrap();
2213        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2214    }
2215
2216    #[test]
2217    fn test_builder_set_body_fn_adds_processor() {
2218        let definition = RouteBuilder::from("timer:tick")
2219            .route_id("test-route")
2220            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2221            .build()
2222            .unwrap();
2223        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2224    }
2225
2226    #[test]
2227    fn transform_alias_produces_same_as_set_body() {
2228        let route_transform = RouteBuilder::from("timer:tick")
2229            .route_id("test-route")
2230            .transform("hello")
2231            .build()
2232            .unwrap();
2233
2234        let route_set_body = RouteBuilder::from("timer:tick")
2235            .route_id("test-route")
2236            .set_body("hello")
2237            .build()
2238            .unwrap();
2239
2240        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2241    }
2242
2243    #[test]
2244    fn test_builder_set_header_fn_adds_processor() {
2245        let definition = RouteBuilder::from("timer:tick")
2246            .route_id("test-route")
2247            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2248            .build()
2249            .unwrap();
2250        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2251    }
2252
2253    #[tokio::test]
2254    async fn test_set_body_static_processor_works() {
2255        use camel_core::route::{CompiledStep, compose_pipeline};
2256        let def = RouteBuilder::from("t:t")
2257            .route_id("test-route")
2258            .set_body("replaced")
2259            .build()
2260            .unwrap();
2261        let pipeline = compose_pipeline(
2262            def.steps()
2263                .iter()
2264                .filter_map(|s| {
2265                    if let BuilderStep::Processor(p) = s {
2266                        Some(p.clone())
2267                    } else {
2268                        None
2269                    }
2270                })
2271                .map(|p| CompiledStep::Process {
2272                    processor: p,
2273                    body_contract: None,
2274                })
2275                .collect(),
2276        );
2277        let exchange = Exchange::new(Message::new("original"));
2278        let result = pipeline.oneshot(exchange).await.unwrap();
2279        assert_eq!(result.input.body.as_text(), Some("replaced"));
2280    }
2281
2282    #[tokio::test]
2283    async fn test_set_body_fn_processor_works() {
2284        use camel_core::route::{CompiledStep, compose_pipeline};
2285        let def = RouteBuilder::from("t:t")
2286            .route_id("test-route")
2287            .set_body_fn(|ex: &Exchange| {
2288                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2289            })
2290            .build()
2291            .unwrap();
2292        let pipeline = compose_pipeline(
2293            def.steps()
2294                .iter()
2295                .filter_map(|s| {
2296                    if let BuilderStep::Processor(p) = s {
2297                        Some(p.clone())
2298                    } else {
2299                        None
2300                    }
2301                })
2302                .map(|p| CompiledStep::Process {
2303                    processor: p,
2304                    body_contract: None,
2305                })
2306                .collect(),
2307        );
2308        let exchange = Exchange::new(Message::new("hello"));
2309        let result = pipeline.oneshot(exchange).await.unwrap();
2310        assert_eq!(result.input.body.as_text(), Some("HELLO"));
2311    }
2312
2313    #[tokio::test]
2314    async fn test_set_header_fn_processor_works() {
2315        use camel_core::route::{CompiledStep, compose_pipeline};
2316        let def = RouteBuilder::from("t:t")
2317            .route_id("test-route")
2318            .set_header_fn("echo", |ex: &Exchange| {
2319                ex.input
2320                    .body
2321                    .as_text()
2322                    .map(|t| Value::String(t.into()))
2323                    .unwrap_or(Value::Null)
2324            })
2325            .build()
2326            .unwrap();
2327        let pipeline = compose_pipeline(
2328            def.steps()
2329                .iter()
2330                .filter_map(|s| {
2331                    if let BuilderStep::Processor(p) = s {
2332                        Some(p.clone())
2333                    } else {
2334                        None
2335                    }
2336                })
2337                .map(|p| CompiledStep::Process {
2338                    processor: p,
2339                    body_contract: None,
2340                })
2341                .collect(),
2342        );
2343        let exchange = Exchange::new(Message::new("ping"));
2344        let result = pipeline.oneshot(exchange).await.unwrap();
2345        assert_eq!(
2346            result.input.header("echo"),
2347            Some(&Value::String("ping".into()))
2348        );
2349    }
2350
2351    // ── FilterBuilder typestate tests ─────────────────────────────────────
2352
2353    #[test]
2354    fn test_filter_builder_typestate() {
2355        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2356            .route_id("test-route")
2357            .filter(|_ex| true)
2358            .to("mock:inner")
2359            .end_filter()
2360            .to("mock:outer")
2361            .build();
2362        assert!(result.is_ok());
2363    }
2364
2365    #[test]
2366    fn test_filter_builder_steps_collected() {
2367        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2368            .route_id("test-route")
2369            .filter(|_ex| true)
2370            .to("mock:inner")
2371            .end_filter()
2372            .build()
2373            .unwrap();
2374
2375        assert_eq!(definition.steps().len(), 1);
2376        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2377    }
2378
2379    #[test]
2380    fn test_wire_tap_builder_adds_step() {
2381        let definition = RouteBuilder::from("timer:tick")
2382            .route_id("test-route")
2383            .wire_tap("mock:tap")
2384            .to("mock:result")
2385            .build()
2386            .unwrap();
2387
2388        assert_eq!(definition.steps().len(), 2);
2389        assert!(
2390            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2391        );
2392        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2393    }
2394
2395    // ── MulticastBuilder typestate tests ─────────────────────────────────────
2396
2397    #[test]
2398    fn test_multicast_builder_typestate() {
2399        let definition = RouteBuilder::from("timer:tick")
2400            .route_id("test-route")
2401            .multicast()
2402            .to("direct:a")
2403            .to("direct:b")
2404            .end_multicast()
2405            .to("mock:result")
2406            .build()
2407            .unwrap();
2408
2409        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
2410    }
2411
2412    #[test]
2413    fn test_multicast_builder_steps_collected() {
2414        let definition = RouteBuilder::from("timer:tick")
2415            .route_id("test-route")
2416            .multicast()
2417            .to("direct:a")
2418            .to("direct:b")
2419            .end_multicast()
2420            .build()
2421            .unwrap();
2422
2423        match &definition.steps()[0] {
2424            BuilderStep::Multicast { steps, .. } => {
2425                assert_eq!(steps.len(), 2);
2426            }
2427            other => panic!("Expected Multicast, got {:?}", other),
2428        }
2429    }
2430
2431    // ── Concurrency builder tests ─────────────────────────────────────
2432
2433    #[test]
2434    fn test_builder_concurrent_sets_concurrency() {
2435        use camel_component_api::ConcurrencyModel;
2436
2437        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2438            .route_id("test-route")
2439            .concurrent(16)
2440            .to("log:info")
2441            .build()
2442            .unwrap();
2443
2444        assert_eq!(
2445            definition.concurrency_override(),
2446            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2447        );
2448    }
2449
2450    #[test]
2451    fn test_builder_concurrent_zero_means_unbounded() {
2452        use camel_component_api::ConcurrencyModel;
2453
2454        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2455            .route_id("test-route")
2456            .concurrent(0)
2457            .to("log:info")
2458            .build()
2459            .unwrap();
2460
2461        assert_eq!(
2462            definition.concurrency_override(),
2463            Some(&ConcurrencyModel::Concurrent { max: None })
2464        );
2465    }
2466
2467    #[test]
2468    fn test_builder_sequential_sets_concurrency() {
2469        use camel_component_api::ConcurrencyModel;
2470
2471        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2472            .route_id("test-route")
2473            .sequential()
2474            .to("log:info")
2475            .build()
2476            .unwrap();
2477
2478        assert_eq!(
2479            definition.concurrency_override(),
2480            Some(&ConcurrencyModel::Sequential)
2481        );
2482    }
2483
2484    #[test]
2485    fn test_builder_default_concurrency_is_none() {
2486        let definition = RouteBuilder::from("timer:tick")
2487            .route_id("test-route")
2488            .to("log:info")
2489            .build()
2490            .unwrap();
2491
2492        assert_eq!(definition.concurrency_override(), None);
2493    }
2494
2495    // ── Route lifecycle builder tests ─────────────────────────────────────
2496
2497    #[test]
2498    fn test_builder_route_id_sets_id() {
2499        let definition = RouteBuilder::from("timer:tick")
2500            .route_id("my-route")
2501            .build()
2502            .unwrap();
2503
2504        assert_eq!(definition.route_id(), "my-route");
2505    }
2506
2507    #[test]
2508    fn test_build_without_route_id_fails() {
2509        let result = RouteBuilder::from("timer:tick?period=1000")
2510            .to("log:info")
2511            .build();
2512        let err = match result {
2513            Err(e) => e.to_string(),
2514            Ok(_) => panic!("build() should fail without route_id"),
2515        };
2516        assert!(
2517            err.contains("route_id"),
2518            "error should mention route_id, got: {}",
2519            err
2520        );
2521    }
2522
2523    #[test]
2524    fn test_builder_empty_route_id_rejected() {
2525        let result = RouteBuilder::from("timer:tick").route_id("").build();
2526        let err = result.err().expect("empty route_id should be rejected");
2527        assert!(matches!(err, CamelError::RouteError(_)));
2528    }
2529
2530    #[test]
2531    fn test_builder_whitespace_route_id_rejected() {
2532        let result = RouteBuilder::from("timer:tick").route_id("   ").build();
2533        assert!(result.is_err());
2534    }
2535
2536    #[test]
2537    fn test_builder_auto_startup_false() {
2538        let definition = RouteBuilder::from("timer:tick")
2539            .route_id("test-route")
2540            .auto_startup(false)
2541            .build()
2542            .unwrap();
2543
2544        assert!(!definition.auto_startup());
2545    }
2546
2547    #[test]
2548    fn test_builder_startup_order_custom() {
2549        let definition = RouteBuilder::from("timer:tick")
2550            .route_id("test-route")
2551            .startup_order(50)
2552            .build()
2553            .unwrap();
2554
2555        assert_eq!(definition.startup_order(), 50);
2556    }
2557
2558    #[test]
2559    fn test_builder_defaults() {
2560        let definition = RouteBuilder::from("timer:tick")
2561            .route_id("test-route")
2562            .build()
2563            .unwrap();
2564
2565        assert_eq!(definition.route_id(), "test-route");
2566        assert!(definition.auto_startup());
2567        assert_eq!(definition.startup_order(), 1000);
2568    }
2569
2570    // ── Choice typestate tests ──────────────────────────────────────────────────
2571
2572    #[test]
2573    fn test_choice_builder_single_when() {
2574        let definition = RouteBuilder::from("timer:tick")
2575            .route_id("test-route")
2576            .choice()
2577            .when(|ex: &Exchange| ex.input.header("type").is_some())
2578            .to("mock:typed")
2579            .end_when()
2580            .end_choice()
2581            .build()
2582            .unwrap();
2583        assert_eq!(definition.steps().len(), 1);
2584        assert!(
2585            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2586            if whens.len() == 1 && otherwise.is_none())
2587        );
2588    }
2589
2590    #[test]
2591    fn test_choice_builder_when_otherwise() {
2592        let definition = RouteBuilder::from("timer:tick")
2593            .route_id("test-route")
2594            .choice()
2595            .when(|ex: &Exchange| ex.input.header("a").is_some())
2596            .to("mock:a")
2597            .end_when()
2598            .otherwise()
2599            .to("mock:fallback")
2600            .end_otherwise()
2601            .end_choice()
2602            .build()
2603            .unwrap();
2604        assert!(
2605            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2606            if whens.len() == 1 && otherwise.is_some())
2607        );
2608    }
2609
2610    #[test]
2611    fn test_choice_builder_multiple_whens() {
2612        let definition = RouteBuilder::from("timer:tick")
2613            .route_id("test-route")
2614            .choice()
2615            .when(|ex: &Exchange| ex.input.header("a").is_some())
2616            .to("mock:a")
2617            .end_when()
2618            .when(|ex: &Exchange| ex.input.header("b").is_some())
2619            .to("mock:b")
2620            .end_when()
2621            .end_choice()
2622            .build()
2623            .unwrap();
2624        assert!(
2625            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2626            if whens.len() == 2)
2627        );
2628    }
2629
2630    #[test]
2631    fn test_choice_step_after_choice() {
2632        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2633        let definition = RouteBuilder::from("timer:tick")
2634            .route_id("test-route")
2635            .choice()
2636            .when(|_ex: &Exchange| true)
2637            .to("mock:inner")
2638            .end_when()
2639            .end_choice()
2640            .to("mock:outer") // must be step[1], not inside choice
2641            .build()
2642            .unwrap();
2643        assert_eq!(definition.steps().len(), 2);
2644        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2645    }
2646
2647    // ── Throttle typestate tests ──────────────────────────────────────────────────
2648
2649    #[test]
2650    fn test_throttle_builder_typestate() {
2651        let definition = RouteBuilder::from("timer:tick")
2652            .route_id("test-route")
2653            .throttle(10, std::time::Duration::from_secs(1))
2654            .to("mock:result")
2655            .end_throttle()
2656            .build()
2657            .unwrap();
2658
2659        assert_eq!(definition.steps().len(), 1);
2660        assert!(matches!(
2661            &definition.steps()[0],
2662            BuilderStep::Throttle { .. }
2663        ));
2664    }
2665
2666    #[test]
2667    fn test_throttle_builder_with_strategy() {
2668        let definition = RouteBuilder::from("timer:tick")
2669            .route_id("test-route")
2670            .throttle(10, std::time::Duration::from_secs(1))
2671            .strategy(ThrottleStrategy::Reject)
2672            .to("mock:result")
2673            .end_throttle()
2674            .build()
2675            .unwrap();
2676
2677        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2678            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2679        } else {
2680            panic!("Expected Throttle step");
2681        }
2682    }
2683
2684    #[test]
2685    fn test_throttle_builder_steps_collected() {
2686        let definition = RouteBuilder::from("timer:tick")
2687            .route_id("test-route")
2688            .throttle(5, std::time::Duration::from_secs(1))
2689            .set_header("throttled", Value::Bool(true))
2690            .to("mock:throttled")
2691            .end_throttle()
2692            .build()
2693            .unwrap();
2694
2695        match &definition.steps()[0] {
2696            BuilderStep::Throttle { steps, .. } => {
2697                assert_eq!(steps.len(), 2); // SetHeader + To
2698            }
2699            other => panic!("Expected Throttle, got {:?}", other),
2700        }
2701    }
2702
2703    #[test]
2704    fn test_throttle_step_after_throttle() {
2705        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2706        let definition = RouteBuilder::from("timer:tick")
2707            .route_id("test-route")
2708            .throttle(10, std::time::Duration::from_secs(1))
2709            .to("mock:inner")
2710            .end_throttle()
2711            .to("mock:outer")
2712            .build()
2713            .unwrap();
2714
2715        assert_eq!(definition.steps().len(), 2);
2716        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2717    }
2718
2719    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2720
2721    #[test]
2722    fn test_load_balance_builder_typestate() {
2723        let definition = RouteBuilder::from("timer:tick")
2724            .route_id("test-route")
2725            .load_balance()
2726            .round_robin()
2727            .to("mock:a")
2728            .to("mock:b")
2729            .end_load_balance()
2730            .build()
2731            .unwrap();
2732
2733        assert_eq!(definition.steps().len(), 1);
2734        assert!(matches!(
2735            &definition.steps()[0],
2736            BuilderStep::LoadBalance { .. }
2737        ));
2738    }
2739
2740    #[test]
2741    fn test_load_balance_builder_with_strategy() {
2742        let definition = RouteBuilder::from("timer:tick")
2743            .route_id("test-route")
2744            .load_balance()
2745            .random()
2746            .to("mock:result")
2747            .end_load_balance()
2748            .build()
2749            .unwrap();
2750
2751        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2752            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2753        } else {
2754            panic!("Expected LoadBalance step");
2755        }
2756    }
2757
2758    #[test]
2759    fn test_load_balance_builder_steps_collected() {
2760        let definition = RouteBuilder::from("timer:tick")
2761            .route_id("test-route")
2762            .load_balance()
2763            .set_header("lb", Value::Bool(true))
2764            .to("mock:a")
2765            .end_load_balance()
2766            .build()
2767            .unwrap();
2768
2769        match &definition.steps()[0] {
2770            BuilderStep::LoadBalance { steps, .. } => {
2771                assert_eq!(steps.len(), 2); // SetHeader + To
2772            }
2773            other => panic!("Expected LoadBalance, got {:?}", other),
2774        }
2775    }
2776
2777    #[test]
2778    fn test_load_balance_step_after_load_balance() {
2779        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
2780        let definition = RouteBuilder::from("timer:tick")
2781            .route_id("test-route")
2782            .load_balance()
2783            .to("mock:inner")
2784            .end_load_balance()
2785            .to("mock:outer")
2786            .build()
2787            .unwrap();
2788
2789        assert_eq!(definition.steps().len(), 2);
2790        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2791    }
2792
2793    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2794
2795    #[test]
2796    fn test_dynamic_router_builder() {
2797        let definition = RouteBuilder::from("timer:tick")
2798            .route_id("test-route")
2799            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2800            .build()
2801            .unwrap();
2802
2803        assert_eq!(definition.steps().len(), 1);
2804        assert!(matches!(
2805            &definition.steps()[0],
2806            BuilderStep::DynamicRouter { .. }
2807        ));
2808    }
2809
2810    #[test]
2811    fn test_dynamic_router_builder_with_config() {
2812        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2813            .max_iterations(100)
2814            .cache_size(500);
2815
2816        let definition = RouteBuilder::from("timer:tick")
2817            .route_id("test-route")
2818            .dynamic_router_with_config(config)
2819            .build()
2820            .unwrap();
2821
2822        assert_eq!(definition.steps().len(), 1);
2823        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2824            assert_eq!(config.max_iterations, 100);
2825            assert_eq!(config.cache_size, 500);
2826        } else {
2827            panic!("Expected DynamicRouter step");
2828        }
2829    }
2830
2831    #[test]
2832    fn test_dynamic_router_step_after_router() {
2833        // Steps after dynamic_router() are added to the outer pipeline.
2834        let definition = RouteBuilder::from("timer:tick")
2835            .route_id("test-route")
2836            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2837            .to("mock:outer")
2838            .build()
2839            .unwrap();
2840
2841        assert_eq!(definition.steps().len(), 2);
2842        assert!(matches!(
2843            &definition.steps()[0],
2844            BuilderStep::DynamicRouter { .. }
2845        ));
2846        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2847    }
2848
2849    #[test]
2850    fn routing_slip_builder_creates_step() {
2851        use camel_api::RoutingSlipExpression;
2852
2853        let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2854
2855        let route = RouteBuilder::from("direct:start")
2856            .route_id("routing-slip-test")
2857            .routing_slip(expression)
2858            .build()
2859            .unwrap();
2860
2861        assert!(
2862            matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2863            "Expected RoutingSlip step"
2864        );
2865    }
2866
2867    #[test]
2868    fn routing_slip_with_config_builder_creates_step() {
2869        use camel_api::RoutingSlipConfig;
2870
2871        let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2872            .uri_delimiter("|")
2873            .cache_size(50)
2874            .ignore_invalid_endpoints(true);
2875
2876        let route = RouteBuilder::from("direct:start")
2877            .route_id("routing-slip-config-test")
2878            .routing_slip_with_config(config)
2879            .build()
2880            .unwrap();
2881
2882        if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2883            assert_eq!(config.uri_delimiter, "|");
2884            assert_eq!(config.cache_size, 50);
2885            assert!(config.ignore_invalid_endpoints);
2886        } else {
2887            panic!("Expected RoutingSlip step");
2888        }
2889    }
2890
2891    #[test]
2892    fn test_builder_marshal_adds_processor_step() {
2893        let definition = RouteBuilder::from("timer:tick")
2894            .route_id("test-route")
2895            .marshal("json")
2896            .unwrap()
2897            .build()
2898            .unwrap();
2899        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2900    }
2901
2902    #[test]
2903    fn test_builder_unmarshal_adds_processor_step() {
2904        let definition = RouteBuilder::from("timer:tick")
2905            .route_id("test-route")
2906            .unmarshal("json")
2907            .unwrap()
2908            .build()
2909            .unwrap();
2910        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2911    }
2912
2913    #[test]
2914    fn test_builder_stream_cache_adds_processor_step() {
2915        let definition = RouteBuilder::from("timer:tick")
2916            .route_id("test-route")
2917            .stream_cache(1024)
2918            .build()
2919            .unwrap();
2920        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2921    }
2922
2923    #[test]
2924    fn validate_adds_to_step_with_validator_uri() {
2925        let def = RouteBuilder::from("direct:in")
2926            .route_id("test")
2927            .validate("schemas/order.xsd")
2928            .build()
2929            .unwrap();
2930        let steps = def.steps();
2931        assert_eq!(steps.len(), 1);
2932        assert!(
2933            matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2934            "got: {:?}",
2935            steps[0]
2936        );
2937    }
2938
2939    #[test]
2940    fn test_builder_marshal_returns_err_for_unknown_format() {
2941        let result = RouteBuilder::from("timer:tick")
2942            .route_id("test-route")
2943            .marshal("csv");
2944        let err = match result {
2945            Err(e) => e,
2946            Ok(_) => panic!("marshal with unknown format should return Err"),
2947        };
2948        let msg = err.to_string();
2949        assert!(
2950            msg.contains("unknown data format"),
2951            "error should mention unknown format, got: {msg}"
2952        );
2953        assert!(
2954            msg.contains("csv"),
2955            "error should mention format name, got: {msg}"
2956        );
2957    }
2958
2959    #[test]
2960    fn test_builder_unmarshal_returns_err_for_unknown_format() {
2961        let result = RouteBuilder::from("timer:tick")
2962            .route_id("test-route")
2963            .unmarshal("csv");
2964        let err = match result {
2965            Err(e) => e,
2966            Ok(_) => panic!("unmarshal with unknown format should return Err"),
2967        };
2968        let msg = err.to_string();
2969        assert!(
2970            msg.contains("unknown data format"),
2971            "error should mention unknown format, got: {msg}"
2972        );
2973        assert!(
2974            msg.contains("csv"),
2975            "error should mention format name, got: {msg}"
2976        );
2977    }
2978
2979    #[test]
2980    fn test_builder_recipient_list_creates_step() {
2981        let route = RouteBuilder::from("direct:start")
2982            .route_id("recipient-list-test")
2983            .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
2984            .build()
2985            .unwrap();
2986
2987        assert!(matches!(
2988            &route.steps()[0],
2989            BuilderStep::RecipientList { .. }
2990        ));
2991    }
2992
2993    #[test]
2994    fn test_builder_recipient_list_with_config_creates_step() {
2995        let config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));
2996
2997        let route = RouteBuilder::from("direct:start")
2998            .route_id("recipient-list-config-test")
2999            .recipient_list_with_config(config)
3000            .build()
3001            .unwrap();
3002
3003        assert!(matches!(
3004            &route.steps()[0],
3005            BuilderStep::RecipientList { .. }
3006        ));
3007    }
3008
3009    #[test]
3010    fn test_builder_script_adds_script_step() {
3011        let route = RouteBuilder::from("direct:start")
3012            .route_id("script-test")
3013            .script("rhai", "headers[\"x\"] = \"y\"")
3014            .build()
3015            .unwrap();
3016
3017        assert!(matches!(
3018            &route.steps()[0],
3019            BuilderStep::Script { language, script }
3020            if language == "rhai" && script == "headers[\"x\"] = \"y\""
3021        ));
3022    }
3023
3024    #[test]
3025    fn test_builder_delay_and_delay_with_header_add_steps() {
3026        let route = RouteBuilder::from("direct:start")
3027            .route_id("delay-test")
3028            .delay(Duration::from_millis(250))
3029            .delay_with_header(Duration::from_millis(500), "x-delay")
3030            .build()
3031            .unwrap();
3032
3033        assert_eq!(route.steps().len(), 2);
3034        assert!(matches!(&route.steps()[0], BuilderStep::Delay { .. }));
3035        assert!(matches!(&route.steps()[1], BuilderStep::Delay { .. }));
3036    }
3037
3038    #[test]
3039    fn test_builder_log_and_stop_add_steps_in_order() {
3040        let route = RouteBuilder::from("direct:start")
3041            .route_id("log-stop-test")
3042            .log("hello", LogLevel::Info)
3043            .stop()
3044            .to("mock:after")
3045            .build()
3046            .unwrap();
3047
3048        assert_eq!(route.steps().len(), 3);
3049        assert!(matches!(
3050            &route.steps()[0],
3051            BuilderStep::Log { message, .. } if message == "hello"
3052        ));
3053        assert!(matches!(&route.steps()[1], BuilderStep::Stop));
3054        assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
3055    }
3056
3057    #[test]
3058    fn test_builder_stream_cache_default_adds_processor_step() {
3059        let route = RouteBuilder::from("direct:start")
3060            .route_id("stream-cache-default-test")
3061            .stream_cache_default()
3062            .build()
3063            .unwrap();
3064
3065        assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
3066    }
3067
3068    #[test]
3069    fn test_validate_preserves_existing_validator_prefix() {
3070        let route = RouteBuilder::from("direct:in")
3071            .route_id("validate-prefix-test")
3072            .validate("validator:schemas/order.xsd")
3073            .build()
3074            .unwrap();
3075
3076        assert!(matches!(
3077            &route.steps()[0],
3078            BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
3079        ));
3080    }
3081
3082    #[test]
3083    fn test_load_balance_builder_weighted_failover_parallel_config() {
3084        let route = RouteBuilder::from("direct:start")
3085            .route_id("lb-weighted-failover-parallel")
3086            .load_balance()
3087            .weighted(vec![
3088                ("direct:a".to_string(), 3),
3089                ("direct:b".to_string(), 1),
3090            ])
3091            .failover()
3092            .parallel(true)
3093            .to("mock:result")
3094            .end_load_balance()
3095            .build()
3096            .unwrap();
3097
3098        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3099            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3100            assert!(config.parallel);
3101        } else {
3102            panic!("Expected LoadBalance step");
3103        }
3104    }
3105
3106    #[test]
3107    fn test_multicast_builder_all_config_setters() {
3108        let route = RouteBuilder::from("direct:start")
3109            .route_id("multicast-config-test")
3110            .multicast()
3111            .parallel(true)
3112            .parallel_limit(4)
3113            .stop_on_exception(true)
3114            .timeout(Duration::from_millis(300))
3115            .aggregation(MulticastStrategy::Original)
3116            .to("mock:a")
3117            .end_multicast()
3118            .build()
3119            .unwrap();
3120
3121        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3122            assert!(config.parallel);
3123            assert_eq!(config.parallel_limit, Some(4));
3124            assert!(config.stop_on_exception);
3125            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
3126            assert!(matches!(config.aggregation, MulticastStrategy::Original));
3127        } else {
3128            panic!("Expected Multicast step");
3129        }
3130    }
3131
3132    #[test]
3133    fn test_build_canonical_rejects_unsupported_processor_step() {
3134        let err = RouteBuilder::from("direct:start")
3135            .route_id("canonical-reject")
3136            .set_header("k", Value::String("v".into()))
3137            .build_canonical()
3138            .unwrap_err();
3139
3140        assert!(format!("{err}").contains("does not support step `processor`"));
3141    }
3142
3143    // ── LoadBalance strategy-specific tests ─────────────────────────────────────
3144
3145    #[test]
3146    fn test_load_balance_builder_weighted_strategy() {
3147        let route = RouteBuilder::from("direct:start")
3148            .route_id("lb-weighted")
3149            .load_balance()
3150            .weighted(vec![
3151                ("direct:a".to_string(), 5),
3152                ("direct:b".to_string(), 2),
3153                ("direct:c".to_string(), 1),
3154            ])
3155            .to("mock:result")
3156            .end_load_balance()
3157            .build()
3158            .unwrap();
3159
3160        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3161            assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
3162        } else {
3163            panic!("Expected LoadBalance step");
3164        }
3165    }
3166
3167    #[test]
3168    fn test_load_balance_builder_failover_strategy() {
3169        let route = RouteBuilder::from("direct:start")
3170            .route_id("lb-failover")
3171            .load_balance()
3172            .failover()
3173            .to("mock:primary")
3174            .end_load_balance()
3175            .build()
3176            .unwrap();
3177
3178        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3179            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3180            assert!(!config.parallel);
3181        } else {
3182            panic!("Expected LoadBalance step");
3183        }
3184    }
3185
3186    #[test]
3187    fn test_load_balance_builder_parallel_false_explicit() {
3188        let route = RouteBuilder::from("direct:start")
3189            .route_id("lb-parallel-false")
3190            .load_balance()
3191            .round_robin()
3192            .parallel(false)
3193            .to("mock:result")
3194            .end_load_balance()
3195            .build()
3196            .unwrap();
3197
3198        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3199            assert!(!config.parallel);
3200        } else {
3201            panic!("Expected LoadBalance step");
3202        }
3203    }
3204
3205    // ── FilterInSplitBuilder tests ──────────────────────────────────────────────
3206
3207    #[test]
3208    fn test_filter_in_split_builder_typestate() {
3209        use camel_api::splitter::{SplitterConfig, split_body_lines};
3210
3211        let definition = RouteBuilder::from("timer:test")
3212            .route_id("filter-in-split")
3213            .split(SplitterConfig::new(split_body_lines()))
3214            .filter(|_ex| true)
3215            .to("mock:filtered")
3216            .end_filter()
3217            .end_split()
3218            .build()
3219            .unwrap();
3220
3221        assert_eq!(definition.steps().len(), 1);
3222        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3223            assert_eq!(steps.len(), 1);
3224            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3225        } else {
3226            panic!("Expected Split step");
3227        }
3228    }
3229
3230    #[test]
3231    fn test_filter_in_split_builder_multiple_steps() {
3232        use camel_api::splitter::{SplitterConfig, split_body_lines};
3233
3234        let definition = RouteBuilder::from("timer:test")
3235            .route_id("filter-in-split-multi")
3236            .split(SplitterConfig::new(split_body_lines()))
3237            .to("mock:before-filter")
3238            .filter(|_ex| true)
3239            .to("mock:inside-filter")
3240            .end_filter()
3241            .to("mock:after-filter")
3242            .end_split()
3243            .build()
3244            .unwrap();
3245
3246        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3247            // To("before-filter") + Filter{...} + To("after-filter") = 3
3248            assert_eq!(steps.len(), 3);
3249        } else {
3250            panic!("Expected Split step");
3251        }
3252    }
3253
3254    // ── build_canonical tests ───────────────────────────────────────────────────
3255
3256    #[test]
3257    fn test_build_canonical_with_circuit_breaker() {
3258        use camel_api::circuit_breaker::CircuitBreakerConfig;
3259
3260        let spec = RouteBuilder::from("direct:start")
3261            .route_id("canonical-cb")
3262            .circuit_breaker(CircuitBreakerConfig::new().failure_threshold(10))
3263            .to("mock:result")
3264            .build_canonical()
3265            .unwrap();
3266
3267        let cb = spec.circuit_breaker.expect("circuit breaker should be set");
3268        assert_eq!(cb.failure_threshold, 10);
3269    }
3270
3271    #[test]
3272    fn test_build_canonical_rejects_custom_split_aggregation() {
3273        use camel_api::splitter::{SplitterConfig, split_body_lines};
3274
3275        let err = RouteBuilder::from("direct:start")
3276            .route_id("canonical-custom-split")
3277            .split(SplitterConfig::new(split_body_lines()).aggregation(
3278                camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
3279            ))
3280            .to("mock:frag")
3281            .end_split()
3282            .build_canonical()
3283            .unwrap_err();
3284
3285        // Split with closure-based expression is rejected in canonical v1.
3286        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3287    }
3288
3289    #[test]
3290    fn test_build_canonical_rejects_custom_aggregate_strategy() {
3291        let err = RouteBuilder::from("direct:start")
3292            .route_id("canonical-custom-agg")
3293            .aggregate(
3294                AggregatorConfig::correlate_by("key")
3295                    .complete_when_size(2)
3296                    .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
3297                    .build()
3298                    .unwrap(),
3299            )
3300            .build_canonical()
3301            .unwrap_err();
3302
3303        assert!(format!("{err}").contains("custom aggregate strategy"));
3304    }
3305
3306    #[test]
3307    fn test_build_canonical_rejects_fn_correlation_strategy() {
3308        let err = RouteBuilder::from("direct:start")
3309            .route_id("canonical-fn-corr")
3310            .aggregate(AggregatorConfig {
3311                header_name: "key".to_string(),
3312                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3313                correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
3314                strategy: AggregationStrategy::CollectAll,
3315                max_buckets: None,
3316                bucket_ttl: None,
3317                force_completion_on_stop: false,
3318                discard_on_timeout: false,
3319            })
3320            .build_canonical()
3321            .unwrap_err();
3322
3323        assert!(format!("{err}").contains("Fn correlation strategy"));
3324    }
3325
3326    #[test]
3327    fn test_build_canonical_rejects_predicate_completion() {
3328        let err = RouteBuilder::from("direct:start")
3329            .route_id("canonical-pred-completion")
3330            .aggregate(AggregatorConfig {
3331                header_name: "key".to_string(),
3332                completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
3333                    |_| false,
3334                ))),
3335                correlation: CorrelationStrategy::HeaderName("key".to_string()),
3336                strategy: AggregationStrategy::CollectAll,
3337                max_buckets: None,
3338                bucket_ttl: None,
3339                force_completion_on_stop: false,
3340                discard_on_timeout: false,
3341            })
3342            .build_canonical()
3343            .unwrap_err();
3344
3345        assert!(format!("{err}").contains("predicate completion"));
3346    }
3347
3348    #[test]
3349    fn test_build_canonical_with_expression_correlation() {
3350        let spec = RouteBuilder::from("direct:start")
3351            .route_id("canonical-expr-corr")
3352            .aggregate(AggregatorConfig {
3353                header_name: "key".to_string(),
3354                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3355                correlation: CorrelationStrategy::Expression {
3356                    expr: "header.key".to_string(),
3357                    language: "simple".to_string(),
3358                },
3359                strategy: AggregationStrategy::CollectAll,
3360                max_buckets: None,
3361                bucket_ttl: None,
3362                force_completion_on_stop: false,
3363                discard_on_timeout: false,
3364            })
3365            .build_canonical()
3366            .unwrap();
3367
3368        assert!(spec.steps.iter().any(|s| matches!(s, CanonicalStepSpec::Aggregate(a) if a.correlation_key == Some("header.key".to_string()))));
3369    }
3370
3371    #[test]
3372    fn test_build_canonical_split_rejected_with_closure_expression() {
3373        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
3374
3375        // Builder-based split uses closure expressions, which are not serializable.
3376        let err = RouteBuilder::from("direct:start")
3377            .route_id("canonical-split-last")
3378            .split(
3379                SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins),
3380            )
3381            .to("mock:frag")
3382            .end_split()
3383            .build_canonical()
3384            .unwrap_err();
3385
3386        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3387    }
3388
3389    // ── OnExceptionBuilder full chain tests ─────────────────────────────────────
3390
3391    #[test]
3392    fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
3393        let definition = RouteBuilder::from("direct:start")
3394            .route_id("on-exception-full")
3395            .dead_letter_channel("log:dlc")
3396            .on_exception(|e| matches!(e, CamelError::Io(_)))
3397            .retry(5)
3398            .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
3399            .with_jitter(0.3)
3400            .handled_by("log:io-handler")
3401            .end_on_exception()
3402            .to("mock:out")
3403            .build()
3404            .unwrap();
3405
3406        let cfg = definition
3407            .error_handler_config()
3408            .expect("error handler should be set");
3409        assert_eq!(cfg.policies.len(), 1);
3410        let policy = &cfg.policies[0];
3411        let retry = policy.retry.as_ref().expect("retry should be set");
3412        assert_eq!(retry.max_attempts, 5);
3413        assert_eq!(retry.initial_delay, Duration::from_millis(10));
3414        assert_eq!(retry.multiplier, 2.0);
3415        assert_eq!(retry.max_delay, Duration::from_millis(500));
3416        assert!((retry.jitter_factor - 0.3).abs() < f64::EPSILON);
3417        assert_eq!(policy.handled_by.as_deref(), Some("log:io-handler"));
3418    }
3419
3420    #[test]
3421    fn test_on_exception_jitter_clamped_to_valid_range() {
3422        let definition = RouteBuilder::from("direct:start")
3423            .route_id("jitter-clamp")
3424            .on_exception(|_e| true)
3425            .retry(1)
3426            .with_jitter(5.0)
3427            .end_on_exception()
3428            .to("mock:out")
3429            .build()
3430            .unwrap();
3431
3432        let cfg = definition.error_handler_config().unwrap();
3433        let retry = cfg.policies[0].retry.as_ref().unwrap();
3434        assert!((retry.jitter_factor - 1.0).abs() < f64::EPSILON);
3435    }
3436
3437    // ── StepAccumulator: process_fn, convert_body_to, bean ──────────────────────
3438
3439    #[test]
3440    fn test_builder_process_fn_adds_processor_step() {
3441        use camel_api::BoxProcessorExt;
3442        let processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
3443        let definition = RouteBuilder::from("timer:tick")
3444            .route_id("process-fn-test")
3445            .process_fn(processor)
3446            .build()
3447            .unwrap();
3448
3449        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3450    }
3451
3452    #[test]
3453    fn test_builder_convert_body_to_adds_processor_step() {
3454        let definition = RouteBuilder::from("timer:tick")
3455            .route_id("convert-body-test")
3456            .convert_body_to(BodyType::Json)
3457            .build()
3458            .unwrap();
3459
3460        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3461    }
3462
3463    #[test]
3464    fn test_builder_bean_adds_bean_step() {
3465        let definition = RouteBuilder::from("timer:tick")
3466            .route_id("bean-test")
3467            .bean("myBean", "process")
3468            .build()
3469            .unwrap();
3470
3471        assert!(
3472            matches!(&definition.steps()[0], BuilderStep::Bean { name, method }
3473            if name == "myBean" && method == "process")
3474        );
3475    }
3476
3477    // ── Throttle strategy-specific tests ────────────────────────────────────────
3478
3479    #[test]
3480    fn test_throttle_builder_delay_strategy() {
3481        let definition = RouteBuilder::from("timer:tick")
3482            .route_id("throttle-delay")
3483            .throttle(10, Duration::from_secs(1))
3484            .strategy(ThrottleStrategy::Delay)
3485            .to("mock:result")
3486            .end_throttle()
3487            .build()
3488            .unwrap();
3489
3490        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3491            assert_eq!(config.strategy, ThrottleStrategy::Delay);
3492        } else {
3493            panic!("Expected Throttle step");
3494        }
3495    }
3496
3497    #[test]
3498    fn test_throttle_builder_drop_strategy() {
3499        let definition = RouteBuilder::from("timer:tick")
3500            .route_id("throttle-drop")
3501            .throttle(10, Duration::from_secs(1))
3502            .strategy(ThrottleStrategy::Drop)
3503            .to("mock:result")
3504            .end_throttle()
3505            .build()
3506            .unwrap();
3507
3508        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3509            assert_eq!(config.strategy, ThrottleStrategy::Drop);
3510        } else {
3511            panic!("Expected Throttle step");
3512        }
3513    }
3514
3515    // ── LoopInLoopBuilder with loop_while ───────────────────────────────────────
3516
3517    #[test]
3518    fn test_nested_loop_while_builder() {
3519        use camel_api::loop_eip::LoopMode;
3520
3521        let def = RouteBuilder::from("direct:start")
3522            .route_id("nested-loop-while")
3523            .loop_count(2)
3524            .to("mock:outer")
3525            .loop_while(|_ex| true)
3526            .to("mock:inner")
3527            .end_loop()
3528            .end_loop()
3529            .build()
3530            .unwrap();
3531
3532        assert_eq!(def.steps().len(), 1);
3533        if let BuilderStep::Loop { steps, .. } = &def.steps()[0] {
3534            assert_eq!(steps.len(), 2);
3535            if let BuilderStep::Loop { config, .. } = &steps[1] {
3536                assert!(matches!(config.mode, LoopMode::While(_)));
3537            } else {
3538                panic!("Expected inner Loop step");
3539            }
3540        } else {
3541            panic!("Expected outer Loop step");
3542        }
3543    }
3544
3545    // ── Choice with multiple whens + otherwise ──────────────────────────────────
3546
3547    #[test]
3548    fn test_choice_builder_multiple_whens_with_otherwise() {
3549        let definition = RouteBuilder::from("timer:tick")
3550            .route_id("choice-multi-otherwise")
3551            .choice()
3552            .when(|ex: &Exchange| ex.input.header("a").is_some())
3553            .to("mock:a")
3554            .end_when()
3555            .when(|ex: &Exchange| ex.input.header("b").is_some())
3556            .to("mock:b")
3557            .end_when()
3558            .when(|ex: &Exchange| ex.input.header("c").is_some())
3559            .to("mock:c")
3560            .end_when()
3561            .otherwise()
3562            .to("mock:fallback")
3563            .end_otherwise()
3564            .end_choice()
3565            .build()
3566            .unwrap();
3567
3568        if let BuilderStep::Choice { whens, otherwise } = &definition.steps()[0] {
3569            assert_eq!(whens.len(), 3);
3570            assert!(otherwise.is_some());
3571            assert_eq!(otherwise.as_ref().unwrap().len(), 1);
3572        } else {
3573            panic!("Expected Choice step");
3574        }
3575    }
3576
3577    // ── Multicast individual config tests ───────────────────────────────────────
3578
3579    #[test]
3580    fn test_multicast_builder_parallel_only() {
3581        let route = RouteBuilder::from("direct:start")
3582            .route_id("multicast-parallel")
3583            .multicast()
3584            .parallel(true)
3585            .to("mock:a")
3586            .end_multicast()
3587            .build()
3588            .unwrap();
3589
3590        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3591            assert!(config.parallel);
3592            assert_eq!(config.parallel_limit, None);
3593        } else {
3594            panic!("Expected Multicast step");
3595        }
3596    }
3597
3598    #[test]
3599    fn test_multicast_builder_timeout_only() {
3600        let route = RouteBuilder::from("direct:start")
3601            .route_id("multicast-timeout")
3602            .multicast()
3603            .timeout(Duration::from_secs(5))
3604            .to("mock:a")
3605            .end_multicast()
3606            .build()
3607            .unwrap();
3608
3609        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3610            assert_eq!(config.timeout, Some(Duration::from_secs(5)));
3611        } else {
3612            panic!("Expected Multicast step");
3613        }
3614    }
3615
3616    #[test]
3617    fn test_multicast_builder_aggregation_collect_all() {
3618        let route = RouteBuilder::from("direct:start")
3619            .route_id("multicast-collect")
3620            .multicast()
3621            .aggregation(MulticastStrategy::CollectAll)
3622            .to("mock:a")
3623            .end_multicast()
3624            .build()
3625            .unwrap();
3626
3627        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3628            assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
3629        } else {
3630            panic!("Expected Multicast step");
3631        }
3632    }
3633
3634    // ── extract_completion_fields: Any mode with multiple conditions ────────────
3635
3636    #[test]
3637    fn test_build_canonical_aggregate_any_completion_mode() {
3638        let spec = RouteBuilder::from("direct:start")
3639            .route_id("canonical-any-completion")
3640            .aggregate(
3641                AggregatorConfig::correlate_by("key")
3642                    .complete_on_size_or_timeout(10, Duration::from_secs(30))
3643                    .build()
3644                    .unwrap(),
3645            )
3646            .build_canonical()
3647            .unwrap();
3648
3649        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3650            assert_eq!(agg.completion_size, Some(10));
3651            assert_eq!(agg.completion_timeout_ms, Some(30_000));
3652        } else {
3653            panic!("Expected Aggregate step");
3654        }
3655    }
3656
3657    #[test]
3658    fn test_build_canonical_aggregate_timeout_completion() {
3659        let spec = RouteBuilder::from("direct:start")
3660            .route_id("canonical-timeout-completion")
3661            .aggregate(
3662                AggregatorConfig::correlate_by("key")
3663                    .complete_on_timeout(Duration::from_millis(500))
3664                    .build()
3665                    .unwrap(),
3666            )
3667            .build_canonical()
3668            .unwrap();
3669
3670        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3671            assert_eq!(agg.completion_size, None);
3672            assert_eq!(agg.completion_timeout_ms, Some(500));
3673        } else {
3674            panic!("Expected Aggregate step");
3675        }
3676    }
3677
3678    // ── canonicalize_aggregate: discard_on_timeout and force_completion_on_stop ─
3679
3680    #[test]
3681    fn test_build_canonical_aggregate_discard_on_timeout() {
3682        use camel_api::aggregator::AggregatorConfig;
3683
3684        let spec = RouteBuilder::from("direct:start")
3685            .route_id("canonical-discard-timeout")
3686            .aggregate(
3687                AggregatorConfig::correlate_by("key")
3688                    .complete_when_size(1)
3689                    .discard_on_timeout(true)
3690                    .build()
3691                    .unwrap(),
3692            )
3693            .build_canonical()
3694            .unwrap();
3695
3696        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3697            assert_eq!(agg.discard_on_timeout, Some(true));
3698        } else {
3699            panic!("Expected Aggregate step");
3700        }
3701    }
3702
3703    #[test]
3704    fn test_build_canonical_aggregate_force_completion_on_stop() {
3705        use camel_api::aggregator::AggregatorConfig;
3706
3707        let spec = RouteBuilder::from("direct:start")
3708            .route_id("canonical-force-stop")
3709            .aggregate(
3710                AggregatorConfig::correlate_by("key")
3711                    .complete_when_size(1)
3712                    .force_completion_on_stop(true)
3713                    .build()
3714                    .unwrap(),
3715            )
3716            .build_canonical()
3717            .unwrap();
3718
3719        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3720            assert_eq!(agg.force_completion_on_stop, Some(true));
3721        } else {
3722            panic!("Expected Aggregate step");
3723        }
3724    }
3725
3726    // ── build_canonical: max_buckets and bucket_ttl ─────────────────────────────
3727
3728    #[test]
3729    fn test_build_canonical_aggregate_max_buckets_and_ttl() {
3730        use camel_api::aggregator::AggregatorConfig;
3731
3732        let spec = RouteBuilder::from("direct:start")
3733            .route_id("canonical-buckets-ttl")
3734            .aggregate(
3735                AggregatorConfig::correlate_by("key")
3736                    .complete_when_size(1)
3737                    .max_buckets(100)
3738                    .bucket_ttl(Duration::from_secs(60))
3739                    .build()
3740                    .unwrap(),
3741            )
3742            .build_canonical()
3743            .unwrap();
3744
3745        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3746            assert_eq!(agg.max_buckets, Some(100));
3747            assert_eq!(agg.bucket_ttl_ms, Some(60_000));
3748        } else {
3749            panic!("Expected Aggregate step");
3750        }
3751    }
3752
3753    // ── SplitBuilder with filter inside ─────────────────────────────────────────
3754
3755    #[test]
3756    fn test_split_builder_with_filter_inside() {
3757        use camel_api::splitter::{SplitterConfig, split_body_lines};
3758
3759        let definition = RouteBuilder::from("timer:test")
3760            .route_id("split-with-filter")
3761            .split(SplitterConfig::new(split_body_lines()))
3762            .filter(|_ex| true)
3763            .to("mock:filtered-frag")
3764            .end_filter()
3765            .end_split()
3766            .build()
3767            .unwrap();
3768
3769        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3770            assert_eq!(steps.len(), 1);
3771            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3772        } else {
3773            panic!("Expected Split step");
3774        }
3775    }
3776
3777    // ── WireTap additional tests ────────────────────────────────────────────────
3778
3779    #[test]
3780    fn test_wire_tap_multiple_taps() {
3781        let definition = RouteBuilder::from("timer:tick")
3782            .route_id("multi-wire-tap")
3783            .wire_tap("mock:tap1")
3784            .wire_tap("mock:tap2")
3785            .to("mock:result")
3786            .build()
3787            .unwrap();
3788
3789        assert_eq!(definition.steps().len(), 3);
3790        assert!(
3791            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap1")
3792        );
3793        assert!(
3794            matches!(&definition.steps()[1], BuilderStep::WireTap { uri } if uri == "mock:tap2")
3795        );
3796    }
3797
3798    // ── Error handler: explicit config after shorthand → Mixed mode ─────────────
3799
3800    #[test]
3801    fn test_builder_shorthand_then_explicit_mixed_mode() {
3802        let result = RouteBuilder::from("direct:start")
3803            .route_id("mixed-mode-2")
3804            .dead_letter_channel("log:dlc")
3805            .error_handler(ErrorHandlerConfig::log_only())
3806            .to("mock:out")
3807            .build();
3808
3809        let err = result.err().expect("mixed mode should fail");
3810        assert!(format!("{err}").contains("mixed error handler modes"));
3811    }
3812
    // ── build_canonical: empty from_uri / missing route_id errors ───────────────
3814
3815    #[test]
3816    fn test_build_canonical_empty_from_uri_errors() {
3817        let result = RouteBuilder::from("").route_id("test").build_canonical();
3818        assert!(result.is_err());
3819    }
3820
3821    #[test]
3822    fn test_build_canonical_missing_route_id_errors() {
3823        let result = RouteBuilder::from("direct:start").build_canonical();
3824        assert!(result.is_err());
3825        let err = result.unwrap_err().to_string();
3826        assert!(err.contains("route_id"));
3827    }
3828
3829    // ── SplitBuilder: aggregate inside split ────────────────────────────────────
3830
3831    #[test]
3832    fn test_split_builder_with_aggregate_inside() {
3833        use camel_api::aggregator::AggregatorConfig;
3834        use camel_api::splitter::{SplitterConfig, split_body_lines};
3835
3836        let definition = RouteBuilder::from("timer:test")
3837            .route_id("split-agg")
3838            .split(SplitterConfig::new(split_body_lines()))
3839            .aggregate(
3840                AggregatorConfig::correlate_by("frag-key")
3841                    .complete_when_size(3)
3842                    .build()
3843                    .unwrap(),
3844            )
3845            .end_split()
3846            .build()
3847            .unwrap();
3848
3849        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3850            assert_eq!(steps.len(), 1);
3851            assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
3852        } else {
3853            panic!("Expected Split step");
3854        }
3855    }
3856
3857    // ── Throttle: steps collected inside throttle scope ─────────────────────────
3858
3859    #[test]
3860    fn test_throttle_builder_with_steps_inside() {
3861        let definition = RouteBuilder::from("timer:tick")
3862            .route_id("throttle-steps")
3863            .throttle(10, Duration::from_secs(1))
3864            .set_header("throttled", Value::Bool(true))
3865            .to("mock:throttled")
3866            .end_throttle()
3867            .build()
3868            .unwrap();
3869
3870        if let BuilderStep::Throttle { steps, .. } = &definition.steps()[0] {
3871            assert_eq!(steps.len(), 2);
3872        } else {
3873            panic!("Expected Throttle step");
3874        }
3875    }
3876
3877    // ── LoadBalance: steps collected inside scope ───────────────────────────────
3878
3879    #[test]
3880    fn test_load_balance_builder_with_steps_inside() {
3881        let definition = RouteBuilder::from("timer:tick")
3882            .route_id("lb-steps")
3883            .load_balance()
3884            .round_robin()
3885            .set_header("lb", Value::Bool(true))
3886            .to("mock:lb")
3887            .end_load_balance()
3888            .build()
3889            .unwrap();
3890
3891        if let BuilderStep::LoadBalance { steps, .. } = &definition.steps()[0] {
3892            assert_eq!(steps.len(), 2);
3893        } else {
3894            panic!("Expected LoadBalance step");
3895        }
3896    }
3897
3898    // ── Multicast: steps collected inside scope ─────────────────────────────────
3899
3900    #[test]
3901    fn test_multicast_builder_with_steps_inside() {
3902        let definition = RouteBuilder::from("timer:tick")
3903            .route_id("multicast-steps")
3904            .multicast()
3905            .set_header("mc", Value::Bool(true))
3906            .to("mock:multicast")
3907            .end_multicast()
3908            .build()
3909            .unwrap();
3910
3911        if let BuilderStep::Multicast { steps, .. } = &definition.steps()[0] {
3912            assert_eq!(steps.len(), 2);
3913        } else {
3914            panic!("Expected Multicast step");
3915        }
3916    }
3917
3918    // ── LoopBuilder: steps collected inside loop scope ──────────────────────────
3919
3920    #[test]
3921    fn test_loop_builder_with_steps_inside() {
3922        let definition = RouteBuilder::from("timer:tick")
3923            .route_id("loop-steps")
3924            .loop_count(3)
3925            .set_header("loop", Value::Bool(true))
3926            .to("mock:loop")
3927            .end_loop()
3928            .build()
3929            .unwrap();
3930
3931        if let BuilderStep::Loop { steps, .. } = &definition.steps()[0] {
3932            assert_eq!(steps.len(), 2);
3933        } else {
3934            panic!("Expected Loop step");
3935        }
3936    }
3937
3938    // ── canonical_step_name coverage for remaining variants ─────────────────────
3939
3940    #[test]
3941    fn test_build_canonical_rejects_loop_step() {
3942        let err = RouteBuilder::from("direct:start")
3943            .route_id("canonical-loop")
3944            .loop_count(3)
3945            .to("mock:loop")
3946            .end_loop()
3947            .build_canonical()
3948            .unwrap_err();
3949
3950        assert!(format!("{err}").contains("does not support step `loop`"));
3951    }
3952
3953    #[test]
3954    fn test_build_canonical_rejects_multicast_step() {
3955        let err = RouteBuilder::from("direct:start")
3956            .route_id("canonical-multicast")
3957            .multicast()
3958            .to("mock:a")
3959            .end_multicast()
3960            .build_canonical()
3961            .unwrap_err();
3962
3963        assert!(format!("{err}").contains("does not support step `multicast`"));
3964    }
3965
3966    #[test]
3967    fn test_build_canonical_rejects_throttle_step() {
3968        let err = RouteBuilder::from("direct:start")
3969            .route_id("canonical-throttle")
3970            .throttle(10, Duration::from_secs(1))
3971            .to("mock:result")
3972            .end_throttle()
3973            .build_canonical()
3974            .unwrap_err();
3975
3976        assert!(format!("{err}").contains("does not support step `throttle`"));
3977    }
3978
3979    #[test]
3980    fn test_build_canonical_rejects_load_balancer_step() {
3981        let err = RouteBuilder::from("direct:start")
3982            .route_id("canonical-lb")
3983            .load_balance()
3984            .round_robin()
3985            .to("mock:result")
3986            .end_load_balance()
3987            .build_canonical()
3988            .unwrap_err();
3989
3990        assert!(format!("{err}").contains("does not support step `load_balancer`"));
3991    }
3992
3993    #[test]
3994    fn test_build_canonical_rejects_bean_step() {
3995        let err = RouteBuilder::from("direct:start")
3996            .route_id("canonical-bean")
3997            .bean("myBean", "process")
3998            .build_canonical()
3999            .unwrap_err();
4000
4001        assert!(format!("{err}").contains("does not support step `bean`"));
4002    }
4003
4004    #[test]
4005    fn test_build_canonical_rejects_script_step() {
4006        let err = RouteBuilder::from("direct:start")
4007            .route_id("canonical-script")
4008            .script("rhai", "x = 1")
4009            .build_canonical()
4010            .unwrap_err();
4011
4012        assert!(format!("{err}").contains("does not support step `script`"));
4013    }
4014
4015    #[test]
4016    fn test_build_canonical_accepts_delay_step() {
4017        let spec = RouteBuilder::from("direct:start")
4018            .route_id("canonical-delay")
4019            .delay(Duration::from_millis(100))
4020            .build_canonical()
4021            .unwrap();
4022
4023        assert!(
4024            spec.steps.iter().any(
4025                |s| matches!(s, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100)
4026            )
4027        );
4028    }
4029
4030    #[test]
4031    fn test_build_canonical_accepts_wire_tap_step() {
4032        let spec = RouteBuilder::from("direct:start")
4033            .route_id("canonical-wiretap")
4034            .wire_tap("mock:tap")
4035            .build_canonical()
4036            .unwrap();
4037
4038        assert!(
4039            spec.steps
4040                .iter()
4041                .any(|s| matches!(s, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"))
4042        );
4043    }
4044
4045    #[test]
4046    fn test_build_canonical_rejects_dynamic_router_step() {
4047        let err = RouteBuilder::from("direct:start")
4048            .route_id("canonical-dyn-router")
4049            .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
4050            .build_canonical()
4051            .unwrap_err();
4052
4053        assert!(format!("{err}").contains("does not support step `dynamic_router`"));
4054    }
4055
4056    #[test]
4057    fn test_build_canonical_rejects_routing_slip_step() {
4058        let err = RouteBuilder::from("direct:start")
4059            .route_id("canonical-routing-slip")
4060            .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
4061            .build_canonical()
4062            .unwrap_err();
4063
4064        assert!(format!("{err}").contains("does not support step `routing_slip`"));
4065    }
4066
4067    #[test]
4068    fn test_build_canonical_rejects_recipient_list_step() {
4069        let err = RouteBuilder::from("direct:start")
4070            .route_id("canonical-recipient")
4071            .recipient_list(Arc::new(|_| "mock:a".to_string()))
4072            .build_canonical()
4073            .unwrap_err();
4074
4075        assert!(format!("{err}").contains("does not support step `recipient_list`"));
4076    }
4077
4078    // ── extract_completion_fields: Any mode with predicate → error ──────────────
4079
4080    #[test]
4081    fn test_build_canonical_rejects_any_mode_with_predicate() {
4082        let err = RouteBuilder::from("direct:start")
4083            .route_id("canonical-any-pred")
4084            .aggregate(AggregatorConfig {
4085                header_name: "key".to_string(),
4086                completion: CompletionMode::Any(vec![
4087                    CompletionCondition::Size(5),
4088                    CompletionCondition::Predicate(Arc::new(|_| false)),
4089                ]),
4090                correlation: CorrelationStrategy::HeaderName("key".to_string()),
4091                strategy: AggregationStrategy::CollectAll,
4092                max_buckets: None,
4093                bucket_ttl: None,
4094                force_completion_on_stop: false,
4095                discard_on_timeout: false,
4096            })
4097            .build_canonical()
4098            .unwrap_err();
4099
4100        assert!(format!("{err}").contains("predicate completion"));
4101    }
4102
4103    // ── BUILDER-004: Validation errors for missing required fields ────────────
4104
4105    #[test]
4106    fn test_builder_validation_missing_from_uri() {
4107        let result = RouteBuilder::from("")
4108            .route_id("missing-uri-route")
4109            .to("log:info")
4110            .build();
4111        assert!(result.is_err(), "empty from URI should fail validation");
4112        let err = result.err().unwrap().to_string();
4113        assert!(
4114            err.contains("'from'") || err.contains("URI"),
4115            "error should mention from/URI, got: {err}"
4116        );
4117    }
4118
4119    #[test]
4120    fn test_builder_validation_invalid_step_uri_scheme() {
4121        let result = RouteBuilder::from("timer:tick")
4122            .route_id("bad-step-route")
4123            .to("not-a-valid-uri") // no scheme
4124            .build();
4125        // The builder itself accepts any URI string; validation happens at
4126        // resolution time. Verify the build succeeds (step URI is deferred).
4127        assert!(
4128            result.is_ok(),
4129            "builder should accept opaque step URIs; resolution happens later"
4130        );
4131    }
4132
4133    // ── BUILDER-006: Duplicate route IDs ──────────────────────────────────────
4134
4135    #[test]
4136    fn test_builder_duplicate_route_ids_produce_identical_definitions() {
4137        // The builder itself doesn't check for duplicates (that's context-level).
4138        // Verify both builds succeed with the same ID — detection is TODO(BUILDER-006).
4139        let route1 = RouteBuilder::from("direct:a")
4140            .route_id("dup-route")
4141            .to("mock:out")
4142            .build();
4143        let route2 = RouteBuilder::from("direct:b")
4144            .route_id("dup-route")
4145            .to("mock:out")
4146            .build();
4147
4148        assert!(route1.is_ok());
4149        assert!(route2.is_ok());
4150        assert_eq!(route1.unwrap().route_id(), route2.unwrap().route_id());
4151    }
4152}