Skip to main content

camel_builder/
lib.rs

1use camel_api::DelayConfig;
2use camel_api::aggregator::{
3    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
4};
5use camel_api::body::Body;
6use camel_api::body_converter::BodyType;
7use camel_api::circuit_breaker::CircuitBreakerConfig;
8use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
9use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
10use camel_api::load_balancer::LoadBalancerConfig;
11use camel_api::loop_eip::{LoopConfig, LoopMode};
12use camel_api::multicast::{MulticastConfig, MulticastStrategy};
13use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
14use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
15use camel_api::splitter::SplitterConfig;
16use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
17use camel_api::{
18    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
19    ProcessorFn, Value,
20    runtime::{
21        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
22        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
23        CanonicalWhenSpec,
24    },
25};
26use camel_component_api::ConcurrencyModel;
27use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
28use camel_processor::{
29    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
30    StreamCacheService, UnmarshalService, builtin_data_format,
31};
32
33/// Shared step-accumulation methods for all builder types.
34///
35/// Implementors provide `steps_mut()` and get step-adding methods for free.
36/// `filter()` and other branching methods are NOT included — they return
37/// different types per builder and stay as per-builder methods.
38pub trait StepAccumulator: Sized {
39    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
40
41    fn to(mut self, endpoint: impl Into<String>) -> Self {
42        self.steps_mut().push(BuilderStep::To(endpoint.into()));
43        self
44    }
45
46    fn process<F, Fut>(mut self, f: F) -> Self
47    where
48        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
49        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
50    {
51        let svc = ProcessorFn::new(f);
52        self.steps_mut()
53            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
54        self
55    }
56
57    fn process_fn(mut self, processor: BoxProcessor) -> Self {
58        self.steps_mut().push(BuilderStep::Processor(processor));
59        self
60    }
61
62    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
63        let svc = SetHeader::new(IdentityProcessor, key, value);
64        self.steps_mut()
65            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
66        self
67    }
68
69    fn map_body<F>(mut self, mapper: F) -> Self
70    where
71        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
72    {
73        let svc = MapBody::new(IdentityProcessor, mapper);
74        self.steps_mut()
75            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
76        self
77    }
78
79    fn set_body<B>(mut self, body: B) -> Self
80    where
81        B: Into<Body> + Clone + Send + Sync + 'static,
82    {
83        let body: Body = body.into();
84        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
85        self.steps_mut()
86            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
87        self
88    }
89
90    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
91    ///
92    /// Transforms the message body using the given value. Semantically identical
93    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
94    fn transform<B>(self, body: B) -> Self
95    where
96        B: Into<Body> + Clone + Send + Sync + 'static,
97    {
98        self.set_body(body)
99    }
100
101    fn set_body_fn<F>(mut self, expr: F) -> Self
102    where
103        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
104    {
105        let svc = SetBody::new(IdentityProcessor, expr);
106        self.steps_mut()
107            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
108        self
109    }
110
111    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
112    where
113        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
114    {
115        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
116        self.steps_mut()
117            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
118        self
119    }
120
121    fn aggregate(mut self, config: AggregatorConfig) -> Self {
122        self.steps_mut().push(BuilderStep::Aggregate { config });
123        self
124    }
125
126    /// Stop processing this exchange immediately. No further steps in the
127    /// current pipeline will run.
128    ///
129    /// Can be used at any point in the route: directly on RouteBuilder,
130    /// inside `.filter()`, inside `.split()`, etc.
131    fn stop(mut self) -> Self {
132        self.steps_mut().push(BuilderStep::Stop);
133        self
134    }
135
136    fn delay(mut self, duration: std::time::Duration) -> Self {
137        self.steps_mut().push(BuilderStep::Delay {
138            config: DelayConfig::from_duration(duration),
139        });
140        self
141    }
142
143    fn delay_with_header(
144        mut self,
145        duration: std::time::Duration,
146        header: impl Into<String>,
147    ) -> Self {
148        self.steps_mut().push(BuilderStep::Delay {
149            config: DelayConfig::from_duration_with_header(duration, header),
150        });
151        self
152    }
153
154    /// Log a message at the specified level.
155    ///
156    /// The message will be logged when an exchange passes through this step.
157    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
158        self.steps_mut().push(BuilderStep::Log {
159            level,
160            message: message.into(),
161        });
162        self
163    }
164
165    /// Convert the message body to the target type.
166    ///
167    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
168    /// Returns `TypeConversionFailed` if conversion is not possible.
169    ///
170    /// # Example
171    /// ```ignore
172    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
173    ///      .convert_body_to(BodyType::Json)
174    ///      .to("direct:next")
175    /// ```
176    fn convert_body_to(mut self, target: BodyType) -> Self {
177        let svc = ConvertBodyTo::new(IdentityProcessor, target);
178        self.steps_mut()
179            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
180        self
181    }
182
183    fn stream_cache(mut self, threshold: usize) -> Self {
184        let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
185        let svc = StreamCacheService::new(IdentityProcessor, config);
186        self.steps_mut()
187            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
188        self
189    }
190
191    /// Materialize `Body::Stream` into `Body::Bytes` using the default threshold (128 KB).
192    ///
193    /// Equivalent to `.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)`.
194    fn stream_cache_default(self) -> Self {
195        self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
196    }
197
198    /// Marshal the message body using the specified data format.
199    ///
200    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
201    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
202    ///
203    /// # Example
204    /// ```ignore
205    /// route.marshal("json").to("direct:next")
206    /// ```
207    fn marshal(mut self, format: impl Into<String>) -> Self {
208        let name = format.into();
209        let df =
210            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
211        let svc = MarshalService::new(IdentityProcessor, df);
212        self.steps_mut()
213            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
214        self
215    }
216
217    /// Unmarshal the message body using the specified data format.
218    ///
219    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
220    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
221    ///
222    /// # Example
223    /// ```ignore
224    /// route.unmarshal("json").to("direct:next")
225    /// ```
226    fn unmarshal(mut self, format: impl Into<String>) -> Self {
227        let name = format.into();
228        let df =
229            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
230        let svc = UnmarshalService::new(IdentityProcessor, df);
231        self.steps_mut()
232            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
233        self
234    }
235
236    /// Validate the exchange body against a schema file.
237    ///
238    /// Shorthand for `to("validator:path")`. Supports XSD, JSON Schema, and YAML.
239    ///
240    /// # Example
241    /// ```ignore
242    /// route.validate("schemas/order.xsd").to("direct:out")
243    /// ```
244    fn validate(mut self, schema_path: impl Into<String>) -> Self {
245        let path = schema_path.into();
246        let uri = if path.starts_with("validator:") {
247            path
248        } else {
249            format!("validator:{path}")
250        };
251        self.steps_mut().push(BuilderStep::To(uri));
252        self
253    }
254
255    /// Execute a script that can modify the exchange (headers, properties, body).
256    ///
257    /// The script has access to `headers`, `properties`, and `body` variables
258    /// and can modify them with assignment syntax: `headers["k"] = v`.
259    ///
260    /// # Example
261    /// ```ignore
262    /// // ignore: requires full CamelContext setup with registered language
263    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
264    /// ```
265    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
266        self.steps_mut().push(BuilderStep::Script {
267            language: language.into(),
268            script: script.into(),
269        });
270        self
271    }
272
273    fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
274        self.steps_mut().push(BuilderStep::Bean {
275            name: name.into(),
276            method: method.into(),
277        });
278        self
279    }
280}
281
282/// A fluent builder for constructing routes.
283///
284/// # Example
285///
286/// ```ignore
287/// let definition = RouteBuilder::from("timer:tick?period=1000")
288///     .set_header("source", Value::String("timer".into()))
289///     .filter(|ex| ex.input.body.as_text().is_some())
290///     .to("log:info?showHeaders=true")
291///     .build()?;
292/// ```
293pub struct RouteBuilder {
294    from_uri: String,
295    steps: Vec<BuilderStep>,
296    error_handler: Option<ErrorHandlerConfig>,
297    error_handler_mode: ErrorHandlerMode,
298    circuit_breaker_config: Option<CircuitBreakerConfig>,
299    concurrency: Option<ConcurrencyModel>,
300    route_id: Option<String>,
301    auto_startup: Option<bool>,
302    startup_order: Option<i32>,
303}
304
305#[derive(Default)]
306enum ErrorHandlerMode {
307    #[default]
308    None,
309    ExplicitConfig,
310    Shorthand {
311        dlc_uri: Option<String>,
312        specs: Vec<OnExceptionSpec>,
313    },
314    Mixed,
315}
316
317#[derive(Clone)]
318struct OnExceptionSpec {
319    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
320    retry: Option<RedeliveryPolicy>,
321    handled_by: Option<String>,
322}
323
324impl RouteBuilder {
325    /// Start building a route from the given source endpoint URI.
326    pub fn from(endpoint: &str) -> Self {
327        Self {
328            from_uri: endpoint.to_string(),
329            steps: Vec::new(),
330            error_handler: None,
331            error_handler_mode: ErrorHandlerMode::None,
332            circuit_breaker_config: None,
333            concurrency: None,
334            route_id: None,
335            auto_startup: None,
336            startup_order: None,
337        }
338    }
339
340    /// Open a filter scope. Only exchanges matching `predicate` will be processed
341    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
342    /// and continue to steps after `.end_filter()`.
343    pub fn filter<F>(self, predicate: F) -> FilterBuilder
344    where
345        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
346    {
347        FilterBuilder {
348            parent: self,
349            predicate: std::sync::Arc::new(predicate),
350            steps: vec![],
351        }
352    }
353
354    /// Open a choice scope for content-based routing.
355    ///
356    /// Within the choice, you can define multiple `.when()` clauses and an
357    /// optional `.otherwise()` clause. The first matching `when` predicate
358    /// determines which sub-pipeline executes.
359    pub fn choice(self) -> ChoiceBuilder {
360        ChoiceBuilder {
361            parent: self,
362            whens: vec![],
363            _otherwise: None,
364        }
365    }
366
367    /// Add a WireTap step that sends a clone of the exchange to the given
368    /// endpoint URI (fire-and-forget). The original exchange continues
369    /// downstream unchanged.
370    pub fn wire_tap(mut self, endpoint: &str) -> Self {
371        self.steps.push(BuilderStep::WireTap {
372            uri: endpoint.to_string(),
373        });
374        self
375    }
376
377    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
378    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
379        self.error_handler_mode = match self.error_handler_mode {
380            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
381                ErrorHandlerMode::ExplicitConfig
382            }
383            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
384        };
385        self.error_handler = Some(config);
386        self
387    }
388
389    /// Set a dead letter channel URI for shorthand error handler mode.
390    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
391        let uri = uri.into();
392        self.error_handler_mode = match self.error_handler_mode {
393            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
394                dlc_uri: Some(uri),
395                specs: Vec::new(),
396            },
397            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
398                dlc_uri: Some(uri),
399                specs,
400            },
401            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
402        };
403        self
404    }
405
406    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
407    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
408    where
409        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
410    {
411        self.error_handler_mode = match self.error_handler_mode {
412            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
413                dlc_uri: None,
414                specs: Vec::new(),
415            },
416            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
417            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
418        };
419
420        OnExceptionBuilder {
421            parent: self,
422            policy: OnExceptionSpec {
423                matches: std::sync::Arc::new(matches),
424                retry: None,
425                handled_by: None,
426            },
427        }
428    }
429
430    /// Set a circuit breaker for this route.
431    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
432        self.circuit_breaker_config = Some(config);
433        self
434    }
435
436    /// Override the consumer's default concurrency model.
437    ///
438    /// When set, the pipeline spawns a task per exchange, processing them
439    /// concurrently. `max` limits the number of simultaneously active
440    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
441    ///
442    /// # Example
443    /// ```ignore
444    /// RouteBuilder::from("http://0.0.0.0:8080/api")
445    ///     .concurrent(16)  // max 16 in-flight pipeline executions
446    ///     .process(handle_request)
447    ///     .build()
448    /// ```
449    pub fn concurrent(mut self, max: usize) -> Self {
450        let max = if max == 0 { None } else { Some(max) };
451        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
452        self
453    }
454
455    /// Force sequential processing, overriding a concurrent-capable consumer.
456    ///
457    /// Useful for HTTP routes that mutate shared state and need ordering
458    /// guarantees.
459    pub fn sequential(mut self) -> Self {
460        self.concurrency = Some(ConcurrencyModel::Sequential);
461        self
462    }
463
464    /// Set the route ID for this route.
465    ///
466    /// If not set, the route will be assigned an auto-generated ID.
467    pub fn route_id(mut self, id: impl Into<String>) -> Self {
468        self.route_id = Some(id.into());
469        self
470    }
471
472    /// Set whether this route should automatically start when the context starts.
473    ///
474    /// Default is `true`.
475    pub fn auto_startup(mut self, auto: bool) -> Self {
476        self.auto_startup = Some(auto);
477        self
478    }
479
480    /// Set the startup order for this route.
481    ///
482    /// Routes with lower values start first. Default is 1000.
483    pub fn startup_order(mut self, order: i32) -> Self {
484        self.startup_order = Some(order);
485        self
486    }
487
488    /// Begin a Splitter sub-pipeline. Steps added after this call (until
489    /// `.end_split()`) will be executed per-fragment.
490    ///
491    /// Returns a `SplitBuilder` — you cannot call `.build()` until
492    /// `.end_split()` closes the split scope (enforced by the type system).
493    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
494        SplitBuilder {
495            parent: self,
496            config,
497            steps: Vec::new(),
498        }
499    }
500
501    /// Begin a Multicast sub-pipeline. Steps added after this call (until
502    /// `.end_multicast()`) will each receive a copy of the exchange.
503    ///
504    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
505    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
506    pub fn multicast(self) -> MulticastBuilder {
507        MulticastBuilder {
508            parent: self,
509            steps: Vec::new(),
510            config: MulticastConfig::new(),
511        }
512    }
513
514    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
515    /// `max_requests` per `period`. Steps inside the throttle scope are only
516    /// executed when the rate limit allows.
517    ///
518    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
519    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
520    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
521        ThrottleBuilder {
522            parent: self,
523            config: ThrottlerConfig::new(max_requests, period),
524            steps: Vec::new(),
525        }
526    }
527
528    /// Begin a Loop sub-pipeline that iterates a fixed number of times.
529    pub fn loop_count(self, count: usize) -> LoopBuilder {
530        LoopBuilder {
531            parent: self,
532            config: LoopConfig {
533                mode: LoopMode::Count(count),
534            },
535            steps: vec![],
536        }
537    }
538
539    /// Begin a Loop sub-pipeline that iterates while a predicate is true.
540    pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
541    where
542        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
543    {
544        LoopBuilder {
545            parent: self,
546            config: LoopConfig {
547                mode: LoopMode::While(std::sync::Arc::new(predicate)),
548            },
549            steps: vec![],
550        }
551    }
552
553    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
554    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
555    ///
556    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
557    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
558    pub fn load_balance(self) -> LoadBalancerBuilder {
559        LoadBalancerBuilder {
560            parent: self,
561            config: LoadBalancerConfig::round_robin(),
562            steps: Vec::new(),
563        }
564    }
565
566    /// Add a dynamic router step that routes exchanges dynamically based on
567    /// expression evaluation at runtime.
568    ///
569    /// The expression receives the exchange and returns `Some(uri)` to route to
570    /// the next endpoint, or `None` to stop routing.
571    ///
572    /// # Example
573    /// ```ignore
574    /// RouteBuilder::from("timer:tick")
575    ///     .route_id("test-route")
576    ///     .dynamic_router(|ex| {
577    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
578    ///     })
579    ///     .build()
580    /// ```
581    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
582        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
583    }
584
585    /// Add a dynamic router step with full configuration.
586    ///
587    /// Allows customization of URI delimiter, cache size, timeout, and other options.
588    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
589        self.steps.push(BuilderStep::DynamicRouter { config });
590        self
591    }
592
593    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
594        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
595    }
596
597    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
598        self.steps.push(BuilderStep::RoutingSlip { config });
599        self
600    }
601
602    pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
603        self.recipient_list_with_config(RecipientListConfig::new(expression))
604    }
605
606    pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
607        self.steps.push(BuilderStep::RecipientList { config });
608        self
609    }
610
611    /// Consume the builder and produce a [`RouteDefinition`].
612    pub fn build(self) -> Result<RouteDefinition, CamelError> {
613        if self.from_uri.is_empty() {
614            return Err(CamelError::RouteError(
615                "route must have a 'from' URI".to_string(),
616            ));
617        }
618        let route_id = self.route_id.ok_or_else(|| {
619            CamelError::RouteError(
620                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
621                    .to_string(),
622            )
623        })?;
624        let resolved_error_handler = match self.error_handler_mode {
625            ErrorHandlerMode::None => self.error_handler,
626            ErrorHandlerMode::ExplicitConfig => self.error_handler,
627            ErrorHandlerMode::Mixed => {
628                return Err(CamelError::RouteError(
629                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
630                ));
631            }
632            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
633                let mut config = if let Some(uri) = dlc_uri {
634                    ErrorHandlerConfig::dead_letter_channel(uri)
635                } else {
636                    ErrorHandlerConfig::log_only()
637                };
638
639                for spec in specs {
640                    let matcher = spec.matches.clone();
641                    let mut builder = config.on_exception(move |e| matcher(e));
642
643                    if let Some(retry) = spec.retry {
644                        builder = builder.retry(retry.max_attempts).with_backoff(
645                            retry.initial_delay,
646                            retry.multiplier,
647                            retry.max_delay,
648                        );
649                        if retry.jitter_factor > 0.0 {
650                            builder = builder.with_jitter(retry.jitter_factor);
651                        }
652                    }
653
654                    if let Some(uri) = spec.handled_by {
655                        builder = builder.handled_by(uri);
656                    }
657
658                    config = builder.build();
659                }
660
661                Some(config)
662            }
663        };
664
665        let definition = RouteDefinition::new(self.from_uri, self.steps);
666        let definition = if let Some(eh) = resolved_error_handler {
667            definition.with_error_handler(eh)
668        } else {
669            definition
670        };
671        let definition = if let Some(cb) = self.circuit_breaker_config {
672            definition.with_circuit_breaker(cb)
673        } else {
674            definition
675        };
676        let definition = if let Some(concurrency) = self.concurrency {
677            definition.with_concurrency(concurrency)
678        } else {
679            definition
680        };
681        let definition = definition.with_route_id(route_id);
682        let definition = if let Some(auto) = self.auto_startup {
683            definition.with_auto_startup(auto)
684        } else {
685            definition
686        };
687        let definition = if let Some(order) = self.startup_order {
688            definition.with_startup_order(order)
689        } else {
690            definition
691        };
692        Ok(definition)
693    }
694
695    /// Compile this builder route into canonical v1 spec.
696    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
697        if self.from_uri.is_empty() {
698            return Err(CamelError::RouteError(
699                "route must have a 'from' URI".to_string(),
700            ));
701        }
702        let route_id = self.route_id.ok_or_else(|| {
703            CamelError::RouteError(
704                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
705                    .to_string(),
706            )
707        })?;
708
709        let steps = canonicalize_steps(self.steps)?;
710        let circuit_breaker = self
711            .circuit_breaker_config
712            .map(canonicalize_circuit_breaker);
713
714        let spec = CanonicalRouteSpec {
715            route_id,
716            from: self.from_uri,
717            steps,
718            circuit_breaker,
719            version: camel_api::CANONICAL_CONTRACT_VERSION,
720        };
721        spec.validate_contract()?;
722        Ok(spec)
723    }
724}
725
726pub struct OnExceptionBuilder {
727    parent: RouteBuilder,
728    policy: OnExceptionSpec,
729}
730
731impl OnExceptionBuilder {
732    pub fn retry(mut self, max_attempts: u32) -> Self {
733        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
734        self
735    }
736
737    pub fn with_backoff(
738        mut self,
739        initial: std::time::Duration,
740        multiplier: f64,
741        max: std::time::Duration,
742    ) -> Self {
743        if let Some(ref mut retry) = self.policy.retry {
744            retry.initial_delay = initial;
745            retry.multiplier = multiplier;
746            retry.max_delay = max;
747        }
748        self
749    }
750
751    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
752        if let Some(ref mut retry) = self.policy.retry {
753            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
754        }
755        self
756    }
757
758    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
759        self.policy.handled_by = Some(uri.into());
760        self
761    }
762
763    pub fn end_on_exception(mut self) -> RouteBuilder {
764        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
765            specs.push(self.policy);
766        }
767        self.parent
768    }
769}
770
771fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
772    let mut canonical = Vec::with_capacity(steps.len());
773    for step in steps {
774        canonical.push(canonicalize_step(step)?);
775    }
776    Ok(canonical)
777}
778
779fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
780    match step {
781        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
782        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
783        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
784        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
785        BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
786            delay_ms: config.delay_ms,
787            dynamic_header: config.dynamic_header,
788        }),
789        BuilderStep::DeclarativeScript { expression } => {
790            Ok(CanonicalStepSpec::Script { expression })
791        }
792        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
793            predicate,
794            steps: canonicalize_steps(steps)?,
795        }),
796        BuilderStep::DeclarativeChoice { whens, otherwise } => {
797            let mut canonical_whens = Vec::with_capacity(whens.len());
798            for DeclarativeWhenStep { predicate, steps } in whens {
799                canonical_whens.push(CanonicalWhenSpec {
800                    predicate,
801                    steps: canonicalize_steps(steps)?,
802                });
803            }
804            let otherwise = match otherwise {
805                Some(steps) => Some(canonicalize_steps(steps)?),
806                None => None,
807            };
808            Ok(CanonicalStepSpec::Choice {
809                whens: canonical_whens,
810                otherwise,
811            })
812        }
813        BuilderStep::DeclarativeSplit {
814            expression,
815            aggregation,
816            parallel,
817            parallel_limit,
818            stop_on_exception,
819            steps,
820        } => Ok(CanonicalStepSpec::Split {
821            expression: CanonicalSplitExpressionSpec::Language(expression),
822            aggregation: canonicalize_split_aggregation(aggregation)?,
823            parallel,
824            parallel_limit,
825            stop_on_exception,
826            steps: canonicalize_steps(steps)?,
827        }),
828        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
829            config: canonicalize_aggregate(config)?,
830        }),
831        other => {
832            let step_name = canonical_step_name(&other);
833            let detail = camel_api::canonical_contract_rejection_reason(step_name)
834                .unwrap_or("not included in canonical v1");
835            Err(CamelError::RouteError(format!(
836                "canonical v1 does not support step `{step_name}`: {detail}"
837            )))
838        }
839    }
840}
841
842fn canonicalize_split_aggregation(
843    strategy: camel_api::splitter::AggregationStrategy,
844) -> Result<CanonicalSplitAggregationSpec, CamelError> {
845    match strategy {
846        camel_api::splitter::AggregationStrategy::LastWins => {
847            Ok(CanonicalSplitAggregationSpec::LastWins)
848        }
849        camel_api::splitter::AggregationStrategy::CollectAll => {
850            Ok(CanonicalSplitAggregationSpec::CollectAll)
851        }
852        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
853            "canonical v1 does not support custom split aggregation".to_string(),
854        )),
855        camel_api::splitter::AggregationStrategy::Original => {
856            Ok(CanonicalSplitAggregationSpec::Original)
857        }
858    }
859}
860
861fn extract_completion_fields(
862    mode: &CompletionMode,
863) -> Result<(Option<usize>, Option<u64>), CamelError> {
864    match mode {
865        CompletionMode::Single(cond) => match cond {
866            CompletionCondition::Size(n) => Ok((Some(*n), None)),
867            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
868            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
869                "canonical v1 does not support aggregate predicate completion".to_string(),
870            )),
871        },
872        CompletionMode::Any(conds) => {
873            let mut size = None;
874            let mut timeout_ms = None;
875            for cond in conds {
876                match cond {
877                    CompletionCondition::Size(n) => size = Some(*n),
878                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
879                    CompletionCondition::Predicate(_) => {
880                        return Err(CamelError::RouteError(
881                            "canonical v1 does not support aggregate predicate completion"
882                                .to_string(),
883                        ));
884                    }
885                }
886            }
887            Ok((size, timeout_ms))
888        }
889    }
890}
891
892fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
893    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
894
895    let header = match &config.correlation {
896        CorrelationStrategy::HeaderName(h) => h.clone(),
897        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
898        CorrelationStrategy::Fn(_) => {
899            return Err(CamelError::RouteError(
900                "canonical v1 does not support Fn correlation strategy".to_string(),
901            ));
902        }
903    };
904
905    let correlation_key = match &config.correlation {
906        CorrelationStrategy::HeaderName(_) => None,
907        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
908        CorrelationStrategy::Fn(_) => unreachable!(),
909    };
910
911    let strategy = match config.strategy {
912        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
913        AggregationStrategy::Custom(_) => {
914            return Err(CamelError::RouteError(
915                "canonical v1 does not support custom aggregate strategy".to_string(),
916            ));
917        }
918    };
919    let bucket_ttl_ms = config
920        .bucket_ttl
921        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
922
923    Ok(CanonicalAggregateSpec {
924        header,
925        completion_size,
926        completion_timeout_ms,
927        correlation_key,
928        force_completion_on_stop: if config.force_completion_on_stop {
929            Some(true)
930        } else {
931            None
932        },
933        discard_on_timeout: if config.discard_on_timeout {
934            Some(true)
935        } else {
936            None
937        },
938        strategy,
939        max_buckets: config.max_buckets,
940        bucket_ttl_ms,
941    })
942}
943
944fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
945    CanonicalCircuitBreakerSpec {
946        failure_threshold: config.failure_threshold,
947        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
948    }
949}
950
951fn canonical_step_name(step: &BuilderStep) -> &'static str {
952    match step {
953        BuilderStep::Processor(_) => "processor",
954        BuilderStep::To(_) => "to",
955        BuilderStep::Stop => "stop",
956        BuilderStep::Log { .. } => "log",
957        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
958        BuilderStep::DeclarativeSetBody { .. } => "set_body",
959        BuilderStep::DeclarativeFilter { .. } => "filter",
960        BuilderStep::DeclarativeChoice { .. } => "choice",
961        BuilderStep::DeclarativeScript { .. } => "script",
962        BuilderStep::DeclarativeSplit { .. } => "split",
963        BuilderStep::Split { .. } => "split",
964        BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
965        BuilderStep::Aggregate { .. } => "aggregate",
966        BuilderStep::Filter { .. } => "filter",
967        BuilderStep::Choice { .. } => "choice",
968        BuilderStep::WireTap { .. } => "wire_tap",
969        BuilderStep::Delay { .. } => "delay",
970        BuilderStep::Multicast { .. } => "multicast",
971        BuilderStep::DeclarativeLog { .. } => "log",
972        BuilderStep::Bean { .. } => "bean",
973        BuilderStep::Script { .. } => "script",
974        BuilderStep::Throttle { .. } => "throttle",
975        BuilderStep::LoadBalance { .. } => "load_balancer",
976        BuilderStep::DynamicRouter { .. } => "dynamic_router",
977        BuilderStep::RoutingSlip { .. } => "routing_slip",
978        BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
979        BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
980        BuilderStep::RecipientList { .. } => "recipient_list",
981        BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
982    }
983}
984
985impl StepAccumulator for RouteBuilder {
986    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
987        &mut self.steps
988    }
989}
990
991/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
992///
993/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
994/// but NOT `.build()` and NOT `.split()` (no nested splits).
995///
996/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
997/// and returns the parent `RouteBuilder`.
998pub struct SplitBuilder {
999    parent: RouteBuilder,
1000    config: SplitterConfig,
1001    steps: Vec<BuilderStep>,
1002}
1003
1004impl SplitBuilder {
1005    /// Open a filter scope within the split sub-pipeline.
1006    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1007    where
1008        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1009    {
1010        FilterInSplitBuilder {
1011            parent: self,
1012            predicate: std::sync::Arc::new(predicate),
1013            steps: vec![],
1014        }
1015    }
1016
1017    /// Close the split scope. Packages the accumulated sub-steps into a
1018    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
1019    pub fn end_split(mut self) -> RouteBuilder {
1020        let split_step = BuilderStep::Split {
1021            config: self.config,
1022            steps: self.steps,
1023        };
1024        self.parent.steps.push(split_step);
1025        self.parent
1026    }
1027}
1028
1029impl StepAccumulator for SplitBuilder {
1030    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1031        &mut self.steps
1032    }
1033}
1034
1035/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
1036pub struct FilterBuilder {
1037    parent: RouteBuilder,
1038    predicate: FilterPredicate,
1039    steps: Vec<BuilderStep>,
1040}
1041
1042impl FilterBuilder {
1043    /// Close the filter scope. Packages the accumulated sub-steps into a
1044    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
1045    pub fn end_filter(mut self) -> RouteBuilder {
1046        let step = BuilderStep::Filter {
1047            predicate: self.predicate,
1048            steps: self.steps,
1049        };
1050        self.parent.steps.push(step);
1051        self.parent
1052    }
1053}
1054
1055impl StepAccumulator for FilterBuilder {
1056    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1057        &mut self.steps
1058    }
1059}
1060
1061/// Builder for a filter scope nested inside a `.split()` block.
1062pub struct FilterInSplitBuilder {
1063    parent: SplitBuilder,
1064    predicate: FilterPredicate,
1065    steps: Vec<BuilderStep>,
1066}
1067
1068impl FilterInSplitBuilder {
1069    /// Close the filter scope and return the parent `SplitBuilder`.
1070    pub fn end_filter(mut self) -> SplitBuilder {
1071        let step = BuilderStep::Filter {
1072            predicate: self.predicate,
1073            steps: self.steps,
1074        };
1075        self.parent.steps.push(step);
1076        self.parent
1077    }
1078}
1079
1080impl StepAccumulator for FilterInSplitBuilder {
1081    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1082        &mut self.steps
1083    }
1084}
1085
1086// ── Choice/When/Otherwise builders ─────────────────────────────────────────
1087
1088/// Builder for a `.choice()` ... `.end_choice()` block.
1089///
1090/// Accumulates `when` clauses and an optional `otherwise` clause.
1091/// Cannot call `.build()` until `.end_choice()` is called.
1092pub struct ChoiceBuilder {
1093    parent: RouteBuilder,
1094    whens: Vec<WhenStep>,
1095    _otherwise: Option<Vec<BuilderStep>>,
1096}
1097
1098impl ChoiceBuilder {
1099    /// Open a `when` clause. Only exchanges matching `predicate` will be
1100    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
1101    pub fn when<F>(self, predicate: F) -> WhenBuilder
1102    where
1103        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1104    {
1105        WhenBuilder {
1106            parent: self,
1107            predicate: std::sync::Arc::new(predicate),
1108            steps: vec![],
1109        }
1110    }
1111
1112    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1113    ///
1114    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1115    pub fn otherwise(self) -> OtherwiseBuilder {
1116        OtherwiseBuilder {
1117            parent: self,
1118            steps: vec![],
1119        }
1120    }
1121
1122    /// Close the choice scope. Packages all accumulated `when` clauses and
1123    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1124    /// parent `RouteBuilder`.
1125    pub fn end_choice(mut self) -> RouteBuilder {
1126        let step = BuilderStep::Choice {
1127            whens: self.whens,
1128            otherwise: self._otherwise,
1129        };
1130        self.parent.steps.push(step);
1131        self.parent
1132    }
1133}
1134
1135/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1136pub struct WhenBuilder {
1137    parent: ChoiceBuilder,
1138    predicate: camel_api::FilterPredicate,
1139    steps: Vec<BuilderStep>,
1140}
1141
1142impl WhenBuilder {
1143    /// Close the when scope. Packages the accumulated sub-steps into a
1144    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1145    pub fn end_when(mut self) -> ChoiceBuilder {
1146        self.parent.whens.push(WhenStep {
1147            predicate: self.predicate,
1148            steps: self.steps,
1149        });
1150        self.parent
1151    }
1152}
1153
1154impl StepAccumulator for WhenBuilder {
1155    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1156        &mut self.steps
1157    }
1158}
1159
1160/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1161pub struct OtherwiseBuilder {
1162    parent: ChoiceBuilder,
1163    steps: Vec<BuilderStep>,
1164}
1165
1166impl OtherwiseBuilder {
1167    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1168    pub fn end_otherwise(self) -> ChoiceBuilder {
1169        let OtherwiseBuilder { mut parent, steps } = self;
1170        parent._otherwise = Some(steps);
1171        parent
1172    }
1173}
1174
1175impl StepAccumulator for OtherwiseBuilder {
1176    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1177        &mut self.steps
1178    }
1179}
1180
1181/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1182///
1183/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1184/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1185///
1186/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1187/// and returns the parent `RouteBuilder`.
1188pub struct MulticastBuilder {
1189    parent: RouteBuilder,
1190    steps: Vec<BuilderStep>,
1191    config: MulticastConfig,
1192}
1193
1194impl MulticastBuilder {
1195    pub fn parallel(mut self, parallel: bool) -> Self {
1196        self.config = self.config.parallel(parallel);
1197        self
1198    }
1199
1200    pub fn parallel_limit(mut self, limit: usize) -> Self {
1201        self.config = self.config.parallel_limit(limit);
1202        self
1203    }
1204
1205    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1206        self.config = self.config.stop_on_exception(stop);
1207        self
1208    }
1209
1210    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1211        self.config = self.config.timeout(duration);
1212        self
1213    }
1214
1215    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1216        self.config = self.config.aggregation(strategy);
1217        self
1218    }
1219
1220    pub fn end_multicast(mut self) -> RouteBuilder {
1221        let step = BuilderStep::Multicast {
1222            steps: self.steps,
1223            config: self.config,
1224        };
1225        self.parent.steps.push(step);
1226        self.parent
1227    }
1228}
1229
1230impl StepAccumulator for MulticastBuilder {
1231    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1232        &mut self.steps
1233    }
1234}
1235
1236/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1237///
1238/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1239/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1240///
1241/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1242/// and returns the parent `RouteBuilder`.
1243pub struct ThrottleBuilder {
1244    parent: RouteBuilder,
1245    config: ThrottlerConfig,
1246    steps: Vec<BuilderStep>,
1247}
1248
1249impl ThrottleBuilder {
1250    /// Set the throttle strategy. Default is `Delay`.
1251    ///
1252    /// - `Delay`: Queue messages until capacity available
1253    /// - `Reject`: Return error immediately when throttled
1254    /// - `Drop`: Silently discard excess messages
1255    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1256        self.config = self.config.strategy(strategy);
1257        self
1258    }
1259
1260    /// Close the throttle scope. Packages the accumulated sub-steps into a
1261    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1262    pub fn end_throttle(mut self) -> RouteBuilder {
1263        let step = BuilderStep::Throttle {
1264            config: self.config,
1265            steps: self.steps,
1266        };
1267        self.parent.steps.push(step);
1268        self.parent
1269    }
1270}
1271
1272impl StepAccumulator for ThrottleBuilder {
1273    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1274        &mut self.steps
1275    }
1276}
1277
1278/// Builder for the sub-pipeline within a `.loop_count()` / `.loop_while()` ... `.end_loop()` block.
1279pub struct LoopBuilder {
1280    parent: RouteBuilder,
1281    config: LoopConfig,
1282    steps: Vec<BuilderStep>,
1283}
1284
1285impl LoopBuilder {
1286    pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1287        LoopInLoopBuilder {
1288            parent: self,
1289            config: LoopConfig {
1290                mode: LoopMode::Count(count),
1291            },
1292            steps: vec![],
1293        }
1294    }
1295
1296    pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1297    where
1298        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1299    {
1300        LoopInLoopBuilder {
1301            parent: self,
1302            config: LoopConfig {
1303                mode: LoopMode::While(std::sync::Arc::new(predicate)),
1304            },
1305            steps: vec![],
1306        }
1307    }
1308
1309    pub fn end_loop(mut self) -> RouteBuilder {
1310        let step = BuilderStep::Loop {
1311            config: self.config,
1312            steps: self.steps,
1313        };
1314        self.parent.steps.push(step);
1315        self.parent
1316    }
1317}
1318
1319impl StepAccumulator for LoopBuilder {
1320    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1321        &mut self.steps
1322    }
1323}
1324
1325pub struct LoopInLoopBuilder {
1326    parent: LoopBuilder,
1327    config: LoopConfig,
1328    steps: Vec<BuilderStep>,
1329}
1330
1331impl LoopInLoopBuilder {
1332    pub fn end_loop(mut self) -> LoopBuilder {
1333        let step = BuilderStep::Loop {
1334            config: self.config,
1335            steps: self.steps,
1336        };
1337        self.parent.steps.push(step);
1338        self.parent
1339    }
1340}
1341
1342impl StepAccumulator for LoopInLoopBuilder {
1343    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1344        &mut self.steps
1345    }
1346}
1347
1348/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1349///
1350/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1351/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1352///
1353/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1354/// and returns the parent `RouteBuilder`.
1355pub struct LoadBalancerBuilder {
1356    parent: RouteBuilder,
1357    config: LoadBalancerConfig,
1358    steps: Vec<BuilderStep>,
1359}
1360
1361impl LoadBalancerBuilder {
1362    /// Set the load balance strategy to round-robin (default).
1363    pub fn round_robin(mut self) -> Self {
1364        self.config = LoadBalancerConfig::round_robin();
1365        self
1366    }
1367
1368    /// Set the load balance strategy to random selection.
1369    pub fn random(mut self) -> Self {
1370        self.config = LoadBalancerConfig::random();
1371        self
1372    }
1373
1374    /// Set the load balance strategy to weighted selection.
1375    ///
1376    /// Each endpoint is assigned a weight that determines its probability
1377    /// of being selected.
1378    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1379        self.config = LoadBalancerConfig::weighted(weights);
1380        self
1381    }
1382
1383    /// Set the load balance strategy to failover.
1384    ///
1385    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1386    /// is tried.
1387    pub fn failover(mut self) -> Self {
1388        self.config = LoadBalancerConfig::failover();
1389        self
1390    }
1391
1392    /// Enable or disable parallel execution of endpoints.
1393    ///
1394    /// When enabled, all endpoints receive the exchange simultaneously.
1395    /// When disabled (default), only one endpoint is selected per exchange.
1396    pub fn parallel(mut self, parallel: bool) -> Self {
1397        self.config = self.config.parallel(parallel);
1398        self
1399    }
1400
1401    /// Close the load balance scope. Packages the accumulated sub-steps into a
1402    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1403    pub fn end_load_balance(mut self) -> RouteBuilder {
1404        let step = BuilderStep::LoadBalance {
1405            config: self.config,
1406            steps: self.steps,
1407        };
1408        self.parent.steps.push(step);
1409        self.parent
1410    }
1411}
1412
1413impl StepAccumulator for LoadBalancerBuilder {
1414    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1415        &mut self.steps
1416    }
1417}
1418
1419// ---------------------------------------------------------------------------
1420// Tests
1421// ---------------------------------------------------------------------------
1422
1423#[cfg(test)]
1424mod tests {
1425    use super::*;
1426    use camel_api::error_handler::ErrorHandlerConfig;
1427    use camel_api::load_balancer::LoadBalanceStrategy;
1428    use camel_api::{Exchange, Message};
1429    use camel_core::route::BuilderStep;
1430    use std::sync::Arc;
1431    use std::time::Duration;
1432    use tower::{Service, ServiceExt};
1433
1434    #[test]
1435    fn test_builder_from_creates_definition() {
1436        let definition = RouteBuilder::from("timer:tick")
1437            .route_id("test-route")
1438            .build()
1439            .unwrap();
1440        assert_eq!(definition.from_uri(), "timer:tick");
1441    }
1442
1443    #[test]
1444    fn test_builder_empty_from_uri_errors() {
1445        let result = RouteBuilder::from("").route_id("test-route").build();
1446        assert!(result.is_err());
1447    }
1448
1449    #[test]
1450    fn test_builder_to_adds_step() {
1451        let definition = RouteBuilder::from("timer:tick")
1452            .route_id("test-route")
1453            .to("log:info")
1454            .build()
1455            .unwrap();
1456
1457        assert_eq!(definition.from_uri(), "timer:tick");
1458        // We can verify steps were added by checking the structure
1459        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1460    }
1461
1462    #[test]
1463    fn test_builder_filter_adds_filter_step() {
1464        let definition = RouteBuilder::from("timer:tick")
1465            .route_id("test-route")
1466            .filter(|_ex| true)
1467            .to("mock:result")
1468            .end_filter()
1469            .build()
1470            .unwrap();
1471
1472        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1473    }
1474
1475    #[test]
1476    fn test_builder_set_header_adds_processor_step() {
1477        let definition = RouteBuilder::from("timer:tick")
1478            .route_id("test-route")
1479            .set_header("key", Value::String("value".into()))
1480            .build()
1481            .unwrap();
1482
1483        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1484    }
1485
1486    #[test]
1487    fn test_builder_map_body_adds_processor_step() {
1488        let definition = RouteBuilder::from("timer:tick")
1489            .route_id("test-route")
1490            .map_body(|body| body)
1491            .build()
1492            .unwrap();
1493
1494        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1495    }
1496
1497    #[test]
1498    fn test_builder_process_adds_processor_step() {
1499        let definition = RouteBuilder::from("timer:tick")
1500            .route_id("test-route")
1501            .process(|ex| async move { Ok(ex) })
1502            .build()
1503            .unwrap();
1504
1505        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1506    }
1507
1508    #[test]
1509    fn test_builder_chain_multiple_steps() {
1510        let definition = RouteBuilder::from("timer:tick")
1511            .route_id("test-route")
1512            .set_header("source", Value::String("timer".into()))
1513            .filter(|ex| ex.input.header("source").is_some())
1514            .to("log:info")
1515            .end_filter()
1516            .to("mock:result")
1517            .build()
1518            .unwrap();
1519
1520        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1521        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1522        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1523        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1524    }
1525
1526    #[test]
1527    fn test_loop_count_builder() {
1528        use camel_api::loop_eip::LoopMode;
1529
1530        let def = RouteBuilder::from("direct:start")
1531            .route_id("loop-test")
1532            .loop_count(3)
1533            .to("mock:inside")
1534            .end_loop()
1535            .to("mock:after")
1536            .build()
1537            .unwrap();
1538
1539        assert_eq!(def.steps().len(), 2);
1540        match &def.steps()[0] {
1541            BuilderStep::Loop { config, steps } => {
1542                assert!(matches!(config.mode, LoopMode::Count(3)));
1543                assert_eq!(steps.len(), 1);
1544            }
1545            other => panic!("Expected Loop, got {:?}", other),
1546        }
1547        assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1548    }
1549
1550    #[test]
1551    fn test_loop_while_builder() {
1552        use camel_api::loop_eip::LoopMode;
1553
1554        let def = RouteBuilder::from("direct:start")
1555            .route_id("loop-while-test")
1556            .loop_while(|_ex| true)
1557            .to("mock:retry")
1558            .end_loop()
1559            .build()
1560            .unwrap();
1561
1562        assert_eq!(def.steps().len(), 1);
1563        match &def.steps()[0] {
1564            BuilderStep::Loop { config, steps } => {
1565                assert!(matches!(config.mode, LoopMode::While(_)));
1566                assert_eq!(steps.len(), 1);
1567            }
1568            other => panic!("Expected Loop, got {:?}", other),
1569        }
1570    }
1571
1572    #[test]
1573    fn test_nested_loop_builder() {
1574        use camel_api::loop_eip::LoopMode;
1575
1576        let def = RouteBuilder::from("direct:start")
1577            .route_id("nested-loop-test")
1578            .loop_count(2)
1579            .to("mock:outer")
1580            .loop_count(3)
1581            .to("mock:inner")
1582            .end_loop()
1583            .end_loop()
1584            .to("mock:after")
1585            .build()
1586            .unwrap();
1587
1588        assert_eq!(def.steps().len(), 2);
1589        match &def.steps()[0] {
1590            BuilderStep::Loop { steps, .. } => {
1591                assert_eq!(steps.len(), 2);
1592                match &steps[1] {
1593                    BuilderStep::Loop {
1594                        config,
1595                        steps: inner_steps,
1596                    } => {
1597                        assert!(matches!(config.mode, LoopMode::Count(3)));
1598                        assert_eq!(inner_steps.len(), 1);
1599                    }
1600                    other => panic!("Expected nested Loop, got {:?}", other),
1601                }
1602            }
1603            other => panic!("Expected outer Loop, got {:?}", other),
1604        }
1605    }
1606
1607    // -----------------------------------------------------------------------
1608    // Processor behavior tests — exercise the real Tower services directly
1609    // -----------------------------------------------------------------------
1610
1611    #[tokio::test]
1612    async fn test_set_header_processor_works() {
1613        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1614        let exchange = Exchange::new(Message::new("test"));
1615        let result = svc.call(exchange).await.unwrap();
1616        assert_eq!(
1617            result.input.header("greeting"),
1618            Some(&Value::String("hello".into()))
1619        );
1620    }
1621
1622    #[tokio::test]
1623    async fn test_filter_processor_passes() {
1624        use camel_api::BoxProcessorExt;
1625        use camel_processor::FilterService;
1626
1627        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1628        let mut svc =
1629            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1630        let exchange = Exchange::new(Message::new("pass"));
1631        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1632        assert_eq!(result.input.body.as_text(), Some("pass"));
1633    }
1634
1635    #[tokio::test]
1636    async fn test_filter_processor_blocks() {
1637        use camel_api::BoxProcessorExt;
1638        use camel_processor::FilterService;
1639
1640        let sub = BoxProcessor::from_fn(|_ex| {
1641            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1642        });
1643        let mut svc =
1644            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1645        let exchange = Exchange::new(Message::new("reject"));
1646        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1647        assert_eq!(result.input.body.as_text(), Some("reject"));
1648    }
1649
1650    #[tokio::test]
1651    async fn test_map_body_processor_works() {
1652        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1653            if let Some(text) = body.as_text() {
1654                Body::Text(text.to_uppercase())
1655            } else {
1656                body
1657            }
1658        });
1659        let exchange = Exchange::new(Message::new("hello"));
1660        let result = mapper.oneshot(exchange).await.unwrap();
1661        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1662    }
1663
1664    #[tokio::test]
1665    async fn test_process_custom_processor_works() {
1666        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1667            ex.set_property("custom", Value::Bool(true));
1668            Ok(ex)
1669        });
1670        let exchange = Exchange::new(Message::default());
1671        let result = processor.oneshot(exchange).await.unwrap();
1672        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1673    }
1674
1675    // -----------------------------------------------------------------------
1676    // Sequential pipeline test
1677    // -----------------------------------------------------------------------
1678
1679    #[tokio::test]
1680    async fn test_compose_pipeline_runs_steps_in_order() {
1681        use camel_core::route::compose_pipeline;
1682
1683        let processors = vec![
1684            BoxProcessor::new(SetHeader::new(
1685                IdentityProcessor,
1686                "step",
1687                Value::String("one".into()),
1688            )),
1689            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1690                if let Some(text) = body.as_text() {
1691                    Body::Text(format!("{}-processed", text))
1692                } else {
1693                    body
1694                }
1695            })),
1696        ];
1697
1698        let pipeline = compose_pipeline(processors);
1699        let exchange = Exchange::new(Message::new("hello"));
1700        let result = pipeline.oneshot(exchange).await.unwrap();
1701
1702        assert_eq!(
1703            result.input.header("step"),
1704            Some(&Value::String("one".into()))
1705        );
1706        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1707    }
1708
1709    #[tokio::test]
1710    async fn test_compose_pipeline_empty_is_identity() {
1711        use camel_core::route::compose_pipeline;
1712
1713        let pipeline = compose_pipeline(vec![]);
1714        let exchange = Exchange::new(Message::new("unchanged"));
1715        let result = pipeline.oneshot(exchange).await.unwrap();
1716        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1717    }
1718
1719    // -----------------------------------------------------------------------
1720    // Circuit breaker builder tests
1721    // -----------------------------------------------------------------------
1722
1723    #[test]
1724    fn test_builder_circuit_breaker_sets_config() {
1725        use camel_api::circuit_breaker::CircuitBreakerConfig;
1726
1727        let config = CircuitBreakerConfig::new().failure_threshold(5);
1728        let definition = RouteBuilder::from("timer:tick")
1729            .route_id("test-route")
1730            .circuit_breaker(config)
1731            .build()
1732            .unwrap();
1733
1734        let cb = definition
1735            .circuit_breaker_config()
1736            .expect("circuit breaker should be set");
1737        assert_eq!(cb.failure_threshold, 5);
1738    }
1739
1740    #[test]
1741    fn test_builder_circuit_breaker_with_error_handler() {
1742        use camel_api::circuit_breaker::CircuitBreakerConfig;
1743        use camel_api::error_handler::ErrorHandlerConfig;
1744
1745        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1746        let eh_config = ErrorHandlerConfig::log_only();
1747
1748        let definition = RouteBuilder::from("timer:tick")
1749            .route_id("test-route")
1750            .to("log:info")
1751            .circuit_breaker(cb_config)
1752            .error_handler(eh_config)
1753            .build()
1754            .unwrap();
1755
1756        assert!(
1757            definition.circuit_breaker_config().is_some(),
1758            "circuit breaker config should be set"
1759        );
1760        // Route definition was built successfully with both configs.
1761    }
1762
1763    #[test]
1764    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1765        let definition = RouteBuilder::from("direct:start")
1766            .route_id("test-route")
1767            .dead_letter_channel("log:dlc")
1768            .on_exception(|e| matches!(e, CamelError::Io(_)))
1769            .retry(3)
1770            .handled_by("log:io")
1771            .end_on_exception()
1772            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1773            .retry(1)
1774            .end_on_exception()
1775            .to("mock:out")
1776            .build()
1777            .expect("route should build");
1778
1779        let cfg = definition
1780            .error_handler_config()
1781            .expect("error handler should be set");
1782        assert_eq!(cfg.policies.len(), 2);
1783        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1784        assert_eq!(
1785            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1786            Some(3)
1787        );
1788        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1789        assert_eq!(
1790            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1791            Some(1)
1792        );
1793    }
1794
1795    #[test]
1796    fn test_builder_on_exception_mixed_mode_rejected() {
1797        let result = RouteBuilder::from("direct:start")
1798            .route_id("test-route")
1799            .error_handler(ErrorHandlerConfig::log_only())
1800            .on_exception(|_e| true)
1801            .end_on_exception()
1802            .to("mock:out")
1803            .build();
1804
1805        let err = result.err().expect("mixed mode should fail with an error");
1806
1807        assert!(
1808            format!("{err}").contains("mixed error handler modes"),
1809            "unexpected error: {err}"
1810        );
1811    }
1812
1813    #[test]
1814    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1815        let definition = RouteBuilder::from("direct:start")
1816            .route_id("test-route")
1817            .on_exception(|_e| true)
1818            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1819            .with_jitter(0.5)
1820            .end_on_exception()
1821            .to("mock:out")
1822            .build()
1823            .expect("route should build");
1824
1825        let cfg = definition
1826            .error_handler_config()
1827            .expect("error handler should be set");
1828        assert_eq!(cfg.policies.len(), 1);
1829        assert!(cfg.policies[0].retry.is_none());
1830    }
1831
1832    #[test]
1833    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1834        let definition = RouteBuilder::from("direct:start")
1835            .route_id("test-route")
1836            .dead_letter_channel("log:dlc")
1837            .to("mock:out")
1838            .build()
1839            .expect("route should build");
1840
1841        let cfg = definition
1842            .error_handler_config()
1843            .expect("error handler should be set");
1844        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1845        assert!(cfg.policies.is_empty());
1846    }
1847
1848    #[test]
1849    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1850        let definition = RouteBuilder::from("direct:start")
1851            .route_id("test-route")
1852            .dead_letter_channel("log:first")
1853            .on_exception(|e| matches!(e, CamelError::Io(_)))
1854            .retry(2)
1855            .end_on_exception()
1856            .dead_letter_channel("log:second")
1857            .to("mock:out")
1858            .build()
1859            .expect("route should build");
1860
1861        let cfg = definition
1862            .error_handler_config()
1863            .expect("error handler should be set");
1864        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1865        assert_eq!(cfg.policies.len(), 1);
1866        assert_eq!(
1867            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1868            Some(2)
1869        );
1870    }
1871
1872    #[test]
1873    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1874        let definition = RouteBuilder::from("direct:start")
1875            .route_id("test-route")
1876            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1877            .retry(1)
1878            .end_on_exception()
1879            .to("mock:out")
1880            .build()
1881            .expect("route should build");
1882
1883        let cfg = definition
1884            .error_handler_config()
1885            .expect("error handler should be set");
1886        assert!(cfg.dlc_uri.is_none());
1887        assert_eq!(cfg.policies.len(), 1);
1888    }
1889
1890    #[test]
1891    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1892        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1893        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1894
1895        let definition = RouteBuilder::from("direct:start")
1896            .route_id("test-route")
1897            .error_handler(first)
1898            .error_handler(second)
1899            .to("mock:out")
1900            .build()
1901            .expect("route should build");
1902
1903        let cfg = definition
1904            .error_handler_config()
1905            .expect("error handler should be set");
1906        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1907    }
1908
1909    // --- Splitter builder tests ---
1910
1911    #[test]
1912    fn test_split_builder_typestate() {
1913        use camel_api::splitter::{SplitterConfig, split_body_lines};
1914
1915        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
1916        let definition = RouteBuilder::from("timer:test?period=1000")
1917            .route_id("test-route")
1918            .split(SplitterConfig::new(split_body_lines()))
1919            .to("mock:per-fragment")
1920            .end_split()
1921            .to("mock:final")
1922            .build()
1923            .unwrap();
1924
1925        // Should have 2 top-level steps: Split + To("mock:final")
1926        assert_eq!(definition.steps().len(), 2);
1927    }
1928
1929    #[test]
1930    fn test_split_builder_steps_collected() {
1931        use camel_api::splitter::{SplitterConfig, split_body_lines};
1932
1933        let definition = RouteBuilder::from("timer:test?period=1000")
1934            .route_id("test-route")
1935            .split(SplitterConfig::new(split_body_lines()))
1936            .set_header("fragment", Value::String("yes".into()))
1937            .to("mock:per-fragment")
1938            .end_split()
1939            .build()
1940            .unwrap();
1941
1942        // Should have 1 top-level step: Split (containing 2 sub-steps)
1943        assert_eq!(definition.steps().len(), 1);
1944        match &definition.steps()[0] {
1945            BuilderStep::Split { steps, .. } => {
1946                assert_eq!(steps.len(), 2); // SetHeader + To
1947            }
1948            other => panic!("Expected Split, got {:?}", other),
1949        }
1950    }
1951
1952    #[test]
1953    fn test_split_builder_config_propagated() {
1954        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1955
1956        let definition = RouteBuilder::from("timer:test?period=1000")
1957            .route_id("test-route")
1958            .split(
1959                SplitterConfig::new(split_body_lines())
1960                    .parallel(true)
1961                    .parallel_limit(4)
1962                    .aggregation(AggregationStrategy::CollectAll),
1963            )
1964            .to("mock:per-fragment")
1965            .end_split()
1966            .build()
1967            .unwrap();
1968
1969        match &definition.steps()[0] {
1970            BuilderStep::Split { config, .. } => {
1971                assert!(config.parallel);
1972                assert_eq!(config.parallel_limit, Some(4));
1973                assert!(matches!(
1974                    config.aggregation,
1975                    AggregationStrategy::CollectAll
1976                ));
1977            }
1978            other => panic!("Expected Split, got {:?}", other),
1979        }
1980    }
1981
1982    #[test]
1983    fn test_aggregate_builder_adds_step() {
1984        use camel_api::aggregator::AggregatorConfig;
1985        use camel_core::route::BuilderStep;
1986
1987        let definition = RouteBuilder::from("timer:tick")
1988            .route_id("test-route")
1989            .aggregate(
1990                AggregatorConfig::correlate_by("key")
1991                    .complete_when_size(2)
1992                    .build(),
1993            )
1994            .build()
1995            .unwrap();
1996
1997        assert_eq!(definition.steps().len(), 1);
1998        assert!(matches!(
1999            definition.steps()[0],
2000            BuilderStep::Aggregate { .. }
2001        ));
2002    }
2003
2004    #[test]
2005    fn test_aggregate_in_split_builder() {
2006        use camel_api::aggregator::AggregatorConfig;
2007        use camel_api::splitter::{SplitterConfig, split_body_lines};
2008        use camel_core::route::BuilderStep;
2009
2010        let definition = RouteBuilder::from("timer:tick")
2011            .route_id("test-route")
2012            .split(SplitterConfig::new(split_body_lines()))
2013            .aggregate(
2014                AggregatorConfig::correlate_by("key")
2015                    .complete_when_size(1)
2016                    .build(),
2017            )
2018            .end_split()
2019            .build()
2020            .unwrap();
2021
2022        assert_eq!(definition.steps().len(), 1);
2023        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2024            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2025        } else {
2026            panic!("expected Split step");
2027        }
2028    }
2029
2030    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
2031
2032    #[test]
2033    fn test_builder_set_body_static_adds_processor() {
2034        let definition = RouteBuilder::from("timer:tick")
2035            .route_id("test-route")
2036            .set_body("fixed")
2037            .build()
2038            .unwrap();
2039        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2040    }
2041
2042    #[test]
2043    fn test_builder_set_body_fn_adds_processor() {
2044        let definition = RouteBuilder::from("timer:tick")
2045            .route_id("test-route")
2046            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2047            .build()
2048            .unwrap();
2049        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2050    }
2051
2052    #[test]
2053    fn transform_alias_produces_same_as_set_body() {
2054        let route_transform = RouteBuilder::from("timer:tick")
2055            .route_id("test-route")
2056            .transform("hello")
2057            .build()
2058            .unwrap();
2059
2060        let route_set_body = RouteBuilder::from("timer:tick")
2061            .route_id("test-route")
2062            .set_body("hello")
2063            .build()
2064            .unwrap();
2065
2066        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2067    }
2068
2069    #[test]
2070    fn test_builder_set_header_fn_adds_processor() {
2071        let definition = RouteBuilder::from("timer:tick")
2072            .route_id("test-route")
2073            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2074            .build()
2075            .unwrap();
2076        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2077    }
2078
2079    #[tokio::test]
2080    async fn test_set_body_static_processor_works() {
2081        use camel_core::route::compose_pipeline;
2082        let def = RouteBuilder::from("t:t")
2083            .route_id("test-route")
2084            .set_body("replaced")
2085            .build()
2086            .unwrap();
2087        let pipeline = compose_pipeline(
2088            def.steps()
2089                .iter()
2090                .filter_map(|s| {
2091                    if let BuilderStep::Processor(p) = s {
2092                        Some(p.clone())
2093                    } else {
2094                        None
2095                    }
2096                })
2097                .collect(),
2098        );
2099        let exchange = Exchange::new(Message::new("original"));
2100        let result = pipeline.oneshot(exchange).await.unwrap();
2101        assert_eq!(result.input.body.as_text(), Some("replaced"));
2102    }
2103
2104    #[tokio::test]
2105    async fn test_set_body_fn_processor_works() {
2106        use camel_core::route::compose_pipeline;
2107        let def = RouteBuilder::from("t:t")
2108            .route_id("test-route")
2109            .set_body_fn(|ex: &Exchange| {
2110                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2111            })
2112            .build()
2113            .unwrap();
2114        let pipeline = compose_pipeline(
2115            def.steps()
2116                .iter()
2117                .filter_map(|s| {
2118                    if let BuilderStep::Processor(p) = s {
2119                        Some(p.clone())
2120                    } else {
2121                        None
2122                    }
2123                })
2124                .collect(),
2125        );
2126        let exchange = Exchange::new(Message::new("hello"));
2127        let result = pipeline.oneshot(exchange).await.unwrap();
2128        assert_eq!(result.input.body.as_text(), Some("HELLO"));
2129    }
2130
2131    #[tokio::test]
2132    async fn test_set_header_fn_processor_works() {
2133        use camel_core::route::compose_pipeline;
2134        let def = RouteBuilder::from("t:t")
2135            .route_id("test-route")
2136            .set_header_fn("echo", |ex: &Exchange| {
2137                ex.input
2138                    .body
2139                    .as_text()
2140                    .map(|t| Value::String(t.into()))
2141                    .unwrap_or(Value::Null)
2142            })
2143            .build()
2144            .unwrap();
2145        let pipeline = compose_pipeline(
2146            def.steps()
2147                .iter()
2148                .filter_map(|s| {
2149                    if let BuilderStep::Processor(p) = s {
2150                        Some(p.clone())
2151                    } else {
2152                        None
2153                    }
2154                })
2155                .collect(),
2156        );
2157        let exchange = Exchange::new(Message::new("ping"));
2158        let result = pipeline.oneshot(exchange).await.unwrap();
2159        assert_eq!(
2160            result.input.header("echo"),
2161            Some(&Value::String("ping".into()))
2162        );
2163    }
2164
2165    // ── FilterBuilder typestate tests ─────────────────────────────────────
2166
2167    #[test]
2168    fn test_filter_builder_typestate() {
2169        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2170            .route_id("test-route")
2171            .filter(|_ex| true)
2172            .to("mock:inner")
2173            .end_filter()
2174            .to("mock:outer")
2175            .build();
2176        assert!(result.is_ok());
2177    }
2178
2179    #[test]
2180    fn test_filter_builder_steps_collected() {
2181        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2182            .route_id("test-route")
2183            .filter(|_ex| true)
2184            .to("mock:inner")
2185            .end_filter()
2186            .build()
2187            .unwrap();
2188
2189        assert_eq!(definition.steps().len(), 1);
2190        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2191    }
2192
2193    #[test]
2194    fn test_wire_tap_builder_adds_step() {
2195        let definition = RouteBuilder::from("timer:tick")
2196            .route_id("test-route")
2197            .wire_tap("mock:tap")
2198            .to("mock:result")
2199            .build()
2200            .unwrap();
2201
2202        assert_eq!(definition.steps().len(), 2);
2203        assert!(
2204            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2205        );
2206        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2207    }
2208
2209    // ── MulticastBuilder typestate tests ─────────────────────────────────────
2210
2211    #[test]
2212    fn test_multicast_builder_typestate() {
2213        let definition = RouteBuilder::from("timer:tick")
2214            .route_id("test-route")
2215            .multicast()
2216            .to("direct:a")
2217            .to("direct:b")
2218            .end_multicast()
2219            .to("mock:result")
2220            .build()
2221            .unwrap();
2222
2223        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
2224    }
2225
2226    #[test]
2227    fn test_multicast_builder_steps_collected() {
2228        let definition = RouteBuilder::from("timer:tick")
2229            .route_id("test-route")
2230            .multicast()
2231            .to("direct:a")
2232            .to("direct:b")
2233            .end_multicast()
2234            .build()
2235            .unwrap();
2236
2237        match &definition.steps()[0] {
2238            BuilderStep::Multicast { steps, .. } => {
2239                assert_eq!(steps.len(), 2);
2240            }
2241            other => panic!("Expected Multicast, got {:?}", other),
2242        }
2243    }
2244
2245    // ── Concurrency builder tests ─────────────────────────────────────
2246
2247    #[test]
2248    fn test_builder_concurrent_sets_concurrency() {
2249        use camel_component_api::ConcurrencyModel;
2250
2251        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2252            .route_id("test-route")
2253            .concurrent(16)
2254            .to("log:info")
2255            .build()
2256            .unwrap();
2257
2258        assert_eq!(
2259            definition.concurrency_override(),
2260            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2261        );
2262    }
2263
2264    #[test]
2265    fn test_builder_concurrent_zero_means_unbounded() {
2266        use camel_component_api::ConcurrencyModel;
2267
2268        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2269            .route_id("test-route")
2270            .concurrent(0)
2271            .to("log:info")
2272            .build()
2273            .unwrap();
2274
2275        assert_eq!(
2276            definition.concurrency_override(),
2277            Some(&ConcurrencyModel::Concurrent { max: None })
2278        );
2279    }
2280
2281    #[test]
2282    fn test_builder_sequential_sets_concurrency() {
2283        use camel_component_api::ConcurrencyModel;
2284
2285        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2286            .route_id("test-route")
2287            .sequential()
2288            .to("log:info")
2289            .build()
2290            .unwrap();
2291
2292        assert_eq!(
2293            definition.concurrency_override(),
2294            Some(&ConcurrencyModel::Sequential)
2295        );
2296    }
2297
2298    #[test]
2299    fn test_builder_default_concurrency_is_none() {
2300        let definition = RouteBuilder::from("timer:tick")
2301            .route_id("test-route")
2302            .to("log:info")
2303            .build()
2304            .unwrap();
2305
2306        assert_eq!(definition.concurrency_override(), None);
2307    }
2308
2309    // ── Route lifecycle builder tests ─────────────────────────────────────
2310
2311    #[test]
2312    fn test_builder_route_id_sets_id() {
2313        let definition = RouteBuilder::from("timer:tick")
2314            .route_id("my-route")
2315            .build()
2316            .unwrap();
2317
2318        assert_eq!(definition.route_id(), "my-route");
2319    }
2320
2321    #[test]
2322    fn test_build_without_route_id_fails() {
2323        let result = RouteBuilder::from("timer:tick?period=1000")
2324            .to("log:info")
2325            .build();
2326        let err = match result {
2327            Err(e) => e.to_string(),
2328            Ok(_) => panic!("build() should fail without route_id"),
2329        };
2330        assert!(
2331            err.contains("route_id"),
2332            "error should mention route_id, got: {}",
2333            err
2334        );
2335    }
2336
2337    #[test]
2338    fn test_builder_auto_startup_false() {
2339        let definition = RouteBuilder::from("timer:tick")
2340            .route_id("test-route")
2341            .auto_startup(false)
2342            .build()
2343            .unwrap();
2344
2345        assert!(!definition.auto_startup());
2346    }
2347
2348    #[test]
2349    fn test_builder_startup_order_custom() {
2350        let definition = RouteBuilder::from("timer:tick")
2351            .route_id("test-route")
2352            .startup_order(50)
2353            .build()
2354            .unwrap();
2355
2356        assert_eq!(definition.startup_order(), 50);
2357    }
2358
2359    #[test]
2360    fn test_builder_defaults() {
2361        let definition = RouteBuilder::from("timer:tick")
2362            .route_id("test-route")
2363            .build()
2364            .unwrap();
2365
2366        assert_eq!(definition.route_id(), "test-route");
2367        assert!(definition.auto_startup());
2368        assert_eq!(definition.startup_order(), 1000);
2369    }
2370
2371    // ── Choice typestate tests ──────────────────────────────────────────────────
2372
2373    #[test]
2374    fn test_choice_builder_single_when() {
2375        let definition = RouteBuilder::from("timer:tick")
2376            .route_id("test-route")
2377            .choice()
2378            .when(|ex: &Exchange| ex.input.header("type").is_some())
2379            .to("mock:typed")
2380            .end_when()
2381            .end_choice()
2382            .build()
2383            .unwrap();
2384        assert_eq!(definition.steps().len(), 1);
2385        assert!(
2386            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2387            if whens.len() == 1 && otherwise.is_none())
2388        );
2389    }
2390
2391    #[test]
2392    fn test_choice_builder_when_otherwise() {
2393        let definition = RouteBuilder::from("timer:tick")
2394            .route_id("test-route")
2395            .choice()
2396            .when(|ex: &Exchange| ex.input.header("a").is_some())
2397            .to("mock:a")
2398            .end_when()
2399            .otherwise()
2400            .to("mock:fallback")
2401            .end_otherwise()
2402            .end_choice()
2403            .build()
2404            .unwrap();
2405        assert!(
2406            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2407            if whens.len() == 1 && otherwise.is_some())
2408        );
2409    }
2410
2411    #[test]
2412    fn test_choice_builder_multiple_whens() {
2413        let definition = RouteBuilder::from("timer:tick")
2414            .route_id("test-route")
2415            .choice()
2416            .when(|ex: &Exchange| ex.input.header("a").is_some())
2417            .to("mock:a")
2418            .end_when()
2419            .when(|ex: &Exchange| ex.input.header("b").is_some())
2420            .to("mock:b")
2421            .end_when()
2422            .end_choice()
2423            .build()
2424            .unwrap();
2425        assert!(
2426            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2427            if whens.len() == 2)
2428        );
2429    }
2430
2431    #[test]
2432    fn test_choice_step_after_choice() {
2433        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2434        let definition = RouteBuilder::from("timer:tick")
2435            .route_id("test-route")
2436            .choice()
2437            .when(|_ex: &Exchange| true)
2438            .to("mock:inner")
2439            .end_when()
2440            .end_choice()
2441            .to("mock:outer") // must be step[1], not inside choice
2442            .build()
2443            .unwrap();
2444        assert_eq!(definition.steps().len(), 2);
2445        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2446    }
2447
2448    // ── Throttle typestate tests ──────────────────────────────────────────────────
2449
2450    #[test]
2451    fn test_throttle_builder_typestate() {
2452        let definition = RouteBuilder::from("timer:tick")
2453            .route_id("test-route")
2454            .throttle(10, std::time::Duration::from_secs(1))
2455            .to("mock:result")
2456            .end_throttle()
2457            .build()
2458            .unwrap();
2459
2460        assert_eq!(definition.steps().len(), 1);
2461        assert!(matches!(
2462            &definition.steps()[0],
2463            BuilderStep::Throttle { .. }
2464        ));
2465    }
2466
2467    #[test]
2468    fn test_throttle_builder_with_strategy() {
2469        let definition = RouteBuilder::from("timer:tick")
2470            .route_id("test-route")
2471            .throttle(10, std::time::Duration::from_secs(1))
2472            .strategy(ThrottleStrategy::Reject)
2473            .to("mock:result")
2474            .end_throttle()
2475            .build()
2476            .unwrap();
2477
2478        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2479            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2480        } else {
2481            panic!("Expected Throttle step");
2482        }
2483    }
2484
2485    #[test]
2486    fn test_throttle_builder_steps_collected() {
2487        let definition = RouteBuilder::from("timer:tick")
2488            .route_id("test-route")
2489            .throttle(5, std::time::Duration::from_secs(1))
2490            .set_header("throttled", Value::Bool(true))
2491            .to("mock:throttled")
2492            .end_throttle()
2493            .build()
2494            .unwrap();
2495
2496        match &definition.steps()[0] {
2497            BuilderStep::Throttle { steps, .. } => {
2498                assert_eq!(steps.len(), 2); // SetHeader + To
2499            }
2500            other => panic!("Expected Throttle, got {:?}", other),
2501        }
2502    }
2503
2504    #[test]
2505    fn test_throttle_step_after_throttle() {
2506        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2507        let definition = RouteBuilder::from("timer:tick")
2508            .route_id("test-route")
2509            .throttle(10, std::time::Duration::from_secs(1))
2510            .to("mock:inner")
2511            .end_throttle()
2512            .to("mock:outer")
2513            .build()
2514            .unwrap();
2515
2516        assert_eq!(definition.steps().len(), 2);
2517        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2518    }
2519
2520    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2521
2522    #[test]
2523    fn test_load_balance_builder_typestate() {
2524        let definition = RouteBuilder::from("timer:tick")
2525            .route_id("test-route")
2526            .load_balance()
2527            .round_robin()
2528            .to("mock:a")
2529            .to("mock:b")
2530            .end_load_balance()
2531            .build()
2532            .unwrap();
2533
2534        assert_eq!(definition.steps().len(), 1);
2535        assert!(matches!(
2536            &definition.steps()[0],
2537            BuilderStep::LoadBalance { .. }
2538        ));
2539    }
2540
2541    #[test]
2542    fn test_load_balance_builder_with_strategy() {
2543        let definition = RouteBuilder::from("timer:tick")
2544            .route_id("test-route")
2545            .load_balance()
2546            .random()
2547            .to("mock:result")
2548            .end_load_balance()
2549            .build()
2550            .unwrap();
2551
2552        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2553            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2554        } else {
2555            panic!("Expected LoadBalance step");
2556        }
2557    }
2558
2559    #[test]
2560    fn test_load_balance_builder_steps_collected() {
2561        let definition = RouteBuilder::from("timer:tick")
2562            .route_id("test-route")
2563            .load_balance()
2564            .set_header("lb", Value::Bool(true))
2565            .to("mock:a")
2566            .end_load_balance()
2567            .build()
2568            .unwrap();
2569
2570        match &definition.steps()[0] {
2571            BuilderStep::LoadBalance { steps, .. } => {
2572                assert_eq!(steps.len(), 2); // SetHeader + To
2573            }
2574            other => panic!("Expected LoadBalance, got {:?}", other),
2575        }
2576    }
2577
2578    #[test]
2579    fn test_load_balance_step_after_load_balance() {
2580        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
2581        let definition = RouteBuilder::from("timer:tick")
2582            .route_id("test-route")
2583            .load_balance()
2584            .to("mock:inner")
2585            .end_load_balance()
2586            .to("mock:outer")
2587            .build()
2588            .unwrap();
2589
2590        assert_eq!(definition.steps().len(), 2);
2591        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2592    }
2593
    // ── DynamicRouter builder tests ────────────────────────────────────────────────────
2595
2596    #[test]
2597    fn test_dynamic_router_builder() {
2598        let definition = RouteBuilder::from("timer:tick")
2599            .route_id("test-route")
2600            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2601            .build()
2602            .unwrap();
2603
2604        assert_eq!(definition.steps().len(), 1);
2605        assert!(matches!(
2606            &definition.steps()[0],
2607            BuilderStep::DynamicRouter { .. }
2608        ));
2609    }
2610
2611    #[test]
2612    fn test_dynamic_router_builder_with_config() {
2613        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2614            .max_iterations(100)
2615            .cache_size(500);
2616
2617        let definition = RouteBuilder::from("timer:tick")
2618            .route_id("test-route")
2619            .dynamic_router_with_config(config)
2620            .build()
2621            .unwrap();
2622
2623        assert_eq!(definition.steps().len(), 1);
2624        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2625            assert_eq!(config.max_iterations, 100);
2626            assert_eq!(config.cache_size, 500);
2627        } else {
2628            panic!("Expected DynamicRouter step");
2629        }
2630    }
2631
2632    #[test]
2633    fn test_dynamic_router_step_after_router() {
2634        // Steps after dynamic_router() are added to the outer pipeline.
2635        let definition = RouteBuilder::from("timer:tick")
2636            .route_id("test-route")
2637            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2638            .to("mock:outer")
2639            .build()
2640            .unwrap();
2641
2642        assert_eq!(definition.steps().len(), 2);
2643        assert!(matches!(
2644            &definition.steps()[0],
2645            BuilderStep::DynamicRouter { .. }
2646        ));
2647        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2648    }
2649
2650    #[test]
2651    fn routing_slip_builder_creates_step() {
2652        use camel_api::RoutingSlipExpression;
2653
2654        let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2655
2656        let route = RouteBuilder::from("direct:start")
2657            .route_id("routing-slip-test")
2658            .routing_slip(expression)
2659            .build()
2660            .unwrap();
2661
2662        assert!(
2663            matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2664            "Expected RoutingSlip step"
2665        );
2666    }
2667
2668    #[test]
2669    fn routing_slip_with_config_builder_creates_step() {
2670        use camel_api::RoutingSlipConfig;
2671
2672        let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2673            .uri_delimiter("|")
2674            .cache_size(50)
2675            .ignore_invalid_endpoints(true);
2676
2677        let route = RouteBuilder::from("direct:start")
2678            .route_id("routing-slip-config-test")
2679            .routing_slip_with_config(config)
2680            .build()
2681            .unwrap();
2682
2683        if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2684            assert_eq!(config.uri_delimiter, "|");
2685            assert_eq!(config.cache_size, 50);
2686            assert!(config.ignore_invalid_endpoints);
2687        } else {
2688            panic!("Expected RoutingSlip step");
2689        }
2690    }
2691
2692    #[test]
2693    fn test_builder_marshal_adds_processor_step() {
2694        let definition = RouteBuilder::from("timer:tick")
2695            .route_id("test-route")
2696            .marshal("json")
2697            .build()
2698            .unwrap();
2699        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2700    }
2701
2702    #[test]
2703    fn test_builder_unmarshal_adds_processor_step() {
2704        let definition = RouteBuilder::from("timer:tick")
2705            .route_id("test-route")
2706            .unmarshal("json")
2707            .build()
2708            .unwrap();
2709        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2710    }
2711
2712    #[test]
2713    fn test_builder_stream_cache_adds_processor_step() {
2714        let definition = RouteBuilder::from("timer:tick")
2715            .route_id("test-route")
2716            .stream_cache(1024)
2717            .build()
2718            .unwrap();
2719        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2720    }
2721
2722    #[test]
2723    fn validate_adds_to_step_with_validator_uri() {
2724        let def = RouteBuilder::from("direct:in")
2725            .route_id("test")
2726            .validate("schemas/order.xsd")
2727            .build()
2728            .unwrap();
2729        let steps = def.steps();
2730        assert_eq!(steps.len(), 1);
2731        assert!(
2732            matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2733            "got: {:?}",
2734            steps[0]
2735        );
2736    }
2737
2738    #[test]
2739    #[should_panic(expected = "unknown data format: 'csv'")]
2740    fn test_builder_marshal_panics_on_unknown_format() {
2741        let _ = RouteBuilder::from("timer:tick")
2742            .route_id("test-route")
2743            .marshal("csv")
2744            .build();
2745    }
2746
2747    #[test]
2748    #[should_panic(expected = "unknown data format: 'csv'")]
2749    fn test_builder_unmarshal_panics_on_unknown_format() {
2750        let _ = RouteBuilder::from("timer:tick")
2751            .route_id("test-route")
2752            .unmarshal("csv")
2753            .build();
2754    }
2755
2756    #[test]
2757    fn test_builder_recipient_list_creates_step() {
2758        let route = RouteBuilder::from("direct:start")
2759            .route_id("recipient-list-test")
2760            .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
2761            .build()
2762            .unwrap();
2763
2764        assert!(matches!(
2765            &route.steps()[0],
2766            BuilderStep::RecipientList { .. }
2767        ));
2768    }
2769
2770    #[test]
2771    fn test_builder_recipient_list_with_config_creates_step() {
2772        let config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));
2773
2774        let route = RouteBuilder::from("direct:start")
2775            .route_id("recipient-list-config-test")
2776            .recipient_list_with_config(config)
2777            .build()
2778            .unwrap();
2779
2780        assert!(matches!(
2781            &route.steps()[0],
2782            BuilderStep::RecipientList { .. }
2783        ));
2784    }
2785
2786    #[test]
2787    fn test_builder_script_adds_script_step() {
2788        let route = RouteBuilder::from("direct:start")
2789            .route_id("script-test")
2790            .script("rhai", "headers[\"x\"] = \"y\"")
2791            .build()
2792            .unwrap();
2793
2794        assert!(matches!(
2795            &route.steps()[0],
2796            BuilderStep::Script { language, script }
2797            if language == "rhai" && script == "headers[\"x\"] = \"y\""
2798        ));
2799    }
2800
2801    #[test]
2802    fn test_builder_delay_and_delay_with_header_add_steps() {
2803        let route = RouteBuilder::from("direct:start")
2804            .route_id("delay-test")
2805            .delay(Duration::from_millis(250))
2806            .delay_with_header(Duration::from_millis(500), "x-delay")
2807            .build()
2808            .unwrap();
2809
2810        assert_eq!(route.steps().len(), 2);
2811        assert!(matches!(&route.steps()[0], BuilderStep::Delay { .. }));
2812        assert!(matches!(&route.steps()[1], BuilderStep::Delay { .. }));
2813    }
2814
2815    #[test]
2816    fn test_builder_log_and_stop_add_steps_in_order() {
2817        let route = RouteBuilder::from("direct:start")
2818            .route_id("log-stop-test")
2819            .log("hello", LogLevel::Info)
2820            .stop()
2821            .to("mock:after")
2822            .build()
2823            .unwrap();
2824
2825        assert_eq!(route.steps().len(), 3);
2826        assert!(matches!(
2827            &route.steps()[0],
2828            BuilderStep::Log { message, .. } if message == "hello"
2829        ));
2830        assert!(matches!(&route.steps()[1], BuilderStep::Stop));
2831        assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
2832    }
2833
2834    #[test]
2835    fn test_builder_stream_cache_default_adds_processor_step() {
2836        let route = RouteBuilder::from("direct:start")
2837            .route_id("stream-cache-default-test")
2838            .stream_cache_default()
2839            .build()
2840            .unwrap();
2841
2842        assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
2843    }
2844
2845    #[test]
2846    fn test_validate_preserves_existing_validator_prefix() {
2847        let route = RouteBuilder::from("direct:in")
2848            .route_id("validate-prefix-test")
2849            .validate("validator:schemas/order.xsd")
2850            .build()
2851            .unwrap();
2852
2853        assert!(matches!(
2854            &route.steps()[0],
2855            BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
2856        ));
2857    }
2858
2859    #[test]
2860    fn test_load_balance_builder_weighted_failover_parallel_config() {
2861        let route = RouteBuilder::from("direct:start")
2862            .route_id("lb-weighted-failover-parallel")
2863            .load_balance()
2864            .weighted(vec![
2865                ("direct:a".to_string(), 3),
2866                ("direct:b".to_string(), 1),
2867            ])
2868            .failover()
2869            .parallel(true)
2870            .to("mock:result")
2871            .end_load_balance()
2872            .build()
2873            .unwrap();
2874
2875        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
2876            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
2877            assert!(config.parallel);
2878        } else {
2879            panic!("Expected LoadBalance step");
2880        }
2881    }
2882
2883    #[test]
2884    fn test_multicast_builder_all_config_setters() {
2885        let route = RouteBuilder::from("direct:start")
2886            .route_id("multicast-config-test")
2887            .multicast()
2888            .parallel(true)
2889            .parallel_limit(4)
2890            .stop_on_exception(true)
2891            .timeout(Duration::from_millis(300))
2892            .aggregation(MulticastStrategy::Original)
2893            .to("mock:a")
2894            .end_multicast()
2895            .build()
2896            .unwrap();
2897
2898        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
2899            assert!(config.parallel);
2900            assert_eq!(config.parallel_limit, Some(4));
2901            assert!(config.stop_on_exception);
2902            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
2903            assert!(matches!(config.aggregation, MulticastStrategy::Original));
2904        } else {
2905            panic!("Expected Multicast step");
2906        }
2907    }
2908
2909    #[test]
2910    fn test_build_canonical_rejects_unsupported_processor_step() {
2911        let err = RouteBuilder::from("direct:start")
2912            .route_id("canonical-reject")
2913            .set_header("k", Value::String("v".into()))
2914            .build_canonical()
2915            .unwrap_err();
2916
2917        assert!(format!("{err}").contains("does not support step `processor`"));
2918    }
2919}