Skip to main content

camel_builder/
lib.rs

1//! Fluent builder API for constructing Camel routes programmatically with EIP patterns.
2//!
3//! Main types: `RouteBuilder`, `StepAccumulator`, `SplitBuilder`, `ChoiceBuilder`, `MulticastBuilder`,
4//! `ThrottleBuilder`, `LoopBuilder`, `LoadBalancerBuilder`, `OnExceptionBuilder`.
5
6use camel_api::DelayConfig;
7use camel_api::aggregator::{
8    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24    ProcessorFn, Value,
25    runtime::{
26        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28        CanonicalWhenSpec,
29    },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35    StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38// ── Module declarations ─────────────────────────────────────────────────────
39pub mod do_try;
40pub use do_try::{DoCatchBuilder, DoFinallyBuilder, DoTryBuilder};
41
42/// Shared step-accumulation methods for all builder types.
43///
44/// Implementors provide `steps_mut()` and get step-adding methods for free.
45/// `filter()` and other branching methods are NOT included — they return
46/// different types per builder and stay as per-builder methods.
47pub trait StepAccumulator: Sized {
48    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
49
50    fn to(mut self, endpoint: impl Into<String>) -> Self {
51        self.steps_mut().push(BuilderStep::To(endpoint.into()));
52        self
53    }
54
55    fn process<F, Fut>(mut self, f: F) -> Self
56    where
57        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
58        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
59    {
60        let svc = ProcessorFn::new(f);
61        self.steps_mut()
62            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
63        self
64    }
65
66    fn process_fn(mut self, processor: BoxProcessor) -> Self {
67        self.steps_mut().push(BuilderStep::Processor(processor));
68        self
69    }
70
71    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
72        let svc = SetHeader::new(IdentityProcessor, key, value);
73        self.steps_mut()
74            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
75        self
76    }
77
78    fn map_body<F>(mut self, mapper: F) -> Self
79    where
80        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
81    {
82        let svc = MapBody::new(IdentityProcessor, mapper);
83        self.steps_mut()
84            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
85        self
86    }
87
88    fn set_body<B>(mut self, body: B) -> Self
89    where
90        B: Into<Body> + Clone + Send + Sync + 'static,
91    {
92        let body: Body = body.into();
93        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
94        self.steps_mut()
95            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
96        self
97    }
98
99    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
100    ///
101    /// Transforms the message body using the given value. Semantically identical
102    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
103    fn transform<B>(self, body: B) -> Self
104    where
105        B: Into<Body> + Clone + Send + Sync + 'static,
106    {
107        self.set_body(body)
108    }
109
110    fn set_body_fn<F>(mut self, expr: F) -> Self
111    where
112        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
113    {
114        let svc = SetBody::new(IdentityProcessor, expr);
115        self.steps_mut()
116            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
117        self
118    }
119
120    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
121    where
122        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
123    {
124        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
125        self.steps_mut()
126            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
127        self
128    }
129
130    fn aggregate(mut self, config: AggregatorConfig) -> Self {
131        self.steps_mut().push(BuilderStep::Aggregate { config });
132        self
133    }
134
135    /// Stop processing this exchange immediately. No further steps in the
136    /// current pipeline will run.
137    ///
138    /// Can be used at any point in the route: directly on RouteBuilder,
139    /// inside `.filter()`, inside `.split()`, etc.
140    fn stop(mut self) -> Self {
141        self.steps_mut().push(BuilderStep::Stop);
142        self
143    }
144
145    fn delay(mut self, duration: std::time::Duration) -> Self {
146        self.steps_mut().push(BuilderStep::Delay {
147            config: DelayConfig::from_duration(duration),
148        });
149        self
150    }
151
152    fn delay_with_header(
153        mut self,
154        duration: std::time::Duration,
155        header: impl Into<String>,
156    ) -> Self {
157        self.steps_mut().push(BuilderStep::Delay {
158            config: DelayConfig::from_duration_with_header(duration, header),
159        });
160        self
161    }
162
163    /// Log a message at the specified level.
164    ///
165    /// The message will be logged when an exchange passes through this step.
166    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
167        self.steps_mut().push(BuilderStep::Log {
168            level,
169            message: message.into(),
170        });
171        self
172    }
173
174    /// Convert the message body to the target type.
175    ///
176    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
177    /// Returns `TypeConversionFailed` if conversion is not possible.
178    ///
179    /// # Example
180    /// ```ignore
181    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
182    ///      .convert_body_to(BodyType::Json)
183    ///      .to("direct:next")
184    /// ```
185    fn convert_body_to(mut self, target: BodyType) -> Self {
186        let svc = ConvertBodyTo::new(IdentityProcessor, target);
187        self.steps_mut()
188            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
189        self
190    }
191
192    fn stream_cache(mut self, threshold: usize) -> Self {
193        let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
194        let svc = StreamCacheService::new(IdentityProcessor, config);
195        self.steps_mut()
196            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
197        self
198    }
199
200    /// Materialize `Body::Stream` into `Body::Bytes` using the default threshold (128 KB).
201    ///
202    /// Equivalent to `.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)`.
203    fn stream_cache_default(self) -> Self {
204        self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
205    }
206
207    /// Marshal the message body using the specified data format.
208    ///
209    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
210    /// the format name is unknown.
211    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
212    ///
213    /// # Example
214    /// ```ignore
215    /// route.marshal("json")?.to("direct:next")
216    /// ```
217    fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
218        let name = format.into();
219        let df = builtin_data_format(&name)
220            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
221        let svc = MarshalService::new(IdentityProcessor, df);
222        self.steps_mut()
223            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
224        Ok(self)
225    }
226
227    /// Unmarshal the message body using the specified data format.
228    ///
229    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
230    /// the format name is unknown.
231    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
232    ///
233    /// # Example
234    /// ```ignore
235    /// route.unmarshal("json")?.to("direct:next")
236    /// ```
237    fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
238        let name = format.into();
239        let df = builtin_data_format(&name)
240            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
241        let svc = UnmarshalService::new(IdentityProcessor, df);
242        self.steps_mut()
243            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
244        Ok(self)
245    }
246
247    /// Validate the exchange body against a schema file.
248    ///
249    /// Shorthand for `to("validator:path")`. Supports XSD, JSON Schema, and YAML.
250    ///
251    /// # Example
252    /// ```ignore
253    /// route.validate("schemas/order.xsd").to("direct:out")
254    /// ```
255    fn validate(mut self, schema_path: impl Into<String>) -> Self {
256        let path = schema_path.into();
257        let uri = if path.starts_with("validator:") {
258            path
259        } else {
260            format!("validator:{path}")
261        };
262        self.steps_mut().push(BuilderStep::To(uri));
263        self
264    }
265
266    /// Execute a script that can modify the exchange (headers, properties, body).
267    ///
268    /// The script has access to `headers`, `properties`, and `body` variables
269    /// and can modify them with assignment syntax: `headers["k"] = v`.
270    ///
271    /// # Example
272    /// ```ignore
273    /// // ignore: requires full CamelContext setup with registered language
274    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
275    /// ```
276    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
277        self.steps_mut().push(BuilderStep::Script {
278            language: language.into(),
279            script: script.into(),
280        });
281        self
282    }
283
284    /// EIP-7 enrich: synchronous content enrichment via a resolved producer.
285    ///
286    /// Calls the given endpoint URI as a producer, then merges the response
287    /// back into the original exchange body using the default `UseEnrichedBody`
288    /// strategy (original headers/properties are preserved).
289    fn enrich(mut self, uri: impl Into<String>) -> Self {
290        self.steps_mut().push(BuilderStep::Enrich {
291            uri: uri.into(),
292            strategy: None,
293            timeout_ms: None,
294        });
295        self
296    }
297
298    /// EIP-7 pollEnrich: blocking poll of a PollingConsumer with timeout.
299    ///
300    /// Reads from a polling endpoint (e.g., file) and merges the result
301    /// into the exchange body using the default `UseEnrichedBody` strategy.
302    /// `timeout_ms` controls how long to wait for data.
303    fn poll_enrich(mut self, uri: impl Into<String>, timeout_ms: u64) -> Self {
304        self.steps_mut().push(BuilderStep::PollEnrich {
305            uri: uri.into(),
306            strategy: None,
307            timeout_ms: Some(timeout_ms),
308        });
309        self
310    }
311
312    fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
313        self.steps_mut().push(BuilderStep::Bean {
314            name: name.into(),
315            method: method.into(),
316        });
317        self
318    }
319}
320
321/// A fluent builder for constructing routes.
322///
323/// # Example
324///
325/// ```ignore
326/// let definition = RouteBuilder::from("timer:tick?period=1000")
327///     .set_header("source", Value::String("timer".into()))
328///     .filter(|ex| ex.input.body.as_text().is_some())
329///     .to("log:info?showHeaders=true")
330///     .build()?;
331/// ```
332// TODO(BUILDER-003): RouteBuilder does not support clone-and-reuse semantics.
333// Once built, the builder is consumed. Add `Clone` or a `fork()` method to allow
334// reusing a partially-built route as a template for multiple routes.
335pub struct RouteBuilder {
336    from_uri: String,
337    steps: Vec<BuilderStep>,
338    error_handler: Option<ErrorHandlerConfig>,
339    error_handler_mode: ErrorHandlerMode,
340    circuit_breaker_config: Option<CircuitBreakerConfig>,
341    security_policy_config: Option<camel_api::security_policy::SecurityPolicyConfig>,
342    security_authenticator: Option<std::sync::Arc<dyn camel_auth::TokenAuthenticator>>,
343    concurrency: Option<ConcurrencyModel>,
344    route_id: Option<String>,
345    auto_startup: Option<bool>,
346    startup_order: Option<i32>,
347}
348
349#[derive(Default)]
350enum ErrorHandlerMode {
351    #[default]
352    None,
353    ExplicitConfig,
354    Shorthand {
355        dlc_uri: Option<String>,
356        specs: Vec<OnExceptionSpec>,
357    },
358    Mixed,
359}
360
361#[derive(Clone)]
362struct OnExceptionSpec {
363    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
364    retry: Option<RedeliveryPolicy>,
365    handled_by: Option<String>,
366}
367
368impl RouteBuilder {
369    /// Start building a route from the given source endpoint URI.
370    pub fn from(endpoint: &str) -> Self {
371        Self {
372            from_uri: endpoint.to_string(),
373            steps: Vec::new(),
374            error_handler: None,
375            error_handler_mode: ErrorHandlerMode::None,
376            circuit_breaker_config: None,
377            security_policy_config: None,
378            security_authenticator: None,
379            concurrency: None,
380            route_id: None,
381            auto_startup: None,
382            startup_order: None,
383        }
384    }
385
386    /// Open a filter scope. Only exchanges matching `predicate` will be processed
387    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
388    /// and continue to steps after `.end_filter()`.
389    pub fn filter<F>(self, predicate: F) -> FilterBuilder
390    where
391        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
392    {
393        FilterBuilder {
394            parent: self,
395            predicate: std::sync::Arc::new(predicate),
396            steps: vec![],
397        }
398    }
399
400    /// Open a choice scope for content-based routing.
401    ///
402    /// Within the choice, you can define multiple `.when()` clauses and an
403    /// optional `.otherwise()` clause. The first matching `when` predicate
404    /// determines which sub-pipeline executes.
405    pub fn choice(self) -> ChoiceBuilder {
406        ChoiceBuilder {
407            parent: self,
408            whens: vec![],
409            _otherwise: None,
410        }
411    }
412
413    /// Add a WireTap step that sends a clone of the exchange to the given
414    /// endpoint URI (fire-and-forget). The original exchange continues
415    /// downstream unchanged.
416    pub fn wire_tap(mut self, endpoint: &str) -> Self {
417        self.steps.push(BuilderStep::WireTap {
418            uri: endpoint.to_string(),
419        });
420        self
421    }
422
423    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
424    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
425        self.error_handler_mode = match self.error_handler_mode {
426            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
427                ErrorHandlerMode::ExplicitConfig
428            }
429            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
430        };
431        self.error_handler = Some(config);
432        self
433    }
434
435    /// Set a dead letter channel URI for shorthand error handler mode.
436    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
437        let uri = uri.into();
438        self.error_handler_mode = match self.error_handler_mode {
439            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
440                dlc_uri: Some(uri),
441                specs: Vec::new(),
442            },
443            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
444                dlc_uri: Some(uri),
445                specs,
446            },
447            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
448        };
449        self
450    }
451
452    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
453    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
454    where
455        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
456    {
457        self.error_handler_mode = match self.error_handler_mode {
458            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
459                dlc_uri: None,
460                specs: Vec::new(),
461            },
462            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
463            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
464        };
465
466        OnExceptionBuilder {
467            parent: self,
468            policy: OnExceptionSpec {
469                matches: std::sync::Arc::new(matches),
470                retry: None,
471                handled_by: None,
472            },
473        }
474    }
475
476    /// Set a circuit breaker for this route.
477    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
478        self.circuit_breaker_config = Some(config);
479        self
480    }
481
482    pub fn security_policy(
483        mut self,
484        config: camel_api::security_policy::SecurityPolicyConfig,
485    ) -> Self {
486        self.security_policy_config = Some(config);
487        self
488    }
489
490    pub fn security_authenticator(
491        mut self,
492        auth: std::sync::Arc<dyn camel_auth::TokenAuthenticator>,
493    ) -> Self {
494        self.security_authenticator = Some(auth);
495        self
496    }
497
498    /// Override the consumer's default concurrency model.
499    ///
500    /// When set, the pipeline spawns a task per exchange, processing them
501    /// concurrently. `max` limits the number of simultaneously active
502    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
503    ///
504    /// # Example
505    /// ```ignore
506    /// RouteBuilder::from("http://0.0.0.0:8080/api")
507    ///     .concurrent(16)  // max 16 in-flight pipeline executions
508    ///     .process(handle_request)
509    ///     .build()
510    /// ```
511    pub fn concurrent(mut self, max: usize) -> Self {
512        let max = if max == 0 { None } else { Some(max) };
513        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
514        self
515    }
516
517    /// Force sequential processing, overriding a concurrent-capable consumer.
518    ///
519    /// Useful for HTTP routes that mutate shared state and need ordering
520    /// guarantees.
521    pub fn sequential(mut self) -> Self {
522        self.concurrency = Some(ConcurrencyModel::Sequential);
523        self
524    }
525
526    /// Set the route ID for this route.
527    ///
528    /// If not set, the route will be assigned an auto-generated ID.
529    pub fn route_id(mut self, id: impl Into<String>) -> Self {
530        self.route_id = Some(id.into());
531        self
532    }
533
534    /// Set whether this route should automatically start when the context starts.
535    ///
536    /// Default is `true`.
537    pub fn auto_startup(mut self, auto: bool) -> Self {
538        self.auto_startup = Some(auto);
539        self
540    }
541
542    /// Set the startup order for this route.
543    ///
544    /// Routes with lower values start first. Default is 1000.
545    pub fn startup_order(mut self, order: i32) -> Self {
546        self.startup_order = Some(order);
547        self
548    }
549
550    /// Begin a Splitter sub-pipeline. Steps added after this call (until
551    /// `.end_split()`) will be executed per-fragment.
552    ///
553    /// Returns a `SplitBuilder` — you cannot call `.build()` until
554    /// `.end_split()` closes the split scope (enforced by the type system).
555    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
556        SplitBuilder {
557            parent: self,
558            config,
559            steps: Vec::new(),
560        }
561    }
562
563    /// Begin a Multicast sub-pipeline. Steps added after this call (until
564    /// `.end_multicast()`) will each receive a copy of the exchange.
565    ///
566    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
567    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
568    pub fn multicast(self) -> MulticastBuilder {
569        MulticastBuilder {
570            parent: self,
571            steps: Vec::new(),
572            config: MulticastConfig::new(),
573        }
574    }
575
576    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
577    /// `max_requests` per `period`. Steps inside the throttle scope are only
578    /// executed when the rate limit allows.
579    ///
580    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
581    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
582    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
583        ThrottleBuilder {
584            parent: self,
585            config: ThrottlerConfig::new(max_requests, period),
586            steps: Vec::new(),
587        }
588    }
589
590    /// Begin a Loop sub-pipeline that iterates a fixed number of times.
591    pub fn loop_count(self, count: usize) -> LoopBuilder {
592        LoopBuilder {
593            parent: self,
594            config: LoopConfig {
595                mode: LoopMode::Count(count),
596            },
597            steps: vec![],
598        }
599    }
600
601    /// Begin a Loop sub-pipeline that iterates while a predicate is true.
602    pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
603    where
604        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
605    {
606        LoopBuilder {
607            parent: self,
608            config: LoopConfig {
609                mode: LoopMode::While(std::sync::Arc::new(predicate)),
610            },
611            steps: vec![],
612        }
613    }
614
615    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
616    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
617    ///
618    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
619    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
620    pub fn load_balance(self) -> LoadBalancerBuilder {
621        LoadBalancerBuilder {
622            parent: self,
623            config: LoadBalancerConfig::round_robin(),
624            steps: Vec::new(),
625        }
626    }
627
628    /// Add a dynamic router step that routes exchanges dynamically based on
629    /// expression evaluation at runtime.
630    ///
631    /// The expression receives the exchange and returns `Some(uri)` to route to
632    /// the next endpoint, or `None` to stop routing.
633    ///
634    /// # Example
635    /// ```ignore
636    /// RouteBuilder::from("timer:tick")
637    ///     .route_id("test-route")
638    ///     .dynamic_router(|ex| {
639    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
640    ///     })
641    ///     .build()
642    /// ```
643    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
644        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
645    }
646
647    /// Add a dynamic router step with full configuration.
648    ///
649    /// Allows customization of URI delimiter, cache size, timeout, and other options.
650    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
651        self.steps.push(BuilderStep::DynamicRouter { config });
652        self
653    }
654
655    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
656        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
657    }
658
659    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
660        self.steps.push(BuilderStep::RoutingSlip { config });
661        self
662    }
663
664    pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
665        self.recipient_list_with_config(RecipientListConfig::new(expression))
666    }
667
668    pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
669        self.steps.push(BuilderStep::RecipientList { config });
670        self
671    }
672
673    /// Consume the builder and produce a [`RouteDefinition`].
674    // TODO(BUILDER-006): Validate duplicate route IDs. When a route with the same
675    // ID is already registered in the context, return `Err(CamelError::RouteError)`.
676    // Currently, duplicate IDs are silently accepted; detection should happen at
677    // `CamelContext::add_route_definition` time.
678    pub fn build(self) -> Result<RouteDefinition, CamelError> {
679        validate_uri(&self.from_uri)?;
680        let route_id = self
681            .route_id
682            .filter(|s| !s.trim().is_empty())
683            .ok_or_else(|| {
684                CamelError::RouteError(
685                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
686                        .to_string(),
687                )
688            })?;
689        let resolved_error_handler = match self.error_handler_mode {
690            ErrorHandlerMode::None => self.error_handler,
691            ErrorHandlerMode::ExplicitConfig => self.error_handler,
692            ErrorHandlerMode::Mixed => {
693                return Err(CamelError::RouteError(
694                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
695                ));
696            }
697            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
698                let mut config = if let Some(uri) = dlc_uri {
699                    ErrorHandlerConfig::dead_letter_channel(uri)
700                } else {
701                    ErrorHandlerConfig::log_only()
702                };
703
704                for spec in specs {
705                    let matcher = spec.matches.clone();
706                    let mut builder = config.on_exception(move |e| matcher(e));
707
708                    if let Some(retry) = spec.retry {
709                        builder = builder.retry(retry.max_attempts).with_backoff(
710                            retry.initial_delay,
711                            retry.multiplier,
712                            retry.max_delay,
713                        );
714                        if retry.jitter_factor > 0.0 {
715                            builder = builder.with_jitter(retry.jitter_factor);
716                        }
717                    }
718
719                    if let Some(uri) = spec.handled_by {
720                        builder = builder.handled_by(uri);
721                    }
722
723                    config = builder.build();
724                }
725
726                Some(config)
727            }
728        };
729
730        let definition = RouteDefinition::new(self.from_uri, self.steps);
731        let definition = if let Some(eh) = resolved_error_handler {
732            definition.with_error_handler(eh)
733        } else {
734            definition
735        };
736        let definition = if let Some(cb) = self.circuit_breaker_config {
737            definition.with_circuit_breaker(cb)
738        } else {
739            definition
740        };
741        let definition = if let Some(sp) = self.security_policy_config {
742            definition.with_security_policy(sp)
743        } else {
744            definition
745        };
746        let definition = if let Some(auth) = self.security_authenticator {
747            definition.with_security_authenticator(auth)
748        } else {
749            definition
750        };
751        let definition = if let Some(concurrency) = self.concurrency {
752            definition.with_concurrency(concurrency)
753        } else {
754            definition
755        };
756        let definition = definition.with_route_id(route_id);
757        let definition = if let Some(auto) = self.auto_startup {
758            definition.with_auto_startup(auto)
759        } else {
760            definition
761        };
762        let definition = if let Some(order) = self.startup_order {
763            definition.with_startup_order(order)
764        } else {
765            definition
766        };
767        Ok(definition)
768    }
769
770    /// Compile this builder route into canonical spec.
771    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
772        validate_uri(&self.from_uri)?;
773        let route_id = self
774            .route_id
775            .filter(|s| !s.trim().is_empty())
776            .ok_or_else(|| {
777                CamelError::RouteError(
778                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
779                        .to_string(),
780                )
781            })?;
782
783        let steps = canonicalize_steps(self.steps)?;
784        let circuit_breaker = self
785            .circuit_breaker_config
786            .map(canonicalize_circuit_breaker);
787
788        if self.security_policy_config.is_some() {
789            return Err(CamelError::RouteError(
790                "routes with security_policy cannot use the canonical/hot-reload path (not yet supported)"
791                    .into(),
792            ));
793        }
794
795        let spec = CanonicalRouteSpec {
796            route_id,
797            from: self.from_uri,
798            steps,
799            circuit_breaker,
800            auto_startup: None,
801            startup_order: None,
802            concurrency: None,
803            version: camel_api::CANONICAL_CONTRACT_VERSION,
804        };
805        spec.validate_contract()?;
806        Ok(spec)
807    }
808}
809
810pub struct OnExceptionBuilder {
811    parent: RouteBuilder,
812    policy: OnExceptionSpec,
813}
814
815impl OnExceptionBuilder {
816    pub fn retry(mut self, max_attempts: u32) -> Self {
817        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
818        self
819    }
820
821    pub fn with_backoff(
822        mut self,
823        initial: std::time::Duration,
824        multiplier: f64,
825        max: std::time::Duration,
826    ) -> Self {
827        if let Some(ref mut retry) = self.policy.retry {
828            retry.initial_delay = initial;
829            retry.multiplier = multiplier;
830            retry.max_delay = max;
831        } else {
832            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
833        }
834        self
835    }
836
837    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
838        if let Some(ref mut retry) = self.policy.retry {
839            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
840        } else {
841            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
842        }
843        self
844    }
845
846    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
847        self.policy.handled_by = Some(uri.into());
848        self
849    }
850
851    pub fn end_on_exception(mut self) -> RouteBuilder {
852        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
853            specs.push(self.policy);
854        }
855        self.parent
856    }
857}
858
859/// Validate that a URI is non-empty and contains a scheme component.
860fn validate_uri(uri: &str) -> Result<(), CamelError> {
861    let trimmed = uri.trim();
862    if trimmed.is_empty() {
863        return Err(CamelError::RouteError(
864            "route must have a 'from' URI".to_string(),
865        ));
866    }
867    if !trimmed.contains(':') {
868        return Err(CamelError::RouteError(
869            "URI must have a scheme (e.g. 'timer:tick')".to_string(),
870        ));
871    }
872    let scheme = trimmed.split(':').next().unwrap_or("");
873    if scheme.trim().is_empty() {
874        return Err(CamelError::RouteError(
875            "URI scheme must not be empty".to_string(),
876        ));
877    }
878    Ok(())
879}
880
881fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
882    let mut canonical = Vec::with_capacity(steps.len());
883    for step in steps {
884        canonical.push(canonicalize_step(step)?);
885    }
886    Ok(canonical)
887}
888
889fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
890    match step {
891        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
892        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
893        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
894        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
895        BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
896            delay_ms: config.delay_ms,
897            dynamic_header: config.dynamic_header,
898        }),
899        BuilderStep::DeclarativeScript { expression } => {
900            Ok(CanonicalStepSpec::Script { expression })
901        }
902        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
903            predicate,
904            steps: canonicalize_steps(steps)?,
905        }),
906        BuilderStep::DeclarativeChoice { whens, otherwise } => {
907            let mut canonical_whens = Vec::with_capacity(whens.len());
908            for DeclarativeWhenStep { predicate, steps } in whens {
909                canonical_whens.push(CanonicalWhenSpec {
910                    predicate,
911                    steps: canonicalize_steps(steps)?,
912                });
913            }
914            let otherwise = match otherwise {
915                Some(steps) => Some(canonicalize_steps(steps)?),
916                None => None,
917            };
918            Ok(CanonicalStepSpec::Choice {
919                whens: canonical_whens,
920                otherwise,
921            })
922        }
923        BuilderStep::DeclarativeSplit {
924            expression,
925            aggregation,
926            parallel,
927            parallel_limit,
928            stop_on_exception,
929            steps,
930        } => Ok(CanonicalStepSpec::Split {
931            expression: CanonicalSplitExpressionSpec::Language(expression),
932            aggregation: canonicalize_split_aggregation(aggregation)?,
933            parallel,
934            parallel_limit,
935            stop_on_exception,
936            steps: canonicalize_steps(steps)?,
937        }),
938        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
939            canonicalize_aggregate(config)?,
940        )),
941        other => {
942            let step_name = canonical_step_name(&other);
943            let detail = camel_api::canonical_contract_rejection_reason(step_name)
944                .unwrap_or("not included in canonical v1");
945            Err(CamelError::RouteError(format!(
946                "canonical v1 does not support step `{step_name}`: {detail}"
947            )))
948        }
949    }
950}
951
952fn canonicalize_split_aggregation(
953    strategy: camel_api::splitter::AggregationStrategy,
954) -> Result<CanonicalSplitAggregationSpec, CamelError> {
955    match strategy {
956        camel_api::splitter::AggregationStrategy::LastWins => {
957            Ok(CanonicalSplitAggregationSpec::LastWins)
958        }
959        camel_api::splitter::AggregationStrategy::CollectAll => {
960            Ok(CanonicalSplitAggregationSpec::CollectAll)
961        }
962        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
963            "canonical v1 does not support custom split aggregation".to_string(),
964        )),
965        camel_api::splitter::AggregationStrategy::Original => {
966            Ok(CanonicalSplitAggregationSpec::Original)
967        }
968    }
969}
970
971fn extract_completion_fields(
972    mode: &CompletionMode,
973) -> Result<(Option<usize>, Option<u64>), CamelError> {
974    match mode {
975        CompletionMode::Single(cond) => match cond {
976            CompletionCondition::Size(n) => Ok((Some(*n), None)),
977            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
978            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
979                "canonical v1 does not support aggregate predicate completion".to_string(),
980            )),
981        },
982        CompletionMode::Any(conds) => {
983            let mut size = None;
984            let mut timeout_ms = None;
985            for cond in conds {
986                match cond {
987                    CompletionCondition::Size(n) => size = Some(*n),
988                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
989                    CompletionCondition::Predicate(_) => {
990                        return Err(CamelError::RouteError(
991                            "canonical v1 does not support aggregate predicate completion"
992                                .to_string(),
993                        ));
994                    }
995                }
996            }
997            Ok((size, timeout_ms))
998        }
999    }
1000}
1001
1002fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
1003    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
1004
1005    let header = match &config.correlation {
1006        CorrelationStrategy::HeaderName(h) => h.clone(),
1007        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
1008        CorrelationStrategy::Fn(_) => {
1009            return Err(CamelError::RouteError(
1010                "canonical v1 does not support Fn correlation strategy".to_string(),
1011            ));
1012        }
1013    };
1014
1015    let correlation_key = match &config.correlation {
1016        CorrelationStrategy::HeaderName(_) => None,
1017        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
1018        CorrelationStrategy::Fn(_) => unreachable!(),
1019    };
1020
1021    let strategy = match config.strategy {
1022        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
1023        AggregationStrategy::Custom(_) => {
1024            return Err(CamelError::RouteError(
1025                "canonical v1 does not support custom aggregate strategy".to_string(),
1026            ));
1027        }
1028    };
1029    let bucket_ttl_ms = config
1030        .bucket_ttl
1031        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
1032
1033    Ok(CanonicalAggregateSpec {
1034        header,
1035        completion_size,
1036        completion_timeout_ms,
1037        correlation_key,
1038        force_completion_on_stop: if config.force_completion_on_stop {
1039            Some(true)
1040        } else {
1041            None
1042        },
1043        discard_on_timeout: if config.discard_on_timeout {
1044            Some(true)
1045        } else {
1046            None
1047        },
1048        strategy,
1049        max_buckets: config.max_buckets,
1050        bucket_ttl_ms,
1051    })
1052}
1053
1054fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
1055    CanonicalCircuitBreakerSpec {
1056        failure_threshold: config.failure_threshold,
1057        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
1058    }
1059}
1060
1061fn canonical_step_name(step: &BuilderStep) -> &'static str {
1062    match step {
1063        BuilderStep::Processor(_) => "processor",
1064        BuilderStep::To(_) => "to",
1065        BuilderStep::Stop => "stop",
1066        BuilderStep::Log { .. } => "log",
1067        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
1068        BuilderStep::DeclarativeSetBody { .. } => "set_body",
1069        BuilderStep::DeclarativeFilter { .. } => "filter",
1070        BuilderStep::DeclarativeChoice { .. } => "choice",
1071        BuilderStep::DeclarativeScript { .. } => "script",
1072        BuilderStep::DeclarativeFunction { .. } => "function",
1073        BuilderStep::DeclarativeSplit { .. } => "split",
1074        BuilderStep::Split { .. } => "split",
1075        BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1076        BuilderStep::Aggregate { .. } => "aggregate",
1077        BuilderStep::Filter { .. } => "filter",
1078        BuilderStep::Choice { .. } => "choice",
1079        BuilderStep::WireTap { .. } => "wire_tap",
1080        BuilderStep::Delay { .. } => "delay",
1081        BuilderStep::Multicast { .. } => "multicast",
1082        BuilderStep::DeclarativeLog { .. } => "log",
1083        BuilderStep::Bean { .. } => "bean",
1084        BuilderStep::Script { .. } => "script",
1085        BuilderStep::Throttle { .. } => "throttle",
1086        BuilderStep::LoadBalance { .. } => "load_balancer",
1087        BuilderStep::DynamicRouter { .. } => "dynamic_router",
1088        BuilderStep::RoutingSlip { .. } => "routing_slip",
1089        BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1090        BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1091        BuilderStep::RecipientList { .. } => "recipient_list",
1092        BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1093        BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1094        BuilderStep::DeclarativeStreamSplit { .. } => "stream_split",
1095        BuilderStep::Enrich { .. } => "enrich",
1096        BuilderStep::PollEnrich { .. } => "poll_enrich",
1097        BuilderStep::DeclarativeDoTry { .. } => "do_try",
1098    }
1099}
1100
1101impl StepAccumulator for RouteBuilder {
1102    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1103        &mut self.steps
1104    }
1105}
1106
1107/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
1108///
1109/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1110/// but NOT `.build()` and NOT `.split()` (no nested splits).
1111///
1112/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
1113/// and returns the parent `RouteBuilder`.
1114pub struct SplitBuilder {
1115    parent: RouteBuilder,
1116    config: SplitterConfig,
1117    steps: Vec<BuilderStep>,
1118}
1119
1120impl SplitBuilder {
1121    /// Open a filter scope within the split sub-pipeline.
1122    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1123    where
1124        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1125    {
1126        FilterInSplitBuilder {
1127            parent: self,
1128            predicate: std::sync::Arc::new(predicate),
1129            steps: vec![],
1130        }
1131    }
1132
1133    /// Close the split scope. Packages the accumulated sub-steps into a
1134    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
1135    pub fn end_split(mut self) -> RouteBuilder {
1136        let split_step = BuilderStep::Split {
1137            config: self.config,
1138            steps: self.steps,
1139        };
1140        self.parent.steps.push(split_step);
1141        self.parent
1142    }
1143}
1144
1145impl StepAccumulator for SplitBuilder {
1146    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1147        &mut self.steps
1148    }
1149}
1150
1151/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
1152pub struct FilterBuilder {
1153    parent: RouteBuilder,
1154    predicate: FilterPredicate,
1155    steps: Vec<BuilderStep>,
1156}
1157
1158impl FilterBuilder {
1159    /// Close the filter scope. Packages the accumulated sub-steps into a
1160    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
1161    pub fn end_filter(mut self) -> RouteBuilder {
1162        let step = BuilderStep::Filter {
1163            predicate: self.predicate,
1164            steps: self.steps,
1165        };
1166        self.parent.steps.push(step);
1167        self.parent
1168    }
1169}
1170
1171impl StepAccumulator for FilterBuilder {
1172    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1173        &mut self.steps
1174    }
1175}
1176
1177/// Builder for a filter scope nested inside a `.split()` block.
1178pub struct FilterInSplitBuilder {
1179    parent: SplitBuilder,
1180    predicate: FilterPredicate,
1181    steps: Vec<BuilderStep>,
1182}
1183
1184impl FilterInSplitBuilder {
1185    /// Close the filter scope and return the parent `SplitBuilder`.
1186    pub fn end_filter(mut self) -> SplitBuilder {
1187        let step = BuilderStep::Filter {
1188            predicate: self.predicate,
1189            steps: self.steps,
1190        };
1191        self.parent.steps.push(step);
1192        self.parent
1193    }
1194}
1195
1196impl StepAccumulator for FilterInSplitBuilder {
1197    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1198        &mut self.steps
1199    }
1200}
1201
1202// ── Choice/When/Otherwise builders ─────────────────────────────────────────
1203
1204/// Builder for a `.choice()` ... `.end_choice()` block.
1205///
1206/// Accumulates `when` clauses and an optional `otherwise` clause.
1207/// Cannot call `.build()` until `.end_choice()` is called.
1208pub struct ChoiceBuilder {
1209    parent: RouteBuilder,
1210    whens: Vec<WhenStep>,
1211    _otherwise: Option<Vec<BuilderStep>>,
1212}
1213
1214impl ChoiceBuilder {
1215    /// Open a `when` clause. Only exchanges matching `predicate` will be
1216    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
1217    pub fn when<F>(self, predicate: F) -> WhenBuilder
1218    where
1219        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1220    {
1221        WhenBuilder {
1222            parent: self,
1223            predicate: std::sync::Arc::new(predicate),
1224            steps: vec![],
1225        }
1226    }
1227
1228    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1229    ///
1230    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1231    pub fn otherwise(self) -> OtherwiseBuilder {
1232        OtherwiseBuilder {
1233            parent: self,
1234            steps: vec![],
1235        }
1236    }
1237
1238    /// Close the choice scope. Packages all accumulated `when` clauses and
1239    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1240    /// parent `RouteBuilder`.
1241    pub fn end_choice(mut self) -> RouteBuilder {
1242        let step = BuilderStep::Choice {
1243            whens: self.whens,
1244            otherwise: self._otherwise,
1245        };
1246        self.parent.steps.push(step);
1247        self.parent
1248    }
1249}
1250
1251/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1252pub struct WhenBuilder {
1253    parent: ChoiceBuilder,
1254    predicate: camel_api::FilterPredicate,
1255    steps: Vec<BuilderStep>,
1256}
1257
1258impl WhenBuilder {
1259    /// Close the when scope. Packages the accumulated sub-steps into a
1260    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1261    pub fn end_when(mut self) -> ChoiceBuilder {
1262        self.parent.whens.push(WhenStep {
1263            predicate: self.predicate,
1264            steps: self.steps,
1265        });
1266        self.parent
1267    }
1268}
1269
1270impl StepAccumulator for WhenBuilder {
1271    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1272        &mut self.steps
1273    }
1274}
1275
1276/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1277pub struct OtherwiseBuilder {
1278    parent: ChoiceBuilder,
1279    steps: Vec<BuilderStep>,
1280}
1281
1282impl OtherwiseBuilder {
1283    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1284    pub fn end_otherwise(self) -> ChoiceBuilder {
1285        let OtherwiseBuilder { mut parent, steps } = self;
1286        parent._otherwise = Some(steps);
1287        parent
1288    }
1289}
1290
1291impl StepAccumulator for OtherwiseBuilder {
1292    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1293        &mut self.steps
1294    }
1295}
1296
1297/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1298///
1299/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1300/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1301///
1302/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1303/// and returns the parent `RouteBuilder`.
1304pub struct MulticastBuilder {
1305    parent: RouteBuilder,
1306    steps: Vec<BuilderStep>,
1307    config: MulticastConfig,
1308}
1309
1310impl MulticastBuilder {
1311    pub fn parallel(mut self, parallel: bool) -> Self {
1312        self.config = self.config.parallel(parallel);
1313        self
1314    }
1315
1316    pub fn parallel_limit(mut self, limit: usize) -> Self {
1317        self.config = self.config.parallel_limit(limit);
1318        self
1319    }
1320
1321    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1322        self.config = self.config.stop_on_exception(stop);
1323        self
1324    }
1325
1326    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1327        self.config = self.config.timeout(duration);
1328        self
1329    }
1330
1331    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1332        self.config = self.config.aggregation(strategy);
1333        self
1334    }
1335
1336    pub fn end_multicast(mut self) -> RouteBuilder {
1337        let step = BuilderStep::Multicast {
1338            steps: self.steps,
1339            config: self.config,
1340        };
1341        self.parent.steps.push(step);
1342        self.parent
1343    }
1344}
1345
1346impl StepAccumulator for MulticastBuilder {
1347    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1348        &mut self.steps
1349    }
1350}
1351
1352/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1353///
1354/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1355/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1356///
1357/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1358/// and returns the parent `RouteBuilder`.
1359pub struct ThrottleBuilder {
1360    parent: RouteBuilder,
1361    config: ThrottlerConfig,
1362    steps: Vec<BuilderStep>,
1363}
1364
1365impl ThrottleBuilder {
1366    /// Set the throttle strategy. Default is `Delay`.
1367    ///
1368    /// - `Delay`: Queue messages until capacity available
1369    /// - `Reject`: Return error immediately when throttled
1370    /// - `Drop`: Silently discard excess messages
1371    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1372        self.config = self.config.strategy(strategy);
1373        self
1374    }
1375
1376    /// Close the throttle scope. Packages the accumulated sub-steps into a
1377    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1378    pub fn end_throttle(mut self) -> RouteBuilder {
1379        let step = BuilderStep::Throttle {
1380            config: self.config,
1381            steps: self.steps,
1382        };
1383        self.parent.steps.push(step);
1384        self.parent
1385    }
1386}
1387
1388impl StepAccumulator for ThrottleBuilder {
1389    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1390        &mut self.steps
1391    }
1392}
1393
1394/// Builder for the sub-pipeline within a `.loop_count()` / `.loop_while()` ... `.end_loop()` block.
1395pub struct LoopBuilder {
1396    parent: RouteBuilder,
1397    config: LoopConfig,
1398    steps: Vec<BuilderStep>,
1399}
1400
1401impl LoopBuilder {
1402    pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1403        LoopInLoopBuilder {
1404            parent: self,
1405            config: LoopConfig {
1406                mode: LoopMode::Count(count),
1407            },
1408            steps: vec![],
1409        }
1410    }
1411
1412    pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1413    where
1414        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1415    {
1416        LoopInLoopBuilder {
1417            parent: self,
1418            config: LoopConfig {
1419                mode: LoopMode::While(std::sync::Arc::new(predicate)),
1420            },
1421            steps: vec![],
1422        }
1423    }
1424
1425    pub fn end_loop(mut self) -> RouteBuilder {
1426        let step = BuilderStep::Loop {
1427            config: self.config,
1428            steps: self.steps,
1429        };
1430        self.parent.steps.push(step);
1431        self.parent
1432    }
1433}
1434
1435impl StepAccumulator for LoopBuilder {
1436    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1437        &mut self.steps
1438    }
1439}
1440
1441pub struct LoopInLoopBuilder {
1442    parent: LoopBuilder,
1443    config: LoopConfig,
1444    steps: Vec<BuilderStep>,
1445}
1446
1447impl LoopInLoopBuilder {
1448    pub fn end_loop(mut self) -> LoopBuilder {
1449        let step = BuilderStep::Loop {
1450            config: self.config,
1451            steps: self.steps,
1452        };
1453        self.parent.steps.push(step);
1454        self.parent
1455    }
1456}
1457
1458impl StepAccumulator for LoopInLoopBuilder {
1459    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1460        &mut self.steps
1461    }
1462}
1463
1464/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1465///
1466/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1467/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1468///
1469/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1470/// and returns the parent `RouteBuilder`.
1471pub struct LoadBalancerBuilder {
1472    parent: RouteBuilder,
1473    config: LoadBalancerConfig,
1474    steps: Vec<BuilderStep>,
1475}
1476
1477impl LoadBalancerBuilder {
1478    /// Set the load balance strategy to round-robin (default).
1479    pub fn round_robin(mut self) -> Self {
1480        self.config = LoadBalancerConfig::round_robin();
1481        self
1482    }
1483
1484    /// Set the load balance strategy to random selection.
1485    pub fn random(mut self) -> Self {
1486        self.config = LoadBalancerConfig::random();
1487        self
1488    }
1489
1490    /// Set the load balance strategy to weighted selection.
1491    ///
1492    /// Each endpoint is assigned a weight that determines its probability
1493    /// of being selected.
1494    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1495        self.config = LoadBalancerConfig::weighted(weights);
1496        self
1497    }
1498
1499    /// Set the load balance strategy to failover.
1500    ///
1501    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1502    /// is tried.
1503    pub fn failover(mut self) -> Self {
1504        self.config = LoadBalancerConfig::failover();
1505        self
1506    }
1507
1508    /// Close the load balance scope. Packages the accumulated sub-steps into a
1509    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1510    pub fn end_load_balance(mut self) -> RouteBuilder {
1511        let step = BuilderStep::LoadBalance {
1512            config: self.config,
1513            steps: self.steps,
1514        };
1515        self.parent.steps.push(step);
1516        self.parent
1517    }
1518}
1519
1520impl StepAccumulator for LoadBalancerBuilder {
1521    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1522        &mut self.steps
1523    }
1524}
1525
1526// ---------------------------------------------------------------------------
1527// Tests
1528// ---------------------------------------------------------------------------
1529
1530#[cfg(test)]
1531mod tests {
1532    use super::*;
1533    use camel_api::error_handler::ErrorHandlerConfig;
1534    use camel_api::load_balancer::LoadBalanceStrategy;
1535    use camel_api::{Exchange, Message};
1536    use camel_core::route::BuilderStep;
1537    use std::sync::Arc;
1538    use std::time::Duration;
1539    use tower::{Service, ServiceExt};
1540
1541    #[test]
1542    fn test_builder_from_creates_definition() {
1543        let definition = RouteBuilder::from("timer:tick")
1544            .route_id("test-route")
1545            .build()
1546            .unwrap();
1547        assert_eq!(definition.from_uri(), "timer:tick");
1548    }
1549
1550    #[test]
1551    fn test_builder_empty_from_uri_errors() {
1552        let result = RouteBuilder::from("").route_id("test-route").build();
1553        assert!(result.is_err());
1554    }
1555
1556    #[test]
1557    fn test_build_rejects_schemeless_uri() {
1558        let result = RouteBuilder::from("no-scheme-here")
1559            .route_id("test-route")
1560            .build();
1561        match result {
1562            Err(err) => {
1563                let err_msg = format!("{err}");
1564                assert!(
1565                    err_msg.contains("scheme"),
1566                    "expected scheme-related error, got: {err_msg}"
1567                );
1568            }
1569            Ok(_) => panic!("schemeless URI should fail"),
1570        }
1571    }
1572
1573    #[test]
1574    fn test_build_rejects_empty_scheme_uri() {
1575        let result = RouteBuilder::from(":missing-scheme")
1576            .route_id("test-route")
1577            .build();
1578        match result {
1579            Err(err) => {
1580                let err_msg = format!("{err}");
1581                assert!(
1582                    err_msg.contains("scheme"),
1583                    "expected scheme-related error, got: {err_msg}"
1584                );
1585            }
1586            Ok(_) => panic!("empty-scheme URI should fail"),
1587        }
1588    }
1589
1590    #[test]
1591    fn test_build_accepts_valid_uri() {
1592        let result = RouteBuilder::from("timer:tick")
1593            .route_id("test-route")
1594            .build();
1595        assert!(result.is_ok());
1596    }
1597
1598    #[test]
1599    fn test_build_canonical_rejects_schemeless_uri() {
1600        let result = RouteBuilder::from("no-scheme-here")
1601            .route_id("test-route")
1602            .build_canonical();
1603        assert!(result.is_err());
1604    }
1605
1606    #[test]
1607    fn test_builder_to_adds_step() {
1608        let definition = RouteBuilder::from("timer:tick")
1609            .route_id("test-route")
1610            .to("log:info")
1611            .build()
1612            .unwrap();
1613
1614        assert_eq!(definition.from_uri(), "timer:tick");
1615        // We can verify steps were added by checking the structure
1616        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1617    }
1618
1619    #[test]
1620    fn test_builder_filter_adds_filter_step() {
1621        let definition = RouteBuilder::from("timer:tick")
1622            .route_id("test-route")
1623            .filter(|_ex| true)
1624            .to("mock:result")
1625            .end_filter()
1626            .build()
1627            .unwrap();
1628
1629        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1630    }
1631
1632    #[test]
1633    fn test_builder_set_header_adds_processor_step() {
1634        let definition = RouteBuilder::from("timer:tick")
1635            .route_id("test-route")
1636            .set_header("key", Value::String("value".into()))
1637            .build()
1638            .unwrap();
1639
1640        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1641    }
1642
1643    #[test]
1644    fn test_builder_map_body_adds_processor_step() {
1645        let definition = RouteBuilder::from("timer:tick")
1646            .route_id("test-route")
1647            .map_body(|body| body)
1648            .build()
1649            .unwrap();
1650
1651        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1652    }
1653
1654    #[test]
1655    fn test_builder_process_adds_processor_step() {
1656        let definition = RouteBuilder::from("timer:tick")
1657            .route_id("test-route")
1658            .process(|ex| async move { Ok(ex) })
1659            .build()
1660            .unwrap();
1661
1662        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1663    }
1664
1665    #[test]
1666    fn test_builder_chain_multiple_steps() {
1667        let definition = RouteBuilder::from("timer:tick")
1668            .route_id("test-route")
1669            .set_header("source", Value::String("timer".into()))
1670            .filter(|ex| ex.input.header("source").is_some())
1671            .to("log:info")
1672            .end_filter()
1673            .to("mock:result")
1674            .build()
1675            .unwrap();
1676
1677        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1678        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1679        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1680        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1681    }
1682
1683    #[test]
1684    fn test_loop_count_builder() {
1685        use camel_api::loop_eip::LoopMode;
1686
1687        let def = RouteBuilder::from("direct:start")
1688            .route_id("loop-test")
1689            .loop_count(3)
1690            .to("mock:inside")
1691            .end_loop()
1692            .to("mock:after")
1693            .build()
1694            .unwrap();
1695
1696        assert_eq!(def.steps().len(), 2);
1697        match &def.steps()[0] {
1698            BuilderStep::Loop { config, steps } => {
1699                assert!(matches!(config.mode, LoopMode::Count(3)));
1700                assert_eq!(steps.len(), 1);
1701            }
1702            other => panic!("Expected Loop, got {:?}", other),
1703        }
1704        assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1705    }
1706
1707    #[test]
1708    fn test_loop_while_builder() {
1709        use camel_api::loop_eip::LoopMode;
1710
1711        let def = RouteBuilder::from("direct:start")
1712            .route_id("loop-while-test")
1713            .loop_while(|_ex| true)
1714            .to("mock:retry")
1715            .end_loop()
1716            .build()
1717            .unwrap();
1718
1719        assert_eq!(def.steps().len(), 1);
1720        match &def.steps()[0] {
1721            BuilderStep::Loop { config, steps } => {
1722                assert!(matches!(config.mode, LoopMode::While(_)));
1723                assert_eq!(steps.len(), 1);
1724            }
1725            other => panic!("Expected Loop, got {:?}", other),
1726        }
1727    }
1728
1729    #[test]
1730    fn test_nested_loop_builder() {
1731        use camel_api::loop_eip::LoopMode;
1732
1733        let def = RouteBuilder::from("direct:start")
1734            .route_id("nested-loop-test")
1735            .loop_count(2)
1736            .to("mock:outer")
1737            .loop_count(3)
1738            .to("mock:inner")
1739            .end_loop()
1740            .end_loop()
1741            .to("mock:after")
1742            .build()
1743            .unwrap();
1744
1745        assert_eq!(def.steps().len(), 2);
1746        match &def.steps()[0] {
1747            BuilderStep::Loop { steps, .. } => {
1748                assert_eq!(steps.len(), 2);
1749                match &steps[1] {
1750                    BuilderStep::Loop {
1751                        config,
1752                        steps: inner_steps,
1753                    } => {
1754                        assert!(matches!(config.mode, LoopMode::Count(3)));
1755                        assert_eq!(inner_steps.len(), 1);
1756                    }
1757                    other => panic!("Expected nested Loop, got {:?}", other),
1758                }
1759            }
1760            other => panic!("Expected outer Loop, got {:?}", other),
1761        }
1762    }
1763
1764    // -----------------------------------------------------------------------
1765    // Processor behavior tests — exercise the real Tower services directly
1766    // -----------------------------------------------------------------------
1767
1768    #[tokio::test]
1769    async fn test_set_header_processor_works() {
1770        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1771        let exchange = Exchange::new(Message::new("test"));
1772        let result = svc.call(exchange).await.unwrap();
1773        assert_eq!(
1774            result.input.header("greeting"),
1775            Some(&Value::String("hello".into()))
1776        );
1777    }
1778
1779    #[tokio::test]
1780    async fn test_filter_processor_passes() {
1781        use camel_api::BoxProcessorExt;
1782        use camel_processor::FilterService;
1783
1784        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1785        let mut svc =
1786            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1787        let exchange = Exchange::new(Message::new("pass"));
1788        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1789        assert_eq!(result.input.body.as_text(), Some("pass"));
1790    }
1791
1792    #[tokio::test]
1793    async fn test_filter_processor_blocks() {
1794        use camel_api::BoxProcessorExt;
1795        use camel_processor::FilterService;
1796
1797        let sub = BoxProcessor::from_fn(|_ex| {
1798            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1799        });
1800        let mut svc =
1801            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1802        let exchange = Exchange::new(Message::new("reject"));
1803        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1804        assert_eq!(result.input.body.as_text(), Some("reject"));
1805    }
1806
1807    #[tokio::test]
1808    async fn test_map_body_processor_works() {
1809        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1810            if let Some(text) = body.as_text() {
1811                Body::Text(text.to_uppercase())
1812            } else {
1813                body
1814            }
1815        });
1816        let exchange = Exchange::new(Message::new("hello"));
1817        let result = mapper.oneshot(exchange).await.unwrap();
1818        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1819    }
1820
1821    #[tokio::test]
1822    async fn test_process_custom_processor_works() {
1823        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1824            ex.set_property("custom", Value::Bool(true));
1825            Ok(ex)
1826        });
1827        let exchange = Exchange::new(Message::default());
1828        let result = processor.oneshot(exchange).await.unwrap();
1829        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1830    }
1831
1832    // -----------------------------------------------------------------------
1833    // Sequential pipeline test
1834    // -----------------------------------------------------------------------
1835
1836    #[tokio::test]
1837    async fn test_compose_pipeline_runs_steps_in_order() {
1838        use camel_core::route::{CompiledStep, compose_pipeline};
1839
1840        let processors = vec![
1841            CompiledStep::Process {
1842                processor: BoxProcessor::new(SetHeader::new(
1843                    IdentityProcessor,
1844                    "step",
1845                    Value::String("one".into()),
1846                )),
1847                body_contract: None,
1848            },
1849            CompiledStep::Process {
1850                processor: BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1851                    if let Some(text) = body.as_text() {
1852                        Body::Text(format!("{}-processed", text))
1853                    } else {
1854                        body
1855                    }
1856                })),
1857                body_contract: None,
1858            },
1859        ];
1860
1861        let pipeline = compose_pipeline(processors);
1862        let exchange = Exchange::new(Message::new("hello"));
1863        let result = pipeline.oneshot(exchange).await.unwrap();
1864
1865        assert_eq!(
1866            result.input.header("step"),
1867            Some(&Value::String("one".into()))
1868        );
1869        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1870    }
1871
1872    #[tokio::test]
1873    async fn test_compose_pipeline_empty_is_identity() {
1874        use camel_core::route::compose_pipeline;
1875
1876        let pipeline = compose_pipeline(vec![]);
1877        let exchange = Exchange::new(Message::new("unchanged"));
1878        let result = pipeline.oneshot(exchange).await.unwrap();
1879        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1880    }
1881
1882    // -----------------------------------------------------------------------
1883    // Circuit breaker builder tests
1884    // -----------------------------------------------------------------------
1885
1886    #[test]
1887    fn test_builder_circuit_breaker_sets_config() {
1888        use camel_api::circuit_breaker::CircuitBreakerConfig;
1889
1890        let config = CircuitBreakerConfig::new().failure_threshold(5);
1891        let definition = RouteBuilder::from("timer:tick")
1892            .route_id("test-route")
1893            .circuit_breaker(config)
1894            .build()
1895            .unwrap();
1896
1897        let cb = definition
1898            .circuit_breaker_config()
1899            .expect("circuit breaker should be set");
1900        assert_eq!(cb.failure_threshold, 5);
1901    }
1902
1903    #[test]
1904    fn test_builder_circuit_breaker_with_error_handler() {
1905        use camel_api::circuit_breaker::CircuitBreakerConfig;
1906        use camel_api::error_handler::ErrorHandlerConfig;
1907
1908        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1909        let eh_config = ErrorHandlerConfig::log_only();
1910
1911        let definition = RouteBuilder::from("timer:tick")
1912            .route_id("test-route")
1913            .to("log:info")
1914            .circuit_breaker(cb_config)
1915            .error_handler(eh_config)
1916            .build()
1917            .unwrap();
1918
1919        assert!(
1920            definition.circuit_breaker_config().is_some(),
1921            "circuit breaker config should be set"
1922        );
1923        // Route definition was built successfully with both configs.
1924    }
1925
1926    #[test]
1927    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1928        let definition = RouteBuilder::from("direct:start")
1929            .route_id("test-route")
1930            .dead_letter_channel("log:dlc")
1931            .on_exception(|e| matches!(e, CamelError::Io(_)))
1932            .retry(3)
1933            .handled_by("log:io")
1934            .end_on_exception()
1935            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1936            .retry(1)
1937            .end_on_exception()
1938            .to("mock:out")
1939            .build()
1940            .expect("route should build");
1941
1942        let cfg = definition
1943            .error_handler_config()
1944            .expect("error handler should be set");
1945        assert_eq!(cfg.policies.len(), 2);
1946        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1947        assert_eq!(
1948            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1949            Some(3)
1950        );
1951        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1952        assert_eq!(
1953            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1954            Some(1)
1955        );
1956    }
1957
1958    #[test]
1959    fn test_builder_on_exception_mixed_mode_rejected() {
1960        let result = RouteBuilder::from("direct:start")
1961            .route_id("test-route")
1962            .error_handler(ErrorHandlerConfig::log_only())
1963            .on_exception(|_e| true)
1964            .end_on_exception()
1965            .to("mock:out")
1966            .build();
1967
1968        let err = result.err().expect("mixed mode should fail with an error");
1969
1970        assert!(
1971            format!("{err}").contains("mixed error handler modes"),
1972            "unexpected error: {err}"
1973        );
1974    }
1975
1976    #[test]
1977    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1978        let definition = RouteBuilder::from("direct:start")
1979            .route_id("test-route")
1980            .on_exception(|_e| true)
1981            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1982            .with_jitter(0.5)
1983            .end_on_exception()
1984            .to("mock:out")
1985            .build()
1986            .expect("route should build");
1987
1988        let cfg = definition
1989            .error_handler_config()
1990            .expect("error handler should be set");
1991        assert_eq!(cfg.policies.len(), 1);
1992        assert!(cfg.policies[0].retry.is_none());
1993    }
1994
1995    #[test]
1996    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1997        let definition = RouteBuilder::from("direct:start")
1998            .route_id("test-route")
1999            .dead_letter_channel("log:dlc")
2000            .to("mock:out")
2001            .build()
2002            .expect("route should build");
2003
2004        let cfg = definition
2005            .error_handler_config()
2006            .expect("error handler should be set");
2007        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
2008        assert!(cfg.policies.is_empty());
2009    }
2010
2011    #[test]
2012    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
2013        let definition = RouteBuilder::from("direct:start")
2014            .route_id("test-route")
2015            .dead_letter_channel("log:first")
2016            .on_exception(|e| matches!(e, CamelError::Io(_)))
2017            .retry(2)
2018            .end_on_exception()
2019            .dead_letter_channel("log:second")
2020            .to("mock:out")
2021            .build()
2022            .expect("route should build");
2023
2024        let cfg = definition
2025            .error_handler_config()
2026            .expect("error handler should be set");
2027        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2028        assert_eq!(cfg.policies.len(), 1);
2029        assert_eq!(
2030            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
2031            Some(2)
2032        );
2033    }
2034
2035    #[test]
2036    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
2037        let definition = RouteBuilder::from("direct:start")
2038            .route_id("test-route")
2039            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
2040            .retry(1)
2041            .end_on_exception()
2042            .to("mock:out")
2043            .build()
2044            .expect("route should build");
2045
2046        let cfg = definition
2047            .error_handler_config()
2048            .expect("error handler should be set");
2049        assert!(cfg.dlc_uri.is_none());
2050        assert_eq!(cfg.policies.len(), 1);
2051    }
2052
2053    #[test]
2054    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
2055        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
2056        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
2057
2058        let definition = RouteBuilder::from("direct:start")
2059            .route_id("test-route")
2060            .error_handler(first)
2061            .error_handler(second)
2062            .to("mock:out")
2063            .build()
2064            .expect("route should build");
2065
2066        let cfg = definition
2067            .error_handler_config()
2068            .expect("error handler should be set");
2069        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2070    }
2071
2072    // --- Splitter builder tests ---
2073
2074    #[test]
2075    fn test_split_builder_typestate() {
2076        use camel_api::splitter::{SplitterConfig, split_body_lines};
2077
2078        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
2079        let definition = RouteBuilder::from("timer:test?period=1000")
2080            .route_id("test-route")
2081            .split(SplitterConfig::new(split_body_lines()))
2082            .to("mock:per-fragment")
2083            .end_split()
2084            .to("mock:final")
2085            .build()
2086            .unwrap();
2087
2088        // Should have 2 top-level steps: Split + To("mock:final")
2089        assert_eq!(definition.steps().len(), 2);
2090    }
2091
2092    #[test]
2093    fn test_split_builder_steps_collected() {
2094        use camel_api::splitter::{SplitterConfig, split_body_lines};
2095
2096        let definition = RouteBuilder::from("timer:test?period=1000")
2097            .route_id("test-route")
2098            .split(SplitterConfig::new(split_body_lines()))
2099            .set_header("fragment", Value::String("yes".into()))
2100            .to("mock:per-fragment")
2101            .end_split()
2102            .build()
2103            .unwrap();
2104
2105        // Should have 1 top-level step: Split (containing 2 sub-steps)
2106        assert_eq!(definition.steps().len(), 1);
2107        match &definition.steps()[0] {
2108            BuilderStep::Split { steps, .. } => {
2109                assert_eq!(steps.len(), 2); // SetHeader + To
2110            }
2111            other => panic!("Expected Split, got {:?}", other),
2112        }
2113    }
2114
2115    #[test]
2116    fn test_split_builder_config_propagated() {
2117        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2118
2119        let definition = RouteBuilder::from("timer:test?period=1000")
2120            .route_id("test-route")
2121            .split(
2122                SplitterConfig::new(split_body_lines())
2123                    .parallel(true)
2124                    .parallel_limit(4)
2125                    .aggregation(AggregationStrategy::CollectAll),
2126            )
2127            .to("mock:per-fragment")
2128            .end_split()
2129            .build()
2130            .unwrap();
2131
2132        match &definition.steps()[0] {
2133            BuilderStep::Split { config, .. } => {
2134                assert!(config.parallel);
2135                assert_eq!(config.parallel_limit, Some(4));
2136                assert!(matches!(
2137                    config.aggregation,
2138                    AggregationStrategy::CollectAll
2139                ));
2140            }
2141            other => panic!("Expected Split, got {:?}", other),
2142        }
2143    }
2144
2145    #[test]
2146    fn test_aggregate_builder_adds_step() {
2147        use camel_api::aggregator::AggregatorConfig;
2148        use camel_core::route::BuilderStep;
2149
2150        let definition = RouteBuilder::from("timer:tick")
2151            .route_id("test-route")
2152            .aggregate(
2153                AggregatorConfig::correlate_by("key")
2154                    .complete_when_size(2)
2155                    .build()
2156                    .unwrap(),
2157            )
2158            .build()
2159            .unwrap();
2160
2161        assert_eq!(definition.steps().len(), 1);
2162        assert!(matches!(
2163            definition.steps()[0],
2164            BuilderStep::Aggregate { .. }
2165        ));
2166    }
2167
2168    #[test]
2169    fn test_aggregate_in_split_builder() {
2170        use camel_api::aggregator::AggregatorConfig;
2171        use camel_api::splitter::{SplitterConfig, split_body_lines};
2172        use camel_core::route::BuilderStep;
2173
2174        let definition = RouteBuilder::from("timer:tick")
2175            .route_id("test-route")
2176            .split(SplitterConfig::new(split_body_lines()))
2177            .aggregate(
2178                AggregatorConfig::correlate_by("key")
2179                    .complete_when_size(1)
2180                    .build()
2181                    .unwrap(),
2182            )
2183            .end_split()
2184            .build()
2185            .unwrap();
2186
2187        assert_eq!(definition.steps().len(), 1);
2188        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2189            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2190        } else {
2191            panic!("expected Split step");
2192        }
2193    }
2194
2195    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
2196
2197    #[test]
2198    fn test_builder_set_body_static_adds_processor() {
2199        let definition = RouteBuilder::from("timer:tick")
2200            .route_id("test-route")
2201            .set_body("fixed")
2202            .build()
2203            .unwrap();
2204        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2205    }
2206
2207    #[test]
2208    fn test_builder_set_body_fn_adds_processor() {
2209        let definition = RouteBuilder::from("timer:tick")
2210            .route_id("test-route")
2211            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2212            .build()
2213            .unwrap();
2214        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2215    }
2216
2217    #[test]
2218    fn transform_alias_produces_same_as_set_body() {
2219        let route_transform = RouteBuilder::from("timer:tick")
2220            .route_id("test-route")
2221            .transform("hello")
2222            .build()
2223            .unwrap();
2224
2225        let route_set_body = RouteBuilder::from("timer:tick")
2226            .route_id("test-route")
2227            .set_body("hello")
2228            .build()
2229            .unwrap();
2230
2231        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2232    }
2233
2234    #[test]
2235    fn test_builder_set_header_fn_adds_processor() {
2236        let definition = RouteBuilder::from("timer:tick")
2237            .route_id("test-route")
2238            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2239            .build()
2240            .unwrap();
2241        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2242    }
2243
2244    #[tokio::test]
2245    async fn test_set_body_static_processor_works() {
2246        use camel_core::route::{CompiledStep, compose_pipeline};
2247        let def = RouteBuilder::from("t:t")
2248            .route_id("test-route")
2249            .set_body("replaced")
2250            .build()
2251            .unwrap();
2252        let pipeline = compose_pipeline(
2253            def.steps()
2254                .iter()
2255                .filter_map(|s| {
2256                    if let BuilderStep::Processor(p) = s {
2257                        Some(p.clone())
2258                    } else {
2259                        None
2260                    }
2261                })
2262                .map(|p| CompiledStep::Process {
2263                    processor: p,
2264                    body_contract: None,
2265                })
2266                .collect(),
2267        );
2268        let exchange = Exchange::new(Message::new("original"));
2269        let result = pipeline.oneshot(exchange).await.unwrap();
2270        assert_eq!(result.input.body.as_text(), Some("replaced"));
2271    }
2272
2273    #[tokio::test]
2274    async fn test_set_body_fn_processor_works() {
2275        use camel_core::route::{CompiledStep, compose_pipeline};
2276        let def = RouteBuilder::from("t:t")
2277            .route_id("test-route")
2278            .set_body_fn(|ex: &Exchange| {
2279                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2280            })
2281            .build()
2282            .unwrap();
2283        let pipeline = compose_pipeline(
2284            def.steps()
2285                .iter()
2286                .filter_map(|s| {
2287                    if let BuilderStep::Processor(p) = s {
2288                        Some(p.clone())
2289                    } else {
2290                        None
2291                    }
2292                })
2293                .map(|p| CompiledStep::Process {
2294                    processor: p,
2295                    body_contract: None,
2296                })
2297                .collect(),
2298        );
2299        let exchange = Exchange::new(Message::new("hello"));
2300        let result = pipeline.oneshot(exchange).await.unwrap();
2301        assert_eq!(result.input.body.as_text(), Some("HELLO"));
2302    }
2303
2304    #[tokio::test]
2305    async fn test_set_header_fn_processor_works() {
2306        use camel_core::route::{CompiledStep, compose_pipeline};
2307        let def = RouteBuilder::from("t:t")
2308            .route_id("test-route")
2309            .set_header_fn("echo", |ex: &Exchange| {
2310                ex.input
2311                    .body
2312                    .as_text()
2313                    .map(|t| Value::String(t.into()))
2314                    .unwrap_or(Value::Null)
2315            })
2316            .build()
2317            .unwrap();
2318        let pipeline = compose_pipeline(
2319            def.steps()
2320                .iter()
2321                .filter_map(|s| {
2322                    if let BuilderStep::Processor(p) = s {
2323                        Some(p.clone())
2324                    } else {
2325                        None
2326                    }
2327                })
2328                .map(|p| CompiledStep::Process {
2329                    processor: p,
2330                    body_contract: None,
2331                })
2332                .collect(),
2333        );
2334        let exchange = Exchange::new(Message::new("ping"));
2335        let result = pipeline.oneshot(exchange).await.unwrap();
2336        assert_eq!(
2337            result.input.header("echo"),
2338            Some(&Value::String("ping".into()))
2339        );
2340    }
2341
2342    // ── FilterBuilder typestate tests ─────────────────────────────────────
2343
2344    #[test]
2345    fn test_filter_builder_typestate() {
2346        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2347            .route_id("test-route")
2348            .filter(|_ex| true)
2349            .to("mock:inner")
2350            .end_filter()
2351            .to("mock:outer")
2352            .build();
2353        assert!(result.is_ok());
2354    }
2355
2356    #[test]
2357    fn test_filter_builder_steps_collected() {
2358        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2359            .route_id("test-route")
2360            .filter(|_ex| true)
2361            .to("mock:inner")
2362            .end_filter()
2363            .build()
2364            .unwrap();
2365
2366        assert_eq!(definition.steps().len(), 1);
2367        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2368    }
2369
2370    #[test]
2371    fn test_wire_tap_builder_adds_step() {
2372        let definition = RouteBuilder::from("timer:tick")
2373            .route_id("test-route")
2374            .wire_tap("mock:tap")
2375            .to("mock:result")
2376            .build()
2377            .unwrap();
2378
2379        assert_eq!(definition.steps().len(), 2);
2380        assert!(
2381            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2382        );
2383        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2384    }
2385
2386    // ── MulticastBuilder typestate tests ─────────────────────────────────────
2387
2388    #[test]
2389    fn test_multicast_builder_typestate() {
2390        let definition = RouteBuilder::from("timer:tick")
2391            .route_id("test-route")
2392            .multicast()
2393            .to("direct:a")
2394            .to("direct:b")
2395            .end_multicast()
2396            .to("mock:result")
2397            .build()
2398            .unwrap();
2399
2400        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
2401    }
2402
2403    #[test]
2404    fn test_multicast_builder_steps_collected() {
2405        let definition = RouteBuilder::from("timer:tick")
2406            .route_id("test-route")
2407            .multicast()
2408            .to("direct:a")
2409            .to("direct:b")
2410            .end_multicast()
2411            .build()
2412            .unwrap();
2413
2414        match &definition.steps()[0] {
2415            BuilderStep::Multicast { steps, .. } => {
2416                assert_eq!(steps.len(), 2);
2417            }
2418            other => panic!("Expected Multicast, got {:?}", other),
2419        }
2420    }
2421
2422    // ── Concurrency builder tests ─────────────────────────────────────
2423
2424    #[test]
2425    fn test_builder_concurrent_sets_concurrency() {
2426        use camel_component_api::ConcurrencyModel;
2427
2428        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2429            .route_id("test-route")
2430            .concurrent(16)
2431            .to("log:info")
2432            .build()
2433            .unwrap();
2434
2435        assert_eq!(
2436            definition.concurrency_override(),
2437            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2438        );
2439    }
2440
2441    #[test]
2442    fn test_builder_concurrent_zero_means_unbounded() {
2443        use camel_component_api::ConcurrencyModel;
2444
2445        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2446            .route_id("test-route")
2447            .concurrent(0)
2448            .to("log:info")
2449            .build()
2450            .unwrap();
2451
2452        assert_eq!(
2453            definition.concurrency_override(),
2454            Some(&ConcurrencyModel::Concurrent { max: None })
2455        );
2456    }
2457
2458    #[test]
2459    fn test_builder_sequential_sets_concurrency() {
2460        use camel_component_api::ConcurrencyModel;
2461
2462        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2463            .route_id("test-route")
2464            .sequential()
2465            .to("log:info")
2466            .build()
2467            .unwrap();
2468
2469        assert_eq!(
2470            definition.concurrency_override(),
2471            Some(&ConcurrencyModel::Sequential)
2472        );
2473    }
2474
2475    #[test]
2476    fn test_builder_default_concurrency_is_none() {
2477        let definition = RouteBuilder::from("timer:tick")
2478            .route_id("test-route")
2479            .to("log:info")
2480            .build()
2481            .unwrap();
2482
2483        assert_eq!(definition.concurrency_override(), None);
2484    }
2485
2486    // ── Route lifecycle builder tests ─────────────────────────────────────
2487
2488    #[test]
2489    fn test_builder_route_id_sets_id() {
2490        let definition = RouteBuilder::from("timer:tick")
2491            .route_id("my-route")
2492            .build()
2493            .unwrap();
2494
2495        assert_eq!(definition.route_id(), "my-route");
2496    }
2497
2498    #[test]
2499    fn test_build_without_route_id_fails() {
2500        let result = RouteBuilder::from("timer:tick?period=1000")
2501            .to("log:info")
2502            .build();
2503        let err = match result {
2504            Err(e) => e.to_string(),
2505            Ok(_) => panic!("build() should fail without route_id"),
2506        };
2507        assert!(
2508            err.contains("route_id"),
2509            "error should mention route_id, got: {}",
2510            err
2511        );
2512    }
2513
2514    #[test]
2515    fn test_builder_empty_route_id_rejected() {
2516        let result = RouteBuilder::from("timer:tick").route_id("").build();
2517        let err = result.err().expect("empty route_id should be rejected");
2518        assert!(matches!(err, CamelError::RouteError(_)));
2519    }
2520
2521    #[test]
2522    fn test_builder_whitespace_route_id_rejected() {
2523        let result = RouteBuilder::from("timer:tick").route_id("   ").build();
2524        assert!(result.is_err());
2525    }
2526
2527    #[test]
2528    fn test_builder_auto_startup_false() {
2529        let definition = RouteBuilder::from("timer:tick")
2530            .route_id("test-route")
2531            .auto_startup(false)
2532            .build()
2533            .unwrap();
2534
2535        assert!(!definition.auto_startup());
2536    }
2537
2538    #[test]
2539    fn test_builder_startup_order_custom() {
2540        let definition = RouteBuilder::from("timer:tick")
2541            .route_id("test-route")
2542            .startup_order(50)
2543            .build()
2544            .unwrap();
2545
2546        assert_eq!(definition.startup_order(), 50);
2547    }
2548
2549    #[test]
2550    fn test_builder_defaults() {
2551        let definition = RouteBuilder::from("timer:tick")
2552            .route_id("test-route")
2553            .build()
2554            .unwrap();
2555
2556        assert_eq!(definition.route_id(), "test-route");
2557        assert!(definition.auto_startup());
2558        assert_eq!(definition.startup_order(), 1000);
2559    }
2560
2561    // ── Choice typestate tests ──────────────────────────────────────────────────
2562
2563    #[test]
2564    fn test_choice_builder_single_when() {
2565        let definition = RouteBuilder::from("timer:tick")
2566            .route_id("test-route")
2567            .choice()
2568            .when(|ex: &Exchange| ex.input.header("type").is_some())
2569            .to("mock:typed")
2570            .end_when()
2571            .end_choice()
2572            .build()
2573            .unwrap();
2574        assert_eq!(definition.steps().len(), 1);
2575        assert!(
2576            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2577            if whens.len() == 1 && otherwise.is_none())
2578        );
2579    }
2580
2581    #[test]
2582    fn test_choice_builder_when_otherwise() {
2583        let definition = RouteBuilder::from("timer:tick")
2584            .route_id("test-route")
2585            .choice()
2586            .when(|ex: &Exchange| ex.input.header("a").is_some())
2587            .to("mock:a")
2588            .end_when()
2589            .otherwise()
2590            .to("mock:fallback")
2591            .end_otherwise()
2592            .end_choice()
2593            .build()
2594            .unwrap();
2595        assert!(
2596            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2597            if whens.len() == 1 && otherwise.is_some())
2598        );
2599    }
2600
2601    #[test]
2602    fn test_choice_builder_multiple_whens() {
2603        let definition = RouteBuilder::from("timer:tick")
2604            .route_id("test-route")
2605            .choice()
2606            .when(|ex: &Exchange| ex.input.header("a").is_some())
2607            .to("mock:a")
2608            .end_when()
2609            .when(|ex: &Exchange| ex.input.header("b").is_some())
2610            .to("mock:b")
2611            .end_when()
2612            .end_choice()
2613            .build()
2614            .unwrap();
2615        assert!(
2616            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2617            if whens.len() == 2)
2618        );
2619    }
2620
2621    #[test]
2622    fn test_choice_step_after_choice() {
2623        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2624        let definition = RouteBuilder::from("timer:tick")
2625            .route_id("test-route")
2626            .choice()
2627            .when(|_ex: &Exchange| true)
2628            .to("mock:inner")
2629            .end_when()
2630            .end_choice()
2631            .to("mock:outer") // must be step[1], not inside choice
2632            .build()
2633            .unwrap();
2634        assert_eq!(definition.steps().len(), 2);
2635        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2636    }
2637
2638    // ── Throttle typestate tests ──────────────────────────────────────────────────
2639
2640    #[test]
2641    fn test_throttle_builder_typestate() {
2642        let definition = RouteBuilder::from("timer:tick")
2643            .route_id("test-route")
2644            .throttle(10, std::time::Duration::from_secs(1))
2645            .to("mock:result")
2646            .end_throttle()
2647            .build()
2648            .unwrap();
2649
2650        assert_eq!(definition.steps().len(), 1);
2651        assert!(matches!(
2652            &definition.steps()[0],
2653            BuilderStep::Throttle { .. }
2654        ));
2655    }
2656
2657    #[test]
2658    fn test_throttle_builder_with_strategy() {
2659        let definition = RouteBuilder::from("timer:tick")
2660            .route_id("test-route")
2661            .throttle(10, std::time::Duration::from_secs(1))
2662            .strategy(ThrottleStrategy::Reject)
2663            .to("mock:result")
2664            .end_throttle()
2665            .build()
2666            .unwrap();
2667
2668        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2669            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2670        } else {
2671            panic!("Expected Throttle step");
2672        }
2673    }
2674
2675    #[test]
2676    fn test_throttle_builder_steps_collected() {
2677        let definition = RouteBuilder::from("timer:tick")
2678            .route_id("test-route")
2679            .throttle(5, std::time::Duration::from_secs(1))
2680            .set_header("throttled", Value::Bool(true))
2681            .to("mock:throttled")
2682            .end_throttle()
2683            .build()
2684            .unwrap();
2685
2686        match &definition.steps()[0] {
2687            BuilderStep::Throttle { steps, .. } => {
2688                assert_eq!(steps.len(), 2); // SetHeader + To
2689            }
2690            other => panic!("Expected Throttle, got {:?}", other),
2691        }
2692    }
2693
2694    #[test]
2695    fn test_throttle_step_after_throttle() {
2696        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2697        let definition = RouteBuilder::from("timer:tick")
2698            .route_id("test-route")
2699            .throttle(10, std::time::Duration::from_secs(1))
2700            .to("mock:inner")
2701            .end_throttle()
2702            .to("mock:outer")
2703            .build()
2704            .unwrap();
2705
2706        assert_eq!(definition.steps().len(), 2);
2707        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2708    }
2709
2710    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2711
2712    #[test]
2713    fn test_load_balance_builder_typestate() {
2714        let definition = RouteBuilder::from("timer:tick")
2715            .route_id("test-route")
2716            .load_balance()
2717            .round_robin()
2718            .to("mock:a")
2719            .to("mock:b")
2720            .end_load_balance()
2721            .build()
2722            .unwrap();
2723
2724        assert_eq!(definition.steps().len(), 1);
2725        assert!(matches!(
2726            &definition.steps()[0],
2727            BuilderStep::LoadBalance { .. }
2728        ));
2729    }
2730
2731    #[test]
2732    fn test_load_balance_builder_with_strategy() {
2733        let definition = RouteBuilder::from("timer:tick")
2734            .route_id("test-route")
2735            .load_balance()
2736            .random()
2737            .to("mock:result")
2738            .end_load_balance()
2739            .build()
2740            .unwrap();
2741
2742        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2743            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2744        } else {
2745            panic!("Expected LoadBalance step");
2746        }
2747    }
2748
2749    #[test]
2750    fn test_load_balance_builder_steps_collected() {
2751        let definition = RouteBuilder::from("timer:tick")
2752            .route_id("test-route")
2753            .load_balance()
2754            .set_header("lb", Value::Bool(true))
2755            .to("mock:a")
2756            .end_load_balance()
2757            .build()
2758            .unwrap();
2759
2760        match &definition.steps()[0] {
2761            BuilderStep::LoadBalance { steps, .. } => {
2762                assert_eq!(steps.len(), 2); // SetHeader + To
2763            }
2764            other => panic!("Expected LoadBalance, got {:?}", other),
2765        }
2766    }
2767
2768    #[test]
2769    fn test_load_balance_step_after_load_balance() {
2770        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
2771        let definition = RouteBuilder::from("timer:tick")
2772            .route_id("test-route")
2773            .load_balance()
2774            .to("mock:inner")
2775            .end_load_balance()
2776            .to("mock:outer")
2777            .build()
2778            .unwrap();
2779
2780        assert_eq!(definition.steps().len(), 2);
2781        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2782    }
2783
2784    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2785
2786    #[test]
2787    fn test_dynamic_router_builder() {
2788        let definition = RouteBuilder::from("timer:tick")
2789            .route_id("test-route")
2790            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2791            .build()
2792            .unwrap();
2793
2794        assert_eq!(definition.steps().len(), 1);
2795        assert!(matches!(
2796            &definition.steps()[0],
2797            BuilderStep::DynamicRouter { .. }
2798        ));
2799    }
2800
2801    #[test]
2802    fn test_dynamic_router_builder_with_config() {
2803        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2804            .max_iterations(100)
2805            .cache_size(500);
2806
2807        let definition = RouteBuilder::from("timer:tick")
2808            .route_id("test-route")
2809            .dynamic_router_with_config(config)
2810            .build()
2811            .unwrap();
2812
2813        assert_eq!(definition.steps().len(), 1);
2814        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2815            assert_eq!(config.max_iterations, 100);
2816            assert_eq!(config.cache_size, 500);
2817        } else {
2818            panic!("Expected DynamicRouter step");
2819        }
2820    }
2821
2822    #[test]
2823    fn test_dynamic_router_step_after_router() {
2824        // Steps after dynamic_router() are added to the outer pipeline.
2825        let definition = RouteBuilder::from("timer:tick")
2826            .route_id("test-route")
2827            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2828            .to("mock:outer")
2829            .build()
2830            .unwrap();
2831
2832        assert_eq!(definition.steps().len(), 2);
2833        assert!(matches!(
2834            &definition.steps()[0],
2835            BuilderStep::DynamicRouter { .. }
2836        ));
2837        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2838    }
2839
2840    #[test]
2841    fn routing_slip_builder_creates_step() {
2842        use camel_api::RoutingSlipExpression;
2843
2844        let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2845
2846        let route = RouteBuilder::from("direct:start")
2847            .route_id("routing-slip-test")
2848            .routing_slip(expression)
2849            .build()
2850            .unwrap();
2851
2852        assert!(
2853            matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2854            "Expected RoutingSlip step"
2855        );
2856    }
2857
2858    #[test]
2859    fn routing_slip_with_config_builder_creates_step() {
2860        use camel_api::RoutingSlipConfig;
2861
2862        let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2863            .uri_delimiter("|")
2864            .cache_size(50)
2865            .ignore_invalid_endpoints(true);
2866
2867        let route = RouteBuilder::from("direct:start")
2868            .route_id("routing-slip-config-test")
2869            .routing_slip_with_config(config)
2870            .build()
2871            .unwrap();
2872
2873        if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2874            assert_eq!(config.uri_delimiter, "|");
2875            assert_eq!(config.cache_size, 50);
2876            assert!(config.ignore_invalid_endpoints);
2877        } else {
2878            panic!("Expected RoutingSlip step");
2879        }
2880    }
2881
2882    #[test]
2883    fn test_builder_marshal_adds_processor_step() {
2884        let definition = RouteBuilder::from("timer:tick")
2885            .route_id("test-route")
2886            .marshal("json")
2887            .unwrap()
2888            .build()
2889            .unwrap();
2890        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2891    }
2892
2893    #[test]
2894    fn test_builder_unmarshal_adds_processor_step() {
2895        let definition = RouteBuilder::from("timer:tick")
2896            .route_id("test-route")
2897            .unmarshal("json")
2898            .unwrap()
2899            .build()
2900            .unwrap();
2901        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2902    }
2903
2904    #[test]
2905    fn test_builder_stream_cache_adds_processor_step() {
2906        let definition = RouteBuilder::from("timer:tick")
2907            .route_id("test-route")
2908            .stream_cache(1024)
2909            .build()
2910            .unwrap();
2911        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2912    }
2913
2914    #[test]
2915    fn validate_adds_to_step_with_validator_uri() {
2916        let def = RouteBuilder::from("direct:in")
2917            .route_id("test")
2918            .validate("schemas/order.xsd")
2919            .build()
2920            .unwrap();
2921        let steps = def.steps();
2922        assert_eq!(steps.len(), 1);
2923        assert!(
2924            matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2925            "got: {:?}",
2926            steps[0]
2927        );
2928    }
2929
2930    #[test]
2931    fn test_builder_marshal_returns_err_for_unknown_format() {
2932        let result = RouteBuilder::from("timer:tick")
2933            .route_id("test-route")
2934            .marshal("csv");
2935        let err = match result {
2936            Err(e) => e,
2937            Ok(_) => panic!("marshal with unknown format should return Err"),
2938        };
2939        let msg = err.to_string();
2940        assert!(
2941            msg.contains("unknown data format"),
2942            "error should mention unknown format, got: {msg}"
2943        );
2944        assert!(
2945            msg.contains("csv"),
2946            "error should mention format name, got: {msg}"
2947        );
2948    }
2949
2950    #[test]
2951    fn test_builder_unmarshal_returns_err_for_unknown_format() {
2952        let result = RouteBuilder::from("timer:tick")
2953            .route_id("test-route")
2954            .unmarshal("csv");
2955        let err = match result {
2956            Err(e) => e,
2957            Ok(_) => panic!("unmarshal with unknown format should return Err"),
2958        };
2959        let msg = err.to_string();
2960        assert!(
2961            msg.contains("unknown data format"),
2962            "error should mention unknown format, got: {msg}"
2963        );
2964        assert!(
2965            msg.contains("csv"),
2966            "error should mention format name, got: {msg}"
2967        );
2968    }
2969
2970    #[test]
2971    fn test_builder_recipient_list_creates_step() {
2972        let route = RouteBuilder::from("direct:start")
2973            .route_id("recipient-list-test")
2974            .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
2975            .build()
2976            .unwrap();
2977
2978        assert!(matches!(
2979            &route.steps()[0],
2980            BuilderStep::RecipientList { .. }
2981        ));
2982    }
2983
2984    #[test]
2985    fn test_builder_recipient_list_with_config_creates_step() {
2986        let config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));
2987
2988        let route = RouteBuilder::from("direct:start")
2989            .route_id("recipient-list-config-test")
2990            .recipient_list_with_config(config)
2991            .build()
2992            .unwrap();
2993
2994        assert!(matches!(
2995            &route.steps()[0],
2996            BuilderStep::RecipientList { .. }
2997        ));
2998    }
2999
3000    #[test]
3001    fn test_builder_script_adds_script_step() {
3002        let route = RouteBuilder::from("direct:start")
3003            .route_id("script-test")
3004            .script("rhai", "headers[\"x\"] = \"y\"")
3005            .build()
3006            .unwrap();
3007
3008        assert!(matches!(
3009            &route.steps()[0],
3010            BuilderStep::Script { language, script }
3011            if language == "rhai" && script == "headers[\"x\"] = \"y\""
3012        ));
3013    }
3014
3015    #[test]
3016    fn test_builder_delay_and_delay_with_header_add_steps() {
3017        let route = RouteBuilder::from("direct:start")
3018            .route_id("delay-test")
3019            .delay(Duration::from_millis(250))
3020            .delay_with_header(Duration::from_millis(500), "x-delay")
3021            .build()
3022            .unwrap();
3023
3024        assert_eq!(route.steps().len(), 2);
3025        assert!(matches!(&route.steps()[0], BuilderStep::Delay { .. }));
3026        assert!(matches!(&route.steps()[1], BuilderStep::Delay { .. }));
3027    }
3028
3029    #[test]
3030    fn test_builder_log_and_stop_add_steps_in_order() {
3031        let route = RouteBuilder::from("direct:start")
3032            .route_id("log-stop-test")
3033            .log("hello", LogLevel::Info)
3034            .stop()
3035            .to("mock:after")
3036            .build()
3037            .unwrap();
3038
3039        assert_eq!(route.steps().len(), 3);
3040        assert!(matches!(
3041            &route.steps()[0],
3042            BuilderStep::Log { message, .. } if message == "hello"
3043        ));
3044        assert!(matches!(&route.steps()[1], BuilderStep::Stop));
3045        assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
3046    }
3047
3048    #[test]
3049    fn test_builder_stream_cache_default_adds_processor_step() {
3050        let route = RouteBuilder::from("direct:start")
3051            .route_id("stream-cache-default-test")
3052            .stream_cache_default()
3053            .build()
3054            .unwrap();
3055
3056        assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
3057    }
3058
3059    #[test]
3060    fn test_validate_preserves_existing_validator_prefix() {
3061        let route = RouteBuilder::from("direct:in")
3062            .route_id("validate-prefix-test")
3063            .validate("validator:schemas/order.xsd")
3064            .build()
3065            .unwrap();
3066
3067        assert!(matches!(
3068            &route.steps()[0],
3069            BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
3070        ));
3071    }
3072
3073    #[test]
3074    fn test_load_balance_builder_weighted_failover_config() {
3075        let route = RouteBuilder::from("direct:start")
3076            .route_id("lb-weighted-failover")
3077            .load_balance()
3078            .weighted(vec![
3079                ("direct:a".to_string(), 3),
3080                ("direct:b".to_string(), 1),
3081            ])
3082            .failover()
3083            .to("mock:result")
3084            .end_load_balance()
3085            .build()
3086            .unwrap();
3087
3088        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3089            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3090        } else {
3091            panic!("Expected LoadBalance step");
3092        }
3093    }
3094
3095    #[test]
3096    fn test_multicast_builder_all_config_setters() {
3097        let route = RouteBuilder::from("direct:start")
3098            .route_id("multicast-config-test")
3099            .multicast()
3100            .parallel(true)
3101            .parallel_limit(4)
3102            .stop_on_exception(true)
3103            .timeout(Duration::from_millis(300))
3104            .aggregation(MulticastStrategy::Original)
3105            .to("mock:a")
3106            .end_multicast()
3107            .build()
3108            .unwrap();
3109
3110        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3111            assert!(config.parallel);
3112            assert_eq!(config.parallel_limit, Some(4));
3113            assert!(config.stop_on_exception);
3114            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
3115            assert!(matches!(config.aggregation, MulticastStrategy::Original));
3116        } else {
3117            panic!("Expected Multicast step");
3118        }
3119    }
3120
3121    #[test]
3122    fn test_build_canonical_rejects_unsupported_processor_step() {
3123        let err = RouteBuilder::from("direct:start")
3124            .route_id("canonical-reject")
3125            .set_header("k", Value::String("v".into()))
3126            .build_canonical()
3127            .unwrap_err();
3128
3129        assert!(format!("{err}").contains("does not support step `processor`"));
3130    }
3131
3132    // ── LoadBalance strategy-specific tests ─────────────────────────────────────
3133
3134    #[test]
3135    fn test_load_balance_builder_weighted_strategy() {
3136        let route = RouteBuilder::from("direct:start")
3137            .route_id("lb-weighted")
3138            .load_balance()
3139            .weighted(vec![
3140                ("direct:a".to_string(), 5),
3141                ("direct:b".to_string(), 2),
3142                ("direct:c".to_string(), 1),
3143            ])
3144            .to("mock:result")
3145            .end_load_balance()
3146            .build()
3147            .unwrap();
3148
3149        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3150            assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
3151        } else {
3152            panic!("Expected LoadBalance step");
3153        }
3154    }
3155
3156    #[test]
3157    fn test_load_balance_builder_failover_strategy() {
3158        let route = RouteBuilder::from("direct:start")
3159            .route_id("lb-failover")
3160            .load_balance()
3161            .failover()
3162            .to("mock:primary")
3163            .end_load_balance()
3164            .build()
3165            .unwrap();
3166
3167        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3168            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3169        } else {
3170            panic!("Expected LoadBalance step");
3171        }
3172    }
3173
3174    // ── FilterInSplitBuilder tests ──────────────────────────────────────────────
3175
3176    #[test]
3177    fn test_filter_in_split_builder_typestate() {
3178        use camel_api::splitter::{SplitterConfig, split_body_lines};
3179
3180        let definition = RouteBuilder::from("timer:test")
3181            .route_id("filter-in-split")
3182            .split(SplitterConfig::new(split_body_lines()))
3183            .filter(|_ex| true)
3184            .to("mock:filtered")
3185            .end_filter()
3186            .end_split()
3187            .build()
3188            .unwrap();
3189
3190        assert_eq!(definition.steps().len(), 1);
3191        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3192            assert_eq!(steps.len(), 1);
3193            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3194        } else {
3195            panic!("Expected Split step");
3196        }
3197    }
3198
3199    #[test]
3200    fn test_filter_in_split_builder_multiple_steps() {
3201        use camel_api::splitter::{SplitterConfig, split_body_lines};
3202
3203        let definition = RouteBuilder::from("timer:test")
3204            .route_id("filter-in-split-multi")
3205            .split(SplitterConfig::new(split_body_lines()))
3206            .to("mock:before-filter")
3207            .filter(|_ex| true)
3208            .to("mock:inside-filter")
3209            .end_filter()
3210            .to("mock:after-filter")
3211            .end_split()
3212            .build()
3213            .unwrap();
3214
3215        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3216            // To("before-filter") + Filter{...} + To("after-filter") = 3
3217            assert_eq!(steps.len(), 3);
3218        } else {
3219            panic!("Expected Split step");
3220        }
3221    }
3222
3223    // ── build_canonical tests ───────────────────────────────────────────────────
3224
3225    #[test]
3226    fn test_build_canonical_with_circuit_breaker() {
3227        use camel_api::circuit_breaker::CircuitBreakerConfig;
3228
3229        let spec = RouteBuilder::from("direct:start")
3230            .route_id("canonical-cb")
3231            .circuit_breaker(CircuitBreakerConfig::new().failure_threshold(10))
3232            .to("mock:result")
3233            .build_canonical()
3234            .unwrap();
3235
3236        let cb = spec.circuit_breaker.expect("circuit breaker should be set");
3237        assert_eq!(cb.failure_threshold, 10);
3238    }
3239
3240    #[test]
3241    fn test_build_canonical_rejects_custom_split_aggregation() {
3242        use camel_api::splitter::{SplitterConfig, split_body_lines};
3243
3244        let err = RouteBuilder::from("direct:start")
3245            .route_id("canonical-custom-split")
3246            .split(SplitterConfig::new(split_body_lines()).aggregation(
3247                camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
3248            ))
3249            .to("mock:frag")
3250            .end_split()
3251            .build_canonical()
3252            .unwrap_err();
3253
3254        // Split with closure-based expression is rejected in canonical v1.
3255        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3256    }
3257
3258    #[test]
3259    fn test_build_canonical_rejects_custom_aggregate_strategy() {
3260        let err = RouteBuilder::from("direct:start")
3261            .route_id("canonical-custom-agg")
3262            .aggregate(
3263                AggregatorConfig::correlate_by("key")
3264                    .complete_when_size(2)
3265                    .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
3266                    .build()
3267                    .unwrap(),
3268            )
3269            .build_canonical()
3270            .unwrap_err();
3271
3272        assert!(format!("{err}").contains("custom aggregate strategy"));
3273    }
3274
3275    #[test]
3276    fn test_build_canonical_rejects_fn_correlation_strategy() {
3277        let err = RouteBuilder::from("direct:start")
3278            .route_id("canonical-fn-corr")
3279            .aggregate(AggregatorConfig {
3280                header_name: "key".to_string(),
3281                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3282                correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
3283                strategy: AggregationStrategy::CollectAll,
3284                max_buckets: None,
3285                bucket_ttl: None,
3286                force_completion_on_stop: false,
3287                discard_on_timeout: false,
3288            })
3289            .build_canonical()
3290            .unwrap_err();
3291
3292        assert!(format!("{err}").contains("Fn correlation strategy"));
3293    }
3294
3295    #[test]
3296    fn test_build_canonical_rejects_predicate_completion() {
3297        let err = RouteBuilder::from("direct:start")
3298            .route_id("canonical-pred-completion")
3299            .aggregate(AggregatorConfig {
3300                header_name: "key".to_string(),
3301                completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
3302                    |_| false,
3303                ))),
3304                correlation: CorrelationStrategy::HeaderName("key".to_string()),
3305                strategy: AggregationStrategy::CollectAll,
3306                max_buckets: None,
3307                bucket_ttl: None,
3308                force_completion_on_stop: false,
3309                discard_on_timeout: false,
3310            })
3311            .build_canonical()
3312            .unwrap_err();
3313
3314        assert!(format!("{err}").contains("predicate completion"));
3315    }
3316
3317    #[test]
3318    fn test_build_canonical_with_expression_correlation() {
3319        let spec = RouteBuilder::from("direct:start")
3320            .route_id("canonical-expr-corr")
3321            .aggregate(AggregatorConfig {
3322                header_name: "key".to_string(),
3323                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3324                correlation: CorrelationStrategy::Expression {
3325                    expr: "header.key".to_string(),
3326                    language: "simple".to_string(),
3327                },
3328                strategy: AggregationStrategy::CollectAll,
3329                max_buckets: None,
3330                bucket_ttl: None,
3331                force_completion_on_stop: false,
3332                discard_on_timeout: false,
3333            })
3334            .build_canonical()
3335            .unwrap();
3336
3337        assert!(spec.steps.iter().any(|s| matches!(s, CanonicalStepSpec::Aggregate(a) if a.correlation_key == Some("header.key".to_string()))));
3338    }
3339
3340    #[test]
3341    fn test_build_canonical_split_rejected_with_closure_expression() {
3342        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
3343
3344        // Builder-based split uses closure expressions, which are not serializable.
3345        let err = RouteBuilder::from("direct:start")
3346            .route_id("canonical-split-last")
3347            .split(
3348                SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins),
3349            )
3350            .to("mock:frag")
3351            .end_split()
3352            .build_canonical()
3353            .unwrap_err();
3354
3355        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3356    }
3357
3358    // ── OnExceptionBuilder full chain tests ─────────────────────────────────────
3359
3360    #[test]
3361    fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
3362        let definition = RouteBuilder::from("direct:start")
3363            .route_id("on-exception-full")
3364            .dead_letter_channel("log:dlc")
3365            .on_exception(|e| matches!(e, CamelError::Io(_)))
3366            .retry(5)
3367            .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
3368            .with_jitter(0.3)
3369            .handled_by("log:io-handler")
3370            .end_on_exception()
3371            .to("mock:out")
3372            .build()
3373            .unwrap();
3374
3375        let cfg = definition
3376            .error_handler_config()
3377            .expect("error handler should be set");
3378        assert_eq!(cfg.policies.len(), 1);
3379        let policy = &cfg.policies[0];
3380        let retry = policy.retry.as_ref().expect("retry should be set");
3381        assert_eq!(retry.max_attempts, 5);
3382        assert_eq!(retry.initial_delay, Duration::from_millis(10));
3383        assert_eq!(retry.multiplier, 2.0);
3384        assert_eq!(retry.max_delay, Duration::from_millis(500));
3385        assert!((retry.jitter_factor - 0.3).abs() < f64::EPSILON);
3386        assert_eq!(policy.handled_by.as_deref(), Some("log:io-handler"));
3387    }
3388
3389    #[test]
3390    fn test_on_exception_jitter_clamped_to_valid_range() {
3391        let definition = RouteBuilder::from("direct:start")
3392            .route_id("jitter-clamp")
3393            .on_exception(|_e| true)
3394            .retry(1)
3395            .with_jitter(5.0)
3396            .end_on_exception()
3397            .to("mock:out")
3398            .build()
3399            .unwrap();
3400
3401        let cfg = definition.error_handler_config().unwrap();
3402        let retry = cfg.policies[0].retry.as_ref().unwrap();
3403        assert!((retry.jitter_factor - 1.0).abs() < f64::EPSILON);
3404    }
3405
3406    // ── StepAccumulator: process_fn, convert_body_to, bean ──────────────────────
3407
3408    #[test]
3409    fn test_builder_process_fn_adds_processor_step() {
3410        use camel_api::BoxProcessorExt;
3411        let processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
3412        let definition = RouteBuilder::from("timer:tick")
3413            .route_id("process-fn-test")
3414            .process_fn(processor)
3415            .build()
3416            .unwrap();
3417
3418        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3419    }
3420
3421    #[test]
3422    fn test_builder_convert_body_to_adds_processor_step() {
3423        let definition = RouteBuilder::from("timer:tick")
3424            .route_id("convert-body-test")
3425            .convert_body_to(BodyType::Json)
3426            .build()
3427            .unwrap();
3428
3429        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3430    }
3431
3432    #[test]
3433    fn test_builder_bean_adds_bean_step() {
3434        let definition = RouteBuilder::from("timer:tick")
3435            .route_id("bean-test")
3436            .bean("myBean", "process")
3437            .build()
3438            .unwrap();
3439
3440        assert!(
3441            matches!(&definition.steps()[0], BuilderStep::Bean { name, method }
3442            if name == "myBean" && method == "process")
3443        );
3444    }
3445
3446    // ── Throttle strategy-specific tests ────────────────────────────────────────
3447
3448    #[test]
3449    fn test_throttle_builder_delay_strategy() {
3450        let definition = RouteBuilder::from("timer:tick")
3451            .route_id("throttle-delay")
3452            .throttle(10, Duration::from_secs(1))
3453            .strategy(ThrottleStrategy::Delay)
3454            .to("mock:result")
3455            .end_throttle()
3456            .build()
3457            .unwrap();
3458
3459        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3460            assert_eq!(config.strategy, ThrottleStrategy::Delay);
3461        } else {
3462            panic!("Expected Throttle step");
3463        }
3464    }
3465
3466    #[test]
3467    fn test_throttle_builder_drop_strategy() {
3468        let definition = RouteBuilder::from("timer:tick")
3469            .route_id("throttle-drop")
3470            .throttle(10, Duration::from_secs(1))
3471            .strategy(ThrottleStrategy::Drop)
3472            .to("mock:result")
3473            .end_throttle()
3474            .build()
3475            .unwrap();
3476
3477        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3478            assert_eq!(config.strategy, ThrottleStrategy::Drop);
3479        } else {
3480            panic!("Expected Throttle step");
3481        }
3482    }
3483
3484    // ── LoopInLoopBuilder with loop_while ───────────────────────────────────────
3485
3486    #[test]
3487    fn test_nested_loop_while_builder() {
3488        use camel_api::loop_eip::LoopMode;
3489
3490        let def = RouteBuilder::from("direct:start")
3491            .route_id("nested-loop-while")
3492            .loop_count(2)
3493            .to("mock:outer")
3494            .loop_while(|_ex| true)
3495            .to("mock:inner")
3496            .end_loop()
3497            .end_loop()
3498            .build()
3499            .unwrap();
3500
3501        assert_eq!(def.steps().len(), 1);
3502        if let BuilderStep::Loop { steps, .. } = &def.steps()[0] {
3503            assert_eq!(steps.len(), 2);
3504            if let BuilderStep::Loop { config, .. } = &steps[1] {
3505                assert!(matches!(config.mode, LoopMode::While(_)));
3506            } else {
3507                panic!("Expected inner Loop step");
3508            }
3509        } else {
3510            panic!("Expected outer Loop step");
3511        }
3512    }
3513
3514    // ── Choice with multiple whens + otherwise ──────────────────────────────────
3515
3516    #[test]
3517    fn test_choice_builder_multiple_whens_with_otherwise() {
3518        let definition = RouteBuilder::from("timer:tick")
3519            .route_id("choice-multi-otherwise")
3520            .choice()
3521            .when(|ex: &Exchange| ex.input.header("a").is_some())
3522            .to("mock:a")
3523            .end_when()
3524            .when(|ex: &Exchange| ex.input.header("b").is_some())
3525            .to("mock:b")
3526            .end_when()
3527            .when(|ex: &Exchange| ex.input.header("c").is_some())
3528            .to("mock:c")
3529            .end_when()
3530            .otherwise()
3531            .to("mock:fallback")
3532            .end_otherwise()
3533            .end_choice()
3534            .build()
3535            .unwrap();
3536
3537        if let BuilderStep::Choice { whens, otherwise } = &definition.steps()[0] {
3538            assert_eq!(whens.len(), 3);
3539            assert!(otherwise.is_some());
3540            assert_eq!(otherwise.as_ref().unwrap().len(), 1);
3541        } else {
3542            panic!("Expected Choice step");
3543        }
3544    }
3545
3546    // ── Multicast individual config tests ───────────────────────────────────────
3547
3548    #[test]
3549    fn test_multicast_builder_parallel_only() {
3550        let route = RouteBuilder::from("direct:start")
3551            .route_id("multicast-parallel")
3552            .multicast()
3553            .parallel(true)
3554            .to("mock:a")
3555            .end_multicast()
3556            .build()
3557            .unwrap();
3558
3559        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3560            assert!(config.parallel);
3561            assert_eq!(config.parallel_limit, None);
3562        } else {
3563            panic!("Expected Multicast step");
3564        }
3565    }
3566
3567    #[test]
3568    fn test_multicast_builder_timeout_only() {
3569        let route = RouteBuilder::from("direct:start")
3570            .route_id("multicast-timeout")
3571            .multicast()
3572            .timeout(Duration::from_secs(5))
3573            .to("mock:a")
3574            .end_multicast()
3575            .build()
3576            .unwrap();
3577
3578        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3579            assert_eq!(config.timeout, Some(Duration::from_secs(5)));
3580        } else {
3581            panic!("Expected Multicast step");
3582        }
3583    }
3584
3585    #[test]
3586    fn test_multicast_builder_aggregation_collect_all() {
3587        let route = RouteBuilder::from("direct:start")
3588            .route_id("multicast-collect")
3589            .multicast()
3590            .aggregation(MulticastStrategy::CollectAll)
3591            .to("mock:a")
3592            .end_multicast()
3593            .build()
3594            .unwrap();
3595
3596        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3597            assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
3598        } else {
3599            panic!("Expected Multicast step");
3600        }
3601    }
3602
3603    // ── extract_completion_fields: Any mode with multiple conditions ────────────
3604
3605    #[test]
3606    fn test_build_canonical_aggregate_any_completion_mode() {
3607        let spec = RouteBuilder::from("direct:start")
3608            .route_id("canonical-any-completion")
3609            .aggregate(
3610                AggregatorConfig::correlate_by("key")
3611                    .complete_on_size_or_timeout(10, Duration::from_secs(30))
3612                    .build()
3613                    .unwrap(),
3614            )
3615            .build_canonical()
3616            .unwrap();
3617
3618        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3619            assert_eq!(agg.completion_size, Some(10));
3620            assert_eq!(agg.completion_timeout_ms, Some(30_000));
3621        } else {
3622            panic!("Expected Aggregate step");
3623        }
3624    }
3625
3626    #[test]
3627    fn test_build_canonical_aggregate_timeout_completion() {
3628        let spec = RouteBuilder::from("direct:start")
3629            .route_id("canonical-timeout-completion")
3630            .aggregate(
3631                AggregatorConfig::correlate_by("key")
3632                    .complete_on_timeout(Duration::from_millis(500))
3633                    .build()
3634                    .unwrap(),
3635            )
3636            .build_canonical()
3637            .unwrap();
3638
3639        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3640            assert_eq!(agg.completion_size, None);
3641            assert_eq!(agg.completion_timeout_ms, Some(500));
3642        } else {
3643            panic!("Expected Aggregate step");
3644        }
3645    }
3646
3647    // ── canonicalize_aggregate: discard_on_timeout and force_completion_on_stop ─
3648
3649    #[test]
3650    fn test_build_canonical_aggregate_discard_on_timeout() {
3651        use camel_api::aggregator::AggregatorConfig;
3652
3653        let spec = RouteBuilder::from("direct:start")
3654            .route_id("canonical-discard-timeout")
3655            .aggregate(
3656                AggregatorConfig::correlate_by("key")
3657                    .complete_when_size(1)
3658                    .discard_on_timeout(true)
3659                    .build()
3660                    .unwrap(),
3661            )
3662            .build_canonical()
3663            .unwrap();
3664
3665        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3666            assert_eq!(agg.discard_on_timeout, Some(true));
3667        } else {
3668            panic!("Expected Aggregate step");
3669        }
3670    }
3671
3672    #[test]
3673    fn test_build_canonical_aggregate_force_completion_on_stop() {
3674        use camel_api::aggregator::AggregatorConfig;
3675
3676        let spec = RouteBuilder::from("direct:start")
3677            .route_id("canonical-force-stop")
3678            .aggregate(
3679                AggregatorConfig::correlate_by("key")
3680                    .complete_when_size(1)
3681                    .force_completion_on_stop(true)
3682                    .build()
3683                    .unwrap(),
3684            )
3685            .build_canonical()
3686            .unwrap();
3687
3688        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3689            assert_eq!(agg.force_completion_on_stop, Some(true));
3690        } else {
3691            panic!("Expected Aggregate step");
3692        }
3693    }
3694
3695    // ── build_canonical: max_buckets and bucket_ttl ─────────────────────────────
3696
3697    #[test]
3698    fn test_build_canonical_aggregate_max_buckets_and_ttl() {
3699        use camel_api::aggregator::AggregatorConfig;
3700
3701        let spec = RouteBuilder::from("direct:start")
3702            .route_id("canonical-buckets-ttl")
3703            .aggregate(
3704                AggregatorConfig::correlate_by("key")
3705                    .complete_when_size(1)
3706                    .max_buckets(100)
3707                    .bucket_ttl(Duration::from_secs(60))
3708                    .build()
3709                    .unwrap(),
3710            )
3711            .build_canonical()
3712            .unwrap();
3713
3714        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3715            assert_eq!(agg.max_buckets, Some(100));
3716            assert_eq!(agg.bucket_ttl_ms, Some(60_000));
3717        } else {
3718            panic!("Expected Aggregate step");
3719        }
3720    }
3721
3722    // ── SplitBuilder with filter inside ─────────────────────────────────────────
3723
3724    #[test]
3725    fn test_split_builder_with_filter_inside() {
3726        use camel_api::splitter::{SplitterConfig, split_body_lines};
3727
3728        let definition = RouteBuilder::from("timer:test")
3729            .route_id("split-with-filter")
3730            .split(SplitterConfig::new(split_body_lines()))
3731            .filter(|_ex| true)
3732            .to("mock:filtered-frag")
3733            .end_filter()
3734            .end_split()
3735            .build()
3736            .unwrap();
3737
3738        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3739            assert_eq!(steps.len(), 1);
3740            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3741        } else {
3742            panic!("Expected Split step");
3743        }
3744    }
3745
3746    // ── WireTap additional tests ────────────────────────────────────────────────
3747
3748    #[test]
3749    fn test_wire_tap_multiple_taps() {
3750        let definition = RouteBuilder::from("timer:tick")
3751            .route_id("multi-wire-tap")
3752            .wire_tap("mock:tap1")
3753            .wire_tap("mock:tap2")
3754            .to("mock:result")
3755            .build()
3756            .unwrap();
3757
3758        assert_eq!(definition.steps().len(), 3);
3759        assert!(
3760            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap1")
3761        );
3762        assert!(
3763            matches!(&definition.steps()[1], BuilderStep::WireTap { uri } if uri == "mock:tap2")
3764        );
3765    }
3766
3767    // ── Error handler: explicit config after shorthand → Mixed mode ─────────────
3768
3769    #[test]
3770    fn test_builder_shorthand_then_explicit_mixed_mode() {
3771        let result = RouteBuilder::from("direct:start")
3772            .route_id("mixed-mode-2")
3773            .dead_letter_channel("log:dlc")
3774            .error_handler(ErrorHandlerConfig::log_only())
3775            .to("mock:out")
3776            .build();
3777
3778        let err = result.err().expect("mixed mode should fail");
3779        assert!(format!("{err}").contains("mixed error handler modes"));
3780    }
3781
3782    // ── build_canonical: empty from_uri error ───────────────────────────────────
3783
3784    #[test]
3785    fn test_build_canonical_empty_from_uri_errors() {
3786        let result = RouteBuilder::from("").route_id("test").build_canonical();
3787        assert!(result.is_err());
3788    }
3789
3790    #[test]
3791    fn test_build_canonical_missing_route_id_errors() {
3792        let result = RouteBuilder::from("direct:start").build_canonical();
3793        assert!(result.is_err());
3794        let err = result.unwrap_err().to_string();
3795        assert!(err.contains("route_id"));
3796    }
3797
3798    // ── SplitBuilder: aggregate inside split ────────────────────────────────────
3799
3800    #[test]
3801    fn test_split_builder_with_aggregate_inside() {
3802        use camel_api::aggregator::AggregatorConfig;
3803        use camel_api::splitter::{SplitterConfig, split_body_lines};
3804
3805        let definition = RouteBuilder::from("timer:test")
3806            .route_id("split-agg")
3807            .split(SplitterConfig::new(split_body_lines()))
3808            .aggregate(
3809                AggregatorConfig::correlate_by("frag-key")
3810                    .complete_when_size(3)
3811                    .build()
3812                    .unwrap(),
3813            )
3814            .end_split()
3815            .build()
3816            .unwrap();
3817
3818        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3819            assert_eq!(steps.len(), 1);
3820            assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
3821        } else {
3822            panic!("Expected Split step");
3823        }
3824    }
3825
3826    // ── Throttle: steps collected inside throttle scope ─────────────────────────
3827
3828    #[test]
3829    fn test_throttle_builder_with_steps_inside() {
3830        let definition = RouteBuilder::from("timer:tick")
3831            .route_id("throttle-steps")
3832            .throttle(10, Duration::from_secs(1))
3833            .set_header("throttled", Value::Bool(true))
3834            .to("mock:throttled")
3835            .end_throttle()
3836            .build()
3837            .unwrap();
3838
3839        if let BuilderStep::Throttle { steps, .. } = &definition.steps()[0] {
3840            assert_eq!(steps.len(), 2);
3841        } else {
3842            panic!("Expected Throttle step");
3843        }
3844    }
3845
3846    // ── LoadBalance: steps collected inside scope ───────────────────────────────
3847
3848    #[test]
3849    fn test_load_balance_builder_with_steps_inside() {
3850        let definition = RouteBuilder::from("timer:tick")
3851            .route_id("lb-steps")
3852            .load_balance()
3853            .round_robin()
3854            .set_header("lb", Value::Bool(true))
3855            .to("mock:lb")
3856            .end_load_balance()
3857            .build()
3858            .unwrap();
3859
3860        if let BuilderStep::LoadBalance { steps, .. } = &definition.steps()[0] {
3861            assert_eq!(steps.len(), 2);
3862        } else {
3863            panic!("Expected LoadBalance step");
3864        }
3865    }
3866
3867    // ── Multicast: steps collected inside scope ─────────────────────────────────
3868
3869    #[test]
3870    fn test_multicast_builder_with_steps_inside() {
3871        let definition = RouteBuilder::from("timer:tick")
3872            .route_id("multicast-steps")
3873            .multicast()
3874            .set_header("mc", Value::Bool(true))
3875            .to("mock:multicast")
3876            .end_multicast()
3877            .build()
3878            .unwrap();
3879
3880        if let BuilderStep::Multicast { steps, .. } = &definition.steps()[0] {
3881            assert_eq!(steps.len(), 2);
3882        } else {
3883            panic!("Expected Multicast step");
3884        }
3885    }
3886
3887    // ── LoopBuilder: steps collected inside loop scope ──────────────────────────
3888
3889    #[test]
3890    fn test_loop_builder_with_steps_inside() {
3891        let definition = RouteBuilder::from("timer:tick")
3892            .route_id("loop-steps")
3893            .loop_count(3)
3894            .set_header("loop", Value::Bool(true))
3895            .to("mock:loop")
3896            .end_loop()
3897            .build()
3898            .unwrap();
3899
3900        if let BuilderStep::Loop { steps, .. } = &definition.steps()[0] {
3901            assert_eq!(steps.len(), 2);
3902        } else {
3903            panic!("Expected Loop step");
3904        }
3905    }
3906
3907    // ── canonical_step_name coverage for remaining variants ─────────────────────
3908
3909    #[test]
3910    fn test_build_canonical_rejects_loop_step() {
3911        let err = RouteBuilder::from("direct:start")
3912            .route_id("canonical-loop")
3913            .loop_count(3)
3914            .to("mock:loop")
3915            .end_loop()
3916            .build_canonical()
3917            .unwrap_err();
3918
3919        assert!(format!("{err}").contains("does not support step `loop`"));
3920    }
3921
3922    #[test]
3923    fn test_build_canonical_rejects_multicast_step() {
3924        let err = RouteBuilder::from("direct:start")
3925            .route_id("canonical-multicast")
3926            .multicast()
3927            .to("mock:a")
3928            .end_multicast()
3929            .build_canonical()
3930            .unwrap_err();
3931
3932        assert!(format!("{err}").contains("does not support step `multicast`"));
3933    }
3934
3935    #[test]
3936    fn test_build_canonical_rejects_throttle_step() {
3937        let err = RouteBuilder::from("direct:start")
3938            .route_id("canonical-throttle")
3939            .throttle(10, Duration::from_secs(1))
3940            .to("mock:result")
3941            .end_throttle()
3942            .build_canonical()
3943            .unwrap_err();
3944
3945        assert!(format!("{err}").contains("does not support step `throttle`"));
3946    }
3947
3948    #[test]
3949    fn test_build_canonical_rejects_load_balancer_step() {
3950        let err = RouteBuilder::from("direct:start")
3951            .route_id("canonical-lb")
3952            .load_balance()
3953            .round_robin()
3954            .to("mock:result")
3955            .end_load_balance()
3956            .build_canonical()
3957            .unwrap_err();
3958
3959        assert!(format!("{err}").contains("does not support step `load_balancer`"));
3960    }
3961
3962    #[test]
3963    fn test_build_canonical_rejects_bean_step() {
3964        let err = RouteBuilder::from("direct:start")
3965            .route_id("canonical-bean")
3966            .bean("myBean", "process")
3967            .build_canonical()
3968            .unwrap_err();
3969
3970        assert!(format!("{err}").contains("does not support step `bean`"));
3971    }
3972
3973    #[test]
3974    fn test_build_canonical_rejects_script_step() {
3975        let err = RouteBuilder::from("direct:start")
3976            .route_id("canonical-script")
3977            .script("rhai", "x = 1")
3978            .build_canonical()
3979            .unwrap_err();
3980
3981        assert!(format!("{err}").contains("does not support step `script`"));
3982    }
3983
3984    #[test]
3985    fn test_build_canonical_accepts_delay_step() {
3986        let spec = RouteBuilder::from("direct:start")
3987            .route_id("canonical-delay")
3988            .delay(Duration::from_millis(100))
3989            .build_canonical()
3990            .unwrap();
3991
3992        assert!(
3993            spec.steps.iter().any(
3994                |s| matches!(s, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100)
3995            )
3996        );
3997    }
3998
3999    #[test]
4000    fn test_build_canonical_accepts_wire_tap_step() {
4001        let spec = RouteBuilder::from("direct:start")
4002            .route_id("canonical-wiretap")
4003            .wire_tap("mock:tap")
4004            .build_canonical()
4005            .unwrap();
4006
4007        assert!(
4008            spec.steps
4009                .iter()
4010                .any(|s| matches!(s, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"))
4011        );
4012    }
4013
4014    #[test]
4015    fn test_build_canonical_rejects_dynamic_router_step() {
4016        let err = RouteBuilder::from("direct:start")
4017            .route_id("canonical-dyn-router")
4018            .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
4019            .build_canonical()
4020            .unwrap_err();
4021
4022        assert!(format!("{err}").contains("does not support step `dynamic_router`"));
4023    }
4024
4025    #[test]
4026    fn test_build_canonical_rejects_routing_slip_step() {
4027        let err = RouteBuilder::from("direct:start")
4028            .route_id("canonical-routing-slip")
4029            .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
4030            .build_canonical()
4031            .unwrap_err();
4032
4033        assert!(format!("{err}").contains("does not support step `routing_slip`"));
4034    }
4035
4036    #[test]
4037    fn test_build_canonical_rejects_recipient_list_step() {
4038        let err = RouteBuilder::from("direct:start")
4039            .route_id("canonical-recipient")
4040            .recipient_list(Arc::new(|_| "mock:a".to_string()))
4041            .build_canonical()
4042            .unwrap_err();
4043
4044        assert!(format!("{err}").contains("does not support step `recipient_list`"));
4045    }
4046
4047    // ── extract_completion_fields: Any mode with predicate → error ──────────────
4048
4049    #[test]
4050    fn test_build_canonical_rejects_any_mode_with_predicate() {
4051        let err = RouteBuilder::from("direct:start")
4052            .route_id("canonical-any-pred")
4053            .aggregate(AggregatorConfig {
4054                header_name: "key".to_string(),
4055                completion: CompletionMode::Any(vec![
4056                    CompletionCondition::Size(5),
4057                    CompletionCondition::Predicate(Arc::new(|_| false)),
4058                ]),
4059                correlation: CorrelationStrategy::HeaderName("key".to_string()),
4060                strategy: AggregationStrategy::CollectAll,
4061                max_buckets: None,
4062                bucket_ttl: None,
4063                force_completion_on_stop: false,
4064                discard_on_timeout: false,
4065            })
4066            .build_canonical()
4067            .unwrap_err();
4068
4069        assert!(format!("{err}").contains("predicate completion"));
4070    }
4071
4072    // ── BUILDER-004: Validation errors for missing required fields ────────────
4073
4074    #[test]
4075    fn test_builder_validation_missing_from_uri() {
4076        let result = RouteBuilder::from("")
4077            .route_id("missing-uri-route")
4078            .to("log:info")
4079            .build();
4080        assert!(result.is_err(), "empty from URI should fail validation");
4081        let err = result.err().unwrap().to_string();
4082        assert!(
4083            err.contains("'from'") || err.contains("URI"),
4084            "error should mention from/URI, got: {err}"
4085        );
4086    }
4087
4088    #[test]
4089    fn test_builder_validation_invalid_step_uri_scheme() {
4090        let result = RouteBuilder::from("timer:tick")
4091            .route_id("bad-step-route")
4092            .to("not-a-valid-uri") // no scheme
4093            .build();
4094        // The builder itself accepts any URI string; validation happens at
4095        // resolution time. Verify the build succeeds (step URI is deferred).
4096        assert!(
4097            result.is_ok(),
4098            "builder should accept opaque step URIs; resolution happens later"
4099        );
4100    }
4101
4102    // ── BUILDER-006: Duplicate route IDs ──────────────────────────────────────
4103
4104    #[test]
4105    fn test_builder_duplicate_route_ids_produce_identical_definitions() {
4106        // The builder itself doesn't check for duplicates (that's context-level).
4107        // Verify both builds succeed with the same ID — detection is TODO(BUILDER-006).
4108        let route1 = RouteBuilder::from("direct:a")
4109            .route_id("dup-route")
4110            .to("mock:out")
4111            .build();
4112        let route2 = RouteBuilder::from("direct:b")
4113            .route_id("dup-route")
4114            .to("mock:out")
4115            .build();
4116
4117        assert!(route1.is_ok());
4118        assert!(route2.is_ok());
4119        assert_eq!(route1.unwrap().route_id(), route2.unwrap().route_id());
4120    }
4121}