Skip to main content

camel_builder/
lib.rs

1use camel_api::DelayConfig;
2use camel_api::aggregator::{
3    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
4};
5use camel_api::body::Body;
6use camel_api::body_converter::BodyType;
7use camel_api::circuit_breaker::CircuitBreakerConfig;
8use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
9use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
10use camel_api::load_balancer::LoadBalancerConfig;
11use camel_api::loop_eip::{LoopConfig, LoopMode};
12use camel_api::multicast::{MulticastConfig, MulticastStrategy};
13use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
14use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
15use camel_api::splitter::SplitterConfig;
16use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
17use camel_api::{
18    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
19    ProcessorFn, Value,
20    runtime::{
21        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
22        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
23        CanonicalWhenSpec,
24    },
25};
26use camel_component_api::ConcurrencyModel;
27use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
28use camel_processor::{
29    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
30    StreamCacheService, UnmarshalService, builtin_data_format,
31};
32
33/// Shared step-accumulation methods for all builder types.
34///
35/// Implementors provide `steps_mut()` and get step-adding methods for free.
36/// `filter()` and other branching methods are NOT included — they return
37/// different types per builder and stay as per-builder methods.
38pub trait StepAccumulator: Sized {
39    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
40
41    fn to(mut self, endpoint: impl Into<String>) -> Self {
42        self.steps_mut().push(BuilderStep::To(endpoint.into()));
43        self
44    }
45
46    fn process<F, Fut>(mut self, f: F) -> Self
47    where
48        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
49        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
50    {
51        let svc = ProcessorFn::new(f);
52        self.steps_mut()
53            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
54        self
55    }
56
57    fn process_fn(mut self, processor: BoxProcessor) -> Self {
58        self.steps_mut().push(BuilderStep::Processor(processor));
59        self
60    }
61
62    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
63        let svc = SetHeader::new(IdentityProcessor, key, value);
64        self.steps_mut()
65            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
66        self
67    }
68
69    fn map_body<F>(mut self, mapper: F) -> Self
70    where
71        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
72    {
73        let svc = MapBody::new(IdentityProcessor, mapper);
74        self.steps_mut()
75            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
76        self
77    }
78
79    fn set_body<B>(mut self, body: B) -> Self
80    where
81        B: Into<Body> + Clone + Send + Sync + 'static,
82    {
83        let body: Body = body.into();
84        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
85        self.steps_mut()
86            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
87        self
88    }
89
90    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
91    ///
92    /// Transforms the message body using the given value. Semantically identical
93    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
94    fn transform<B>(self, body: B) -> Self
95    where
96        B: Into<Body> + Clone + Send + Sync + 'static,
97    {
98        self.set_body(body)
99    }
100
101    fn set_body_fn<F>(mut self, expr: F) -> Self
102    where
103        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
104    {
105        let svc = SetBody::new(IdentityProcessor, expr);
106        self.steps_mut()
107            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
108        self
109    }
110
111    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
112    where
113        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
114    {
115        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
116        self.steps_mut()
117            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
118        self
119    }
120
121    fn aggregate(mut self, config: AggregatorConfig) -> Self {
122        self.steps_mut().push(BuilderStep::Aggregate { config });
123        self
124    }
125
126    /// Stop processing this exchange immediately. No further steps in the
127    /// current pipeline will run.
128    ///
129    /// Can be used at any point in the route: directly on RouteBuilder,
130    /// inside `.filter()`, inside `.split()`, etc.
131    fn stop(mut self) -> Self {
132        self.steps_mut().push(BuilderStep::Stop);
133        self
134    }
135
136    fn delay(mut self, duration: std::time::Duration) -> Self {
137        self.steps_mut().push(BuilderStep::Delay {
138            config: DelayConfig::from_duration(duration),
139        });
140        self
141    }
142
143    fn delay_with_header(
144        mut self,
145        duration: std::time::Duration,
146        header: impl Into<String>,
147    ) -> Self {
148        self.steps_mut().push(BuilderStep::Delay {
149            config: DelayConfig::from_duration_with_header(duration, header),
150        });
151        self
152    }
153
154    /// Log a message at the specified level.
155    ///
156    /// The message will be logged when an exchange passes through this step.
157    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
158        self.steps_mut().push(BuilderStep::Log {
159            level,
160            message: message.into(),
161        });
162        self
163    }
164
165    /// Convert the message body to the target type.
166    ///
167    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
168    /// Returns `TypeConversionFailed` if conversion is not possible.
169    ///
170    /// # Example
171    /// ```ignore
172    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
173    ///      .convert_body_to(BodyType::Json)
174    ///      .to("direct:next")
175    /// ```
176    fn convert_body_to(mut self, target: BodyType) -> Self {
177        let svc = ConvertBodyTo::new(IdentityProcessor, target);
178        self.steps_mut()
179            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
180        self
181    }
182
183    fn stream_cache(mut self, threshold: usize) -> Self {
184        let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
185        let svc = StreamCacheService::new(IdentityProcessor, config);
186        self.steps_mut()
187            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
188        self
189    }
190
191    /// Materialize `Body::Stream` into `Body::Bytes` using the default threshold (128 KB).
192    ///
193    /// Equivalent to `.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)`.
194    fn stream_cache_default(self) -> Self {
195        self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
196    }
197
198    /// Marshal the message body using the specified data format.
199    ///
200    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
201    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
202    ///
203    /// # Example
204    /// ```ignore
205    /// route.marshal("json").to("direct:next")
206    /// ```
207    fn marshal(mut self, format: impl Into<String>) -> Self {
208        let name = format.into();
209        let df =
210            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
211        let svc = MarshalService::new(IdentityProcessor, df);
212        self.steps_mut()
213            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
214        self
215    }
216
217    /// Unmarshal the message body using the specified data format.
218    ///
219    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
220    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
221    ///
222    /// # Example
223    /// ```ignore
224    /// route.unmarshal("json").to("direct:next")
225    /// ```
226    fn unmarshal(mut self, format: impl Into<String>) -> Self {
227        let name = format.into();
228        let df =
229            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
230        let svc = UnmarshalService::new(IdentityProcessor, df);
231        self.steps_mut()
232            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
233        self
234    }
235
236    /// Validate the exchange body against a schema file.
237    ///
238    /// Shorthand for `to("validator:path")`. Supports XSD, JSON Schema, and YAML.
239    ///
240    /// # Example
241    /// ```ignore
242    /// route.validate("schemas/order.xsd").to("direct:out")
243    /// ```
244    fn validate(mut self, schema_path: impl Into<String>) -> Self {
245        let path = schema_path.into();
246        let uri = if path.starts_with("validator:") {
247            path
248        } else {
249            format!("validator:{path}")
250        };
251        self.steps_mut().push(BuilderStep::To(uri));
252        self
253    }
254
255    /// Execute a script that can modify the exchange (headers, properties, body).
256    ///
257    /// The script has access to `headers`, `properties`, and `body` variables
258    /// and can modify them with assignment syntax: `headers["k"] = v`.
259    ///
260    /// # Example
261    /// ```ignore
262    /// // ignore: requires full CamelContext setup with registered language
263    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
264    /// ```
265    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
266        self.steps_mut().push(BuilderStep::Script {
267            language: language.into(),
268            script: script.into(),
269        });
270        self
271    }
272
273    fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
274        self.steps_mut().push(BuilderStep::Bean {
275            name: name.into(),
276            method: method.into(),
277        });
278        self
279    }
280}
281
282/// A fluent builder for constructing routes.
283///
284/// # Example
285///
286/// ```ignore
287/// let definition = RouteBuilder::from("timer:tick?period=1000")
288///     .set_header("source", Value::String("timer".into()))
289///     .filter(|ex| ex.input.body.as_text().is_some())
290///     .to("log:info?showHeaders=true")
291///     .build()?;
292/// ```
293pub struct RouteBuilder {
294    from_uri: String,
295    steps: Vec<BuilderStep>,
296    error_handler: Option<ErrorHandlerConfig>,
297    error_handler_mode: ErrorHandlerMode,
298    circuit_breaker_config: Option<CircuitBreakerConfig>,
299    concurrency: Option<ConcurrencyModel>,
300    route_id: Option<String>,
301    auto_startup: Option<bool>,
302    startup_order: Option<i32>,
303}
304
305#[derive(Default)]
306enum ErrorHandlerMode {
307    #[default]
308    None,
309    ExplicitConfig,
310    Shorthand {
311        dlc_uri: Option<String>,
312        specs: Vec<OnExceptionSpec>,
313    },
314    Mixed,
315}
316
317#[derive(Clone)]
318struct OnExceptionSpec {
319    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
320    retry: Option<RedeliveryPolicy>,
321    handled_by: Option<String>,
322}
323
324impl RouteBuilder {
325    /// Start building a route from the given source endpoint URI.
326    pub fn from(endpoint: &str) -> Self {
327        Self {
328            from_uri: endpoint.to_string(),
329            steps: Vec::new(),
330            error_handler: None,
331            error_handler_mode: ErrorHandlerMode::None,
332            circuit_breaker_config: None,
333            concurrency: None,
334            route_id: None,
335            auto_startup: None,
336            startup_order: None,
337        }
338    }
339
340    /// Open a filter scope. Only exchanges matching `predicate` will be processed
341    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
342    /// and continue to steps after `.end_filter()`.
343    pub fn filter<F>(self, predicate: F) -> FilterBuilder
344    where
345        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
346    {
347        FilterBuilder {
348            parent: self,
349            predicate: std::sync::Arc::new(predicate),
350            steps: vec![],
351        }
352    }
353
354    /// Open a choice scope for content-based routing.
355    ///
356    /// Within the choice, you can define multiple `.when()` clauses and an
357    /// optional `.otherwise()` clause. The first matching `when` predicate
358    /// determines which sub-pipeline executes.
359    pub fn choice(self) -> ChoiceBuilder {
360        ChoiceBuilder {
361            parent: self,
362            whens: vec![],
363            _otherwise: None,
364        }
365    }
366
367    /// Add a WireTap step that sends a clone of the exchange to the given
368    /// endpoint URI (fire-and-forget). The original exchange continues
369    /// downstream unchanged.
370    pub fn wire_tap(mut self, endpoint: &str) -> Self {
371        self.steps.push(BuilderStep::WireTap {
372            uri: endpoint.to_string(),
373        });
374        self
375    }
376
377    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
378    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
379        self.error_handler_mode = match self.error_handler_mode {
380            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
381                ErrorHandlerMode::ExplicitConfig
382            }
383            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
384        };
385        self.error_handler = Some(config);
386        self
387    }
388
389    /// Set a dead letter channel URI for shorthand error handler mode.
390    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
391        let uri = uri.into();
392        self.error_handler_mode = match self.error_handler_mode {
393            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
394                dlc_uri: Some(uri),
395                specs: Vec::new(),
396            },
397            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
398                dlc_uri: Some(uri),
399                specs,
400            },
401            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
402        };
403        self
404    }
405
406    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
407    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
408    where
409        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
410    {
411        self.error_handler_mode = match self.error_handler_mode {
412            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
413                dlc_uri: None,
414                specs: Vec::new(),
415            },
416            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
417            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
418        };
419
420        OnExceptionBuilder {
421            parent: self,
422            policy: OnExceptionSpec {
423                matches: std::sync::Arc::new(matches),
424                retry: None,
425                handled_by: None,
426            },
427        }
428    }
429
430    /// Set a circuit breaker for this route.
431    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
432        self.circuit_breaker_config = Some(config);
433        self
434    }
435
436    /// Override the consumer's default concurrency model.
437    ///
438    /// When set, the pipeline spawns a task per exchange, processing them
439    /// concurrently. `max` limits the number of simultaneously active
440    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
441    ///
442    /// # Example
443    /// ```ignore
444    /// RouteBuilder::from("http://0.0.0.0:8080/api")
445    ///     .concurrent(16)  // max 16 in-flight pipeline executions
446    ///     .process(handle_request)
447    ///     .build()
448    /// ```
449    pub fn concurrent(mut self, max: usize) -> Self {
450        let max = if max == 0 { None } else { Some(max) };
451        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
452        self
453    }
454
455    /// Force sequential processing, overriding a concurrent-capable consumer.
456    ///
457    /// Useful for HTTP routes that mutate shared state and need ordering
458    /// guarantees.
459    pub fn sequential(mut self) -> Self {
460        self.concurrency = Some(ConcurrencyModel::Sequential);
461        self
462    }
463
464    /// Set the route ID for this route.
465    ///
466    /// If not set, the route will be assigned an auto-generated ID.
467    pub fn route_id(mut self, id: impl Into<String>) -> Self {
468        self.route_id = Some(id.into());
469        self
470    }
471
472    /// Set whether this route should automatically start when the context starts.
473    ///
474    /// Default is `true`.
475    pub fn auto_startup(mut self, auto: bool) -> Self {
476        self.auto_startup = Some(auto);
477        self
478    }
479
480    /// Set the startup order for this route.
481    ///
482    /// Routes with lower values start first. Default is 1000.
483    pub fn startup_order(mut self, order: i32) -> Self {
484        self.startup_order = Some(order);
485        self
486    }
487
488    /// Begin a Splitter sub-pipeline. Steps added after this call (until
489    /// `.end_split()`) will be executed per-fragment.
490    ///
491    /// Returns a `SplitBuilder` — you cannot call `.build()` until
492    /// `.end_split()` closes the split scope (enforced by the type system).
493    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
494        SplitBuilder {
495            parent: self,
496            config,
497            steps: Vec::new(),
498        }
499    }
500
501    /// Begin a Multicast sub-pipeline. Steps added after this call (until
502    /// `.end_multicast()`) will each receive a copy of the exchange.
503    ///
504    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
505    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
506    pub fn multicast(self) -> MulticastBuilder {
507        MulticastBuilder {
508            parent: self,
509            steps: Vec::new(),
510            config: MulticastConfig::new(),
511        }
512    }
513
514    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
515    /// `max_requests` per `period`. Steps inside the throttle scope are only
516    /// executed when the rate limit allows.
517    ///
518    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
519    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
520    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
521        ThrottleBuilder {
522            parent: self,
523            config: ThrottlerConfig::new(max_requests, period),
524            steps: Vec::new(),
525        }
526    }
527
528    /// Begin a Loop sub-pipeline that iterates a fixed number of times.
529    pub fn loop_count(self, count: usize) -> LoopBuilder {
530        LoopBuilder {
531            parent: self,
532            config: LoopConfig {
533                mode: LoopMode::Count(count),
534            },
535            steps: vec![],
536        }
537    }
538
539    /// Begin a Loop sub-pipeline that iterates while a predicate is true.
540    pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
541    where
542        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
543    {
544        LoopBuilder {
545            parent: self,
546            config: LoopConfig {
547                mode: LoopMode::While(std::sync::Arc::new(predicate)),
548            },
549            steps: vec![],
550        }
551    }
552
553    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
554    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
555    ///
556    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
557    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
558    pub fn load_balance(self) -> LoadBalancerBuilder {
559        LoadBalancerBuilder {
560            parent: self,
561            config: LoadBalancerConfig::round_robin(),
562            steps: Vec::new(),
563        }
564    }
565
566    /// Add a dynamic router step that routes exchanges dynamically based on
567    /// expression evaluation at runtime.
568    ///
569    /// The expression receives the exchange and returns `Some(uri)` to route to
570    /// the next endpoint, or `None` to stop routing.
571    ///
572    /// # Example
573    /// ```ignore
574    /// RouteBuilder::from("timer:tick")
575    ///     .route_id("test-route")
576    ///     .dynamic_router(|ex| {
577    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
578    ///     })
579    ///     .build()
580    /// ```
581    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
582        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
583    }
584
585    /// Add a dynamic router step with full configuration.
586    ///
587    /// Allows customization of URI delimiter, cache size, timeout, and other options.
588    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
589        self.steps.push(BuilderStep::DynamicRouter { config });
590        self
591    }
592
593    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
594        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
595    }
596
597    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
598        self.steps.push(BuilderStep::RoutingSlip { config });
599        self
600    }
601
602    pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
603        self.recipient_list_with_config(RecipientListConfig::new(expression))
604    }
605
606    pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
607        self.steps.push(BuilderStep::RecipientList { config });
608        self
609    }
610
611    /// Consume the builder and produce a [`RouteDefinition`].
612    pub fn build(self) -> Result<RouteDefinition, CamelError> {
613        if self.from_uri.is_empty() {
614            return Err(CamelError::RouteError(
615                "route must have a 'from' URI".to_string(),
616            ));
617        }
618        let route_id = self.route_id.ok_or_else(|| {
619            CamelError::RouteError(
620                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
621                    .to_string(),
622            )
623        })?;
624        let resolved_error_handler = match self.error_handler_mode {
625            ErrorHandlerMode::None => self.error_handler,
626            ErrorHandlerMode::ExplicitConfig => self.error_handler,
627            ErrorHandlerMode::Mixed => {
628                return Err(CamelError::RouteError(
629                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
630                ));
631            }
632            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
633                let mut config = if let Some(uri) = dlc_uri {
634                    ErrorHandlerConfig::dead_letter_channel(uri)
635                } else {
636                    ErrorHandlerConfig::log_only()
637                };
638
639                for spec in specs {
640                    let matcher = spec.matches.clone();
641                    let mut builder = config.on_exception(move |e| matcher(e));
642
643                    if let Some(retry) = spec.retry {
644                        builder = builder.retry(retry.max_attempts).with_backoff(
645                            retry.initial_delay,
646                            retry.multiplier,
647                            retry.max_delay,
648                        );
649                        if retry.jitter_factor > 0.0 {
650                            builder = builder.with_jitter(retry.jitter_factor);
651                        }
652                    }
653
654                    if let Some(uri) = spec.handled_by {
655                        builder = builder.handled_by(uri);
656                    }
657
658                    config = builder.build();
659                }
660
661                Some(config)
662            }
663        };
664
665        let definition = RouteDefinition::new(self.from_uri, self.steps);
666        let definition = if let Some(eh) = resolved_error_handler {
667            definition.with_error_handler(eh)
668        } else {
669            definition
670        };
671        let definition = if let Some(cb) = self.circuit_breaker_config {
672            definition.with_circuit_breaker(cb)
673        } else {
674            definition
675        };
676        let definition = if let Some(concurrency) = self.concurrency {
677            definition.with_concurrency(concurrency)
678        } else {
679            definition
680        };
681        let definition = definition.with_route_id(route_id);
682        let definition = if let Some(auto) = self.auto_startup {
683            definition.with_auto_startup(auto)
684        } else {
685            definition
686        };
687        let definition = if let Some(order) = self.startup_order {
688            definition.with_startup_order(order)
689        } else {
690            definition
691        };
692        Ok(definition)
693    }
694
695    /// Compile this builder route into canonical v1 spec.
696    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
697        if self.from_uri.is_empty() {
698            return Err(CamelError::RouteError(
699                "route must have a 'from' URI".to_string(),
700            ));
701        }
702        let route_id = self.route_id.ok_or_else(|| {
703            CamelError::RouteError(
704                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
705                    .to_string(),
706            )
707        })?;
708
709        let steps = canonicalize_steps(self.steps)?;
710        let circuit_breaker = self
711            .circuit_breaker_config
712            .map(canonicalize_circuit_breaker);
713
714        let spec = CanonicalRouteSpec {
715            route_id,
716            from: self.from_uri,
717            steps,
718            circuit_breaker,
719            version: camel_api::CANONICAL_CONTRACT_VERSION,
720        };
721        spec.validate_contract()?;
722        Ok(spec)
723    }
724}
725
726pub struct OnExceptionBuilder {
727    parent: RouteBuilder,
728    policy: OnExceptionSpec,
729}
730
731impl OnExceptionBuilder {
732    pub fn retry(mut self, max_attempts: u32) -> Self {
733        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
734        self
735    }
736
737    pub fn with_backoff(
738        mut self,
739        initial: std::time::Duration,
740        multiplier: f64,
741        max: std::time::Duration,
742    ) -> Self {
743        if let Some(ref mut retry) = self.policy.retry {
744            retry.initial_delay = initial;
745            retry.multiplier = multiplier;
746            retry.max_delay = max;
747        }
748        self
749    }
750
751    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
752        if let Some(ref mut retry) = self.policy.retry {
753            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
754        }
755        self
756    }
757
758    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
759        self.policy.handled_by = Some(uri.into());
760        self
761    }
762
763    pub fn end_on_exception(mut self) -> RouteBuilder {
764        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
765            specs.push(self.policy);
766        }
767        self.parent
768    }
769}
770
771fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
772    let mut canonical = Vec::with_capacity(steps.len());
773    for step in steps {
774        canonical.push(canonicalize_step(step)?);
775    }
776    Ok(canonical)
777}
778
779fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
780    match step {
781        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
782        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
783        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
784        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
785        BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
786            delay_ms: config.delay_ms,
787            dynamic_header: config.dynamic_header,
788        }),
789        BuilderStep::DeclarativeScript { expression } => {
790            Ok(CanonicalStepSpec::Script { expression })
791        }
792        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
793            predicate,
794            steps: canonicalize_steps(steps)?,
795        }),
796        BuilderStep::DeclarativeChoice { whens, otherwise } => {
797            let mut canonical_whens = Vec::with_capacity(whens.len());
798            for DeclarativeWhenStep { predicate, steps } in whens {
799                canonical_whens.push(CanonicalWhenSpec {
800                    predicate,
801                    steps: canonicalize_steps(steps)?,
802                });
803            }
804            let otherwise = match otherwise {
805                Some(steps) => Some(canonicalize_steps(steps)?),
806                None => None,
807            };
808            Ok(CanonicalStepSpec::Choice {
809                whens: canonical_whens,
810                otherwise,
811            })
812        }
813        BuilderStep::DeclarativeSplit {
814            expression,
815            aggregation,
816            parallel,
817            parallel_limit,
818            stop_on_exception,
819            steps,
820        } => Ok(CanonicalStepSpec::Split {
821            expression: CanonicalSplitExpressionSpec::Language(expression),
822            aggregation: canonicalize_split_aggregation(aggregation)?,
823            parallel,
824            parallel_limit,
825            stop_on_exception,
826            steps: canonicalize_steps(steps)?,
827        }),
828        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
829            canonicalize_aggregate(config)?,
830        )),
831        other => {
832            let step_name = canonical_step_name(&other);
833            let detail = camel_api::canonical_contract_rejection_reason(step_name)
834                .unwrap_or("not included in canonical v1");
835            Err(CamelError::RouteError(format!(
836                "canonical v1 does not support step `{step_name}`: {detail}"
837            )))
838        }
839    }
840}
841
842fn canonicalize_split_aggregation(
843    strategy: camel_api::splitter::AggregationStrategy,
844) -> Result<CanonicalSplitAggregationSpec, CamelError> {
845    match strategy {
846        camel_api::splitter::AggregationStrategy::LastWins => {
847            Ok(CanonicalSplitAggregationSpec::LastWins)
848        }
849        camel_api::splitter::AggregationStrategy::CollectAll => {
850            Ok(CanonicalSplitAggregationSpec::CollectAll)
851        }
852        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
853            "canonical v1 does not support custom split aggregation".to_string(),
854        )),
855        camel_api::splitter::AggregationStrategy::Original => {
856            Ok(CanonicalSplitAggregationSpec::Original)
857        }
858    }
859}
860
861fn extract_completion_fields(
862    mode: &CompletionMode,
863) -> Result<(Option<usize>, Option<u64>), CamelError> {
864    match mode {
865        CompletionMode::Single(cond) => match cond {
866            CompletionCondition::Size(n) => Ok((Some(*n), None)),
867            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
868            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
869                "canonical v1 does not support aggregate predicate completion".to_string(),
870            )),
871        },
872        CompletionMode::Any(conds) => {
873            let mut size = None;
874            let mut timeout_ms = None;
875            for cond in conds {
876                match cond {
877                    CompletionCondition::Size(n) => size = Some(*n),
878                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
879                    CompletionCondition::Predicate(_) => {
880                        return Err(CamelError::RouteError(
881                            "canonical v1 does not support aggregate predicate completion"
882                                .to_string(),
883                        ));
884                    }
885                }
886            }
887            Ok((size, timeout_ms))
888        }
889    }
890}
891
892fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
893    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
894
895    let header = match &config.correlation {
896        CorrelationStrategy::HeaderName(h) => h.clone(),
897        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
898        CorrelationStrategy::Fn(_) => {
899            return Err(CamelError::RouteError(
900                "canonical v1 does not support Fn correlation strategy".to_string(),
901            ));
902        }
903    };
904
905    let correlation_key = match &config.correlation {
906        CorrelationStrategy::HeaderName(_) => None,
907        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
908        CorrelationStrategy::Fn(_) => unreachable!(),
909    };
910
911    let strategy = match config.strategy {
912        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
913        AggregationStrategy::Custom(_) => {
914            return Err(CamelError::RouteError(
915                "canonical v1 does not support custom aggregate strategy".to_string(),
916            ));
917        }
918    };
919    let bucket_ttl_ms = config
920        .bucket_ttl
921        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
922
923    Ok(CanonicalAggregateSpec {
924        header,
925        completion_size,
926        completion_timeout_ms,
927        correlation_key,
928        force_completion_on_stop: if config.force_completion_on_stop {
929            Some(true)
930        } else {
931            None
932        },
933        discard_on_timeout: if config.discard_on_timeout {
934            Some(true)
935        } else {
936            None
937        },
938        strategy,
939        max_buckets: config.max_buckets,
940        bucket_ttl_ms,
941    })
942}
943
944fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
945    CanonicalCircuitBreakerSpec {
946        failure_threshold: config.failure_threshold,
947        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
948    }
949}
950
951fn canonical_step_name(step: &BuilderStep) -> &'static str {
952    match step {
953        BuilderStep::Processor(_) => "processor",
954        BuilderStep::To(_) => "to",
955        BuilderStep::Stop => "stop",
956        BuilderStep::Log { .. } => "log",
957        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
958        BuilderStep::DeclarativeSetBody { .. } => "set_body",
959        BuilderStep::DeclarativeFilter { .. } => "filter",
960        BuilderStep::DeclarativeChoice { .. } => "choice",
961        BuilderStep::DeclarativeScript { .. } => "script",
962        BuilderStep::DeclarativeFunction { .. } => "function",
963        BuilderStep::DeclarativeSplit { .. } => "split",
964        BuilderStep::Split { .. } => "split",
965        BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
966        BuilderStep::Aggregate { .. } => "aggregate",
967        BuilderStep::Filter { .. } => "filter",
968        BuilderStep::Choice { .. } => "choice",
969        BuilderStep::WireTap { .. } => "wire_tap",
970        BuilderStep::Delay { .. } => "delay",
971        BuilderStep::Multicast { .. } => "multicast",
972        BuilderStep::DeclarativeLog { .. } => "log",
973        BuilderStep::Bean { .. } => "bean",
974        BuilderStep::Script { .. } => "script",
975        BuilderStep::Throttle { .. } => "throttle",
976        BuilderStep::LoadBalance { .. } => "load_balancer",
977        BuilderStep::DynamicRouter { .. } => "dynamic_router",
978        BuilderStep::RoutingSlip { .. } => "routing_slip",
979        BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
980        BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
981        BuilderStep::RecipientList { .. } => "recipient_list",
982        BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
983    }
984}
985
986impl StepAccumulator for RouteBuilder {
987    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
988        &mut self.steps
989    }
990}
991
992/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
993///
994/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
995/// but NOT `.build()` and NOT `.split()` (no nested splits).
996///
997/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
998/// and returns the parent `RouteBuilder`.
999pub struct SplitBuilder {
1000    parent: RouteBuilder,
1001    config: SplitterConfig,
1002    steps: Vec<BuilderStep>,
1003}
1004
1005impl SplitBuilder {
1006    /// Open a filter scope within the split sub-pipeline.
1007    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1008    where
1009        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1010    {
1011        FilterInSplitBuilder {
1012            parent: self,
1013            predicate: std::sync::Arc::new(predicate),
1014            steps: vec![],
1015        }
1016    }
1017
1018    /// Close the split scope. Packages the accumulated sub-steps into a
1019    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
1020    pub fn end_split(mut self) -> RouteBuilder {
1021        let split_step = BuilderStep::Split {
1022            config: self.config,
1023            steps: self.steps,
1024        };
1025        self.parent.steps.push(split_step);
1026        self.parent
1027    }
1028}
1029
1030impl StepAccumulator for SplitBuilder {
1031    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1032        &mut self.steps
1033    }
1034}
1035
1036/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
1037pub struct FilterBuilder {
1038    parent: RouteBuilder,
1039    predicate: FilterPredicate,
1040    steps: Vec<BuilderStep>,
1041}
1042
1043impl FilterBuilder {
1044    /// Close the filter scope. Packages the accumulated sub-steps into a
1045    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
1046    pub fn end_filter(mut self) -> RouteBuilder {
1047        let step = BuilderStep::Filter {
1048            predicate: self.predicate,
1049            steps: self.steps,
1050        };
1051        self.parent.steps.push(step);
1052        self.parent
1053    }
1054}
1055
1056impl StepAccumulator for FilterBuilder {
1057    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1058        &mut self.steps
1059    }
1060}
1061
1062/// Builder for a filter scope nested inside a `.split()` block.
1063pub struct FilterInSplitBuilder {
1064    parent: SplitBuilder,
1065    predicate: FilterPredicate,
1066    steps: Vec<BuilderStep>,
1067}
1068
1069impl FilterInSplitBuilder {
1070    /// Close the filter scope and return the parent `SplitBuilder`.
1071    pub fn end_filter(mut self) -> SplitBuilder {
1072        let step = BuilderStep::Filter {
1073            predicate: self.predicate,
1074            steps: self.steps,
1075        };
1076        self.parent.steps.push(step);
1077        self.parent
1078    }
1079}
1080
1081impl StepAccumulator for FilterInSplitBuilder {
1082    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1083        &mut self.steps
1084    }
1085}
1086
1087// ── Choice/When/Otherwise builders ─────────────────────────────────────────
1088
1089/// Builder for a `.choice()` ... `.end_choice()` block.
1090///
1091/// Accumulates `when` clauses and an optional `otherwise` clause.
1092/// Cannot call `.build()` until `.end_choice()` is called.
1093pub struct ChoiceBuilder {
1094    parent: RouteBuilder,
1095    whens: Vec<WhenStep>,
1096    _otherwise: Option<Vec<BuilderStep>>,
1097}
1098
1099impl ChoiceBuilder {
1100    /// Open a `when` clause. Only exchanges matching `predicate` will be
1101    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
1102    pub fn when<F>(self, predicate: F) -> WhenBuilder
1103    where
1104        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1105    {
1106        WhenBuilder {
1107            parent: self,
1108            predicate: std::sync::Arc::new(predicate),
1109            steps: vec![],
1110        }
1111    }
1112
1113    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1114    ///
1115    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1116    pub fn otherwise(self) -> OtherwiseBuilder {
1117        OtherwiseBuilder {
1118            parent: self,
1119            steps: vec![],
1120        }
1121    }
1122
1123    /// Close the choice scope. Packages all accumulated `when` clauses and
1124    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1125    /// parent `RouteBuilder`.
1126    pub fn end_choice(mut self) -> RouteBuilder {
1127        let step = BuilderStep::Choice {
1128            whens: self.whens,
1129            otherwise: self._otherwise,
1130        };
1131        self.parent.steps.push(step);
1132        self.parent
1133    }
1134}
1135
1136/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1137pub struct WhenBuilder {
1138    parent: ChoiceBuilder,
1139    predicate: camel_api::FilterPredicate,
1140    steps: Vec<BuilderStep>,
1141}
1142
1143impl WhenBuilder {
1144    /// Close the when scope. Packages the accumulated sub-steps into a
1145    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1146    pub fn end_when(mut self) -> ChoiceBuilder {
1147        self.parent.whens.push(WhenStep {
1148            predicate: self.predicate,
1149            steps: self.steps,
1150        });
1151        self.parent
1152    }
1153}
1154
1155impl StepAccumulator for WhenBuilder {
1156    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1157        &mut self.steps
1158    }
1159}
1160
1161/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1162pub struct OtherwiseBuilder {
1163    parent: ChoiceBuilder,
1164    steps: Vec<BuilderStep>,
1165}
1166
1167impl OtherwiseBuilder {
1168    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1169    pub fn end_otherwise(self) -> ChoiceBuilder {
1170        let OtherwiseBuilder { mut parent, steps } = self;
1171        parent._otherwise = Some(steps);
1172        parent
1173    }
1174}
1175
1176impl StepAccumulator for OtherwiseBuilder {
1177    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1178        &mut self.steps
1179    }
1180}
1181
1182/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1183///
1184/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1185/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1186///
1187/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1188/// and returns the parent `RouteBuilder`.
1189pub struct MulticastBuilder {
1190    parent: RouteBuilder,
1191    steps: Vec<BuilderStep>,
1192    config: MulticastConfig,
1193}
1194
1195impl MulticastBuilder {
1196    pub fn parallel(mut self, parallel: bool) -> Self {
1197        self.config = self.config.parallel(parallel);
1198        self
1199    }
1200
1201    pub fn parallel_limit(mut self, limit: usize) -> Self {
1202        self.config = self.config.parallel_limit(limit);
1203        self
1204    }
1205
1206    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1207        self.config = self.config.stop_on_exception(stop);
1208        self
1209    }
1210
1211    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1212        self.config = self.config.timeout(duration);
1213        self
1214    }
1215
1216    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1217        self.config = self.config.aggregation(strategy);
1218        self
1219    }
1220
1221    pub fn end_multicast(mut self) -> RouteBuilder {
1222        let step = BuilderStep::Multicast {
1223            steps: self.steps,
1224            config: self.config,
1225        };
1226        self.parent.steps.push(step);
1227        self.parent
1228    }
1229}
1230
1231impl StepAccumulator for MulticastBuilder {
1232    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1233        &mut self.steps
1234    }
1235}
1236
1237/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1238///
1239/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1240/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1241///
1242/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1243/// and returns the parent `RouteBuilder`.
1244pub struct ThrottleBuilder {
1245    parent: RouteBuilder,
1246    config: ThrottlerConfig,
1247    steps: Vec<BuilderStep>,
1248}
1249
1250impl ThrottleBuilder {
1251    /// Set the throttle strategy. Default is `Delay`.
1252    ///
1253    /// - `Delay`: Queue messages until capacity available
1254    /// - `Reject`: Return error immediately when throttled
1255    /// - `Drop`: Silently discard excess messages
1256    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1257        self.config = self.config.strategy(strategy);
1258        self
1259    }
1260
1261    /// Close the throttle scope. Packages the accumulated sub-steps into a
1262    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1263    pub fn end_throttle(mut self) -> RouteBuilder {
1264        let step = BuilderStep::Throttle {
1265            config: self.config,
1266            steps: self.steps,
1267        };
1268        self.parent.steps.push(step);
1269        self.parent
1270    }
1271}
1272
1273impl StepAccumulator for ThrottleBuilder {
1274    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1275        &mut self.steps
1276    }
1277}
1278
1279/// Builder for the sub-pipeline within a `.loop_count()` / `.loop_while()` ... `.end_loop()` block.
1280pub struct LoopBuilder {
1281    parent: RouteBuilder,
1282    config: LoopConfig,
1283    steps: Vec<BuilderStep>,
1284}
1285
1286impl LoopBuilder {
1287    pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1288        LoopInLoopBuilder {
1289            parent: self,
1290            config: LoopConfig {
1291                mode: LoopMode::Count(count),
1292            },
1293            steps: vec![],
1294        }
1295    }
1296
1297    pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1298    where
1299        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1300    {
1301        LoopInLoopBuilder {
1302            parent: self,
1303            config: LoopConfig {
1304                mode: LoopMode::While(std::sync::Arc::new(predicate)),
1305            },
1306            steps: vec![],
1307        }
1308    }
1309
1310    pub fn end_loop(mut self) -> RouteBuilder {
1311        let step = BuilderStep::Loop {
1312            config: self.config,
1313            steps: self.steps,
1314        };
1315        self.parent.steps.push(step);
1316        self.parent
1317    }
1318}
1319
1320impl StepAccumulator for LoopBuilder {
1321    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1322        &mut self.steps
1323    }
1324}
1325
1326pub struct LoopInLoopBuilder {
1327    parent: LoopBuilder,
1328    config: LoopConfig,
1329    steps: Vec<BuilderStep>,
1330}
1331
1332impl LoopInLoopBuilder {
1333    pub fn end_loop(mut self) -> LoopBuilder {
1334        let step = BuilderStep::Loop {
1335            config: self.config,
1336            steps: self.steps,
1337        };
1338        self.parent.steps.push(step);
1339        self.parent
1340    }
1341}
1342
1343impl StepAccumulator for LoopInLoopBuilder {
1344    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1345        &mut self.steps
1346    }
1347}
1348
1349/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1350///
1351/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1352/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1353///
1354/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1355/// and returns the parent `RouteBuilder`.
1356pub struct LoadBalancerBuilder {
1357    parent: RouteBuilder,
1358    config: LoadBalancerConfig,
1359    steps: Vec<BuilderStep>,
1360}
1361
1362impl LoadBalancerBuilder {
1363    /// Set the load balance strategy to round-robin (default).
1364    pub fn round_robin(mut self) -> Self {
1365        self.config = LoadBalancerConfig::round_robin();
1366        self
1367    }
1368
1369    /// Set the load balance strategy to random selection.
1370    pub fn random(mut self) -> Self {
1371        self.config = LoadBalancerConfig::random();
1372        self
1373    }
1374
1375    /// Set the load balance strategy to weighted selection.
1376    ///
1377    /// Each endpoint is assigned a weight that determines its probability
1378    /// of being selected.
1379    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1380        self.config = LoadBalancerConfig::weighted(weights);
1381        self
1382    }
1383
1384    /// Set the load balance strategy to failover.
1385    ///
1386    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1387    /// is tried.
1388    pub fn failover(mut self) -> Self {
1389        self.config = LoadBalancerConfig::failover();
1390        self
1391    }
1392
1393    /// Enable or disable parallel execution of endpoints.
1394    ///
1395    /// When enabled, all endpoints receive the exchange simultaneously.
1396    /// When disabled (default), only one endpoint is selected per exchange.
1397    pub fn parallel(mut self, parallel: bool) -> Self {
1398        self.config = self.config.parallel(parallel);
1399        self
1400    }
1401
1402    /// Close the load balance scope. Packages the accumulated sub-steps into a
1403    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1404    pub fn end_load_balance(mut self) -> RouteBuilder {
1405        let step = BuilderStep::LoadBalance {
1406            config: self.config,
1407            steps: self.steps,
1408        };
1409        self.parent.steps.push(step);
1410        self.parent
1411    }
1412}
1413
1414impl StepAccumulator for LoadBalancerBuilder {
1415    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1416        &mut self.steps
1417    }
1418}
1419
1420// ---------------------------------------------------------------------------
1421// Tests
1422// ---------------------------------------------------------------------------
1423
1424#[cfg(test)]
1425mod tests {
1426    use super::*;
1427    use camel_api::error_handler::ErrorHandlerConfig;
1428    use camel_api::load_balancer::LoadBalanceStrategy;
1429    use camel_api::{Exchange, Message};
1430    use camel_core::route::BuilderStep;
1431    use std::sync::Arc;
1432    use std::time::Duration;
1433    use tower::{Service, ServiceExt};
1434
1435    #[test]
1436    fn test_builder_from_creates_definition() {
1437        let definition = RouteBuilder::from("timer:tick")
1438            .route_id("test-route")
1439            .build()
1440            .unwrap();
1441        assert_eq!(definition.from_uri(), "timer:tick");
1442    }
1443
1444    #[test]
1445    fn test_builder_empty_from_uri_errors() {
1446        let result = RouteBuilder::from("").route_id("test-route").build();
1447        assert!(result.is_err());
1448    }
1449
1450    #[test]
1451    fn test_builder_to_adds_step() {
1452        let definition = RouteBuilder::from("timer:tick")
1453            .route_id("test-route")
1454            .to("log:info")
1455            .build()
1456            .unwrap();
1457
1458        assert_eq!(definition.from_uri(), "timer:tick");
1459        // We can verify steps were added by checking the structure
1460        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1461    }
1462
1463    #[test]
1464    fn test_builder_filter_adds_filter_step() {
1465        let definition = RouteBuilder::from("timer:tick")
1466            .route_id("test-route")
1467            .filter(|_ex| true)
1468            .to("mock:result")
1469            .end_filter()
1470            .build()
1471            .unwrap();
1472
1473        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1474    }
1475
1476    #[test]
1477    fn test_builder_set_header_adds_processor_step() {
1478        let definition = RouteBuilder::from("timer:tick")
1479            .route_id("test-route")
1480            .set_header("key", Value::String("value".into()))
1481            .build()
1482            .unwrap();
1483
1484        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1485    }
1486
1487    #[test]
1488    fn test_builder_map_body_adds_processor_step() {
1489        let definition = RouteBuilder::from("timer:tick")
1490            .route_id("test-route")
1491            .map_body(|body| body)
1492            .build()
1493            .unwrap();
1494
1495        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1496    }
1497
1498    #[test]
1499    fn test_builder_process_adds_processor_step() {
1500        let definition = RouteBuilder::from("timer:tick")
1501            .route_id("test-route")
1502            .process(|ex| async move { Ok(ex) })
1503            .build()
1504            .unwrap();
1505
1506        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1507    }
1508
1509    #[test]
1510    fn test_builder_chain_multiple_steps() {
1511        let definition = RouteBuilder::from("timer:tick")
1512            .route_id("test-route")
1513            .set_header("source", Value::String("timer".into()))
1514            .filter(|ex| ex.input.header("source").is_some())
1515            .to("log:info")
1516            .end_filter()
1517            .to("mock:result")
1518            .build()
1519            .unwrap();
1520
1521        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1522        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1523        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1524        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1525    }
1526
1527    #[test]
1528    fn test_loop_count_builder() {
1529        use camel_api::loop_eip::LoopMode;
1530
1531        let def = RouteBuilder::from("direct:start")
1532            .route_id("loop-test")
1533            .loop_count(3)
1534            .to("mock:inside")
1535            .end_loop()
1536            .to("mock:after")
1537            .build()
1538            .unwrap();
1539
1540        assert_eq!(def.steps().len(), 2);
1541        match &def.steps()[0] {
1542            BuilderStep::Loop { config, steps } => {
1543                assert!(matches!(config.mode, LoopMode::Count(3)));
1544                assert_eq!(steps.len(), 1);
1545            }
1546            other => panic!("Expected Loop, got {:?}", other),
1547        }
1548        assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1549    }
1550
1551    #[test]
1552    fn test_loop_while_builder() {
1553        use camel_api::loop_eip::LoopMode;
1554
1555        let def = RouteBuilder::from("direct:start")
1556            .route_id("loop-while-test")
1557            .loop_while(|_ex| true)
1558            .to("mock:retry")
1559            .end_loop()
1560            .build()
1561            .unwrap();
1562
1563        assert_eq!(def.steps().len(), 1);
1564        match &def.steps()[0] {
1565            BuilderStep::Loop { config, steps } => {
1566                assert!(matches!(config.mode, LoopMode::While(_)));
1567                assert_eq!(steps.len(), 1);
1568            }
1569            other => panic!("Expected Loop, got {:?}", other),
1570        }
1571    }
1572
1573    #[test]
1574    fn test_nested_loop_builder() {
1575        use camel_api::loop_eip::LoopMode;
1576
1577        let def = RouteBuilder::from("direct:start")
1578            .route_id("nested-loop-test")
1579            .loop_count(2)
1580            .to("mock:outer")
1581            .loop_count(3)
1582            .to("mock:inner")
1583            .end_loop()
1584            .end_loop()
1585            .to("mock:after")
1586            .build()
1587            .unwrap();
1588
1589        assert_eq!(def.steps().len(), 2);
1590        match &def.steps()[0] {
1591            BuilderStep::Loop { steps, .. } => {
1592                assert_eq!(steps.len(), 2);
1593                match &steps[1] {
1594                    BuilderStep::Loop {
1595                        config,
1596                        steps: inner_steps,
1597                    } => {
1598                        assert!(matches!(config.mode, LoopMode::Count(3)));
1599                        assert_eq!(inner_steps.len(), 1);
1600                    }
1601                    other => panic!("Expected nested Loop, got {:?}", other),
1602                }
1603            }
1604            other => panic!("Expected outer Loop, got {:?}", other),
1605        }
1606    }
1607
1608    // -----------------------------------------------------------------------
1609    // Processor behavior tests — exercise the real Tower services directly
1610    // -----------------------------------------------------------------------
1611
1612    #[tokio::test]
1613    async fn test_set_header_processor_works() {
1614        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1615        let exchange = Exchange::new(Message::new("test"));
1616        let result = svc.call(exchange).await.unwrap();
1617        assert_eq!(
1618            result.input.header("greeting"),
1619            Some(&Value::String("hello".into()))
1620        );
1621    }
1622
1623    #[tokio::test]
1624    async fn test_filter_processor_passes() {
1625        use camel_api::BoxProcessorExt;
1626        use camel_processor::FilterService;
1627
1628        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1629        let mut svc =
1630            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1631        let exchange = Exchange::new(Message::new("pass"));
1632        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1633        assert_eq!(result.input.body.as_text(), Some("pass"));
1634    }
1635
1636    #[tokio::test]
1637    async fn test_filter_processor_blocks() {
1638        use camel_api::BoxProcessorExt;
1639        use camel_processor::FilterService;
1640
1641        let sub = BoxProcessor::from_fn(|_ex| {
1642            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1643        });
1644        let mut svc =
1645            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1646        let exchange = Exchange::new(Message::new("reject"));
1647        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1648        assert_eq!(result.input.body.as_text(), Some("reject"));
1649    }
1650
1651    #[tokio::test]
1652    async fn test_map_body_processor_works() {
1653        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1654            if let Some(text) = body.as_text() {
1655                Body::Text(text.to_uppercase())
1656            } else {
1657                body
1658            }
1659        });
1660        let exchange = Exchange::new(Message::new("hello"));
1661        let result = mapper.oneshot(exchange).await.unwrap();
1662        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1663    }
1664
1665    #[tokio::test]
1666    async fn test_process_custom_processor_works() {
1667        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1668            ex.set_property("custom", Value::Bool(true));
1669            Ok(ex)
1670        });
1671        let exchange = Exchange::new(Message::default());
1672        let result = processor.oneshot(exchange).await.unwrap();
1673        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1674    }
1675
1676    // -----------------------------------------------------------------------
1677    // Sequential pipeline test
1678    // -----------------------------------------------------------------------
1679
1680    #[tokio::test]
1681    async fn test_compose_pipeline_runs_steps_in_order() {
1682        use camel_core::route::compose_pipeline;
1683
1684        let processors = vec![
1685            BoxProcessor::new(SetHeader::new(
1686                IdentityProcessor,
1687                "step",
1688                Value::String("one".into()),
1689            )),
1690            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1691                if let Some(text) = body.as_text() {
1692                    Body::Text(format!("{}-processed", text))
1693                } else {
1694                    body
1695                }
1696            })),
1697        ];
1698
1699        let pipeline = compose_pipeline(processors);
1700        let exchange = Exchange::new(Message::new("hello"));
1701        let result = pipeline.oneshot(exchange).await.unwrap();
1702
1703        assert_eq!(
1704            result.input.header("step"),
1705            Some(&Value::String("one".into()))
1706        );
1707        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1708    }
1709
1710    #[tokio::test]
1711    async fn test_compose_pipeline_empty_is_identity() {
1712        use camel_core::route::compose_pipeline;
1713
1714        let pipeline = compose_pipeline(vec![]);
1715        let exchange = Exchange::new(Message::new("unchanged"));
1716        let result = pipeline.oneshot(exchange).await.unwrap();
1717        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1718    }
1719
1720    // -----------------------------------------------------------------------
1721    // Circuit breaker builder tests
1722    // -----------------------------------------------------------------------
1723
1724    #[test]
1725    fn test_builder_circuit_breaker_sets_config() {
1726        use camel_api::circuit_breaker::CircuitBreakerConfig;
1727
1728        let config = CircuitBreakerConfig::new().failure_threshold(5);
1729        let definition = RouteBuilder::from("timer:tick")
1730            .route_id("test-route")
1731            .circuit_breaker(config)
1732            .build()
1733            .unwrap();
1734
1735        let cb = definition
1736            .circuit_breaker_config()
1737            .expect("circuit breaker should be set");
1738        assert_eq!(cb.failure_threshold, 5);
1739    }
1740
1741    #[test]
1742    fn test_builder_circuit_breaker_with_error_handler() {
1743        use camel_api::circuit_breaker::CircuitBreakerConfig;
1744        use camel_api::error_handler::ErrorHandlerConfig;
1745
1746        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1747        let eh_config = ErrorHandlerConfig::log_only();
1748
1749        let definition = RouteBuilder::from("timer:tick")
1750            .route_id("test-route")
1751            .to("log:info")
1752            .circuit_breaker(cb_config)
1753            .error_handler(eh_config)
1754            .build()
1755            .unwrap();
1756
1757        assert!(
1758            definition.circuit_breaker_config().is_some(),
1759            "circuit breaker config should be set"
1760        );
1761        // Route definition was built successfully with both configs.
1762    }
1763
1764    #[test]
1765    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1766        let definition = RouteBuilder::from("direct:start")
1767            .route_id("test-route")
1768            .dead_letter_channel("log:dlc")
1769            .on_exception(|e| matches!(e, CamelError::Io(_)))
1770            .retry(3)
1771            .handled_by("log:io")
1772            .end_on_exception()
1773            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1774            .retry(1)
1775            .end_on_exception()
1776            .to("mock:out")
1777            .build()
1778            .expect("route should build");
1779
1780        let cfg = definition
1781            .error_handler_config()
1782            .expect("error handler should be set");
1783        assert_eq!(cfg.policies.len(), 2);
1784        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1785        assert_eq!(
1786            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1787            Some(3)
1788        );
1789        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1790        assert_eq!(
1791            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1792            Some(1)
1793        );
1794    }
1795
1796    #[test]
1797    fn test_builder_on_exception_mixed_mode_rejected() {
1798        let result = RouteBuilder::from("direct:start")
1799            .route_id("test-route")
1800            .error_handler(ErrorHandlerConfig::log_only())
1801            .on_exception(|_e| true)
1802            .end_on_exception()
1803            .to("mock:out")
1804            .build();
1805
1806        let err = result.err().expect("mixed mode should fail with an error");
1807
1808        assert!(
1809            format!("{err}").contains("mixed error handler modes"),
1810            "unexpected error: {err}"
1811        );
1812    }
1813
1814    #[test]
1815    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1816        let definition = RouteBuilder::from("direct:start")
1817            .route_id("test-route")
1818            .on_exception(|_e| true)
1819            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1820            .with_jitter(0.5)
1821            .end_on_exception()
1822            .to("mock:out")
1823            .build()
1824            .expect("route should build");
1825
1826        let cfg = definition
1827            .error_handler_config()
1828            .expect("error handler should be set");
1829        assert_eq!(cfg.policies.len(), 1);
1830        assert!(cfg.policies[0].retry.is_none());
1831    }
1832
1833    #[test]
1834    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1835        let definition = RouteBuilder::from("direct:start")
1836            .route_id("test-route")
1837            .dead_letter_channel("log:dlc")
1838            .to("mock:out")
1839            .build()
1840            .expect("route should build");
1841
1842        let cfg = definition
1843            .error_handler_config()
1844            .expect("error handler should be set");
1845        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1846        assert!(cfg.policies.is_empty());
1847    }
1848
1849    #[test]
1850    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1851        let definition = RouteBuilder::from("direct:start")
1852            .route_id("test-route")
1853            .dead_letter_channel("log:first")
1854            .on_exception(|e| matches!(e, CamelError::Io(_)))
1855            .retry(2)
1856            .end_on_exception()
1857            .dead_letter_channel("log:second")
1858            .to("mock:out")
1859            .build()
1860            .expect("route should build");
1861
1862        let cfg = definition
1863            .error_handler_config()
1864            .expect("error handler should be set");
1865        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1866        assert_eq!(cfg.policies.len(), 1);
1867        assert_eq!(
1868            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1869            Some(2)
1870        );
1871    }
1872
1873    #[test]
1874    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1875        let definition = RouteBuilder::from("direct:start")
1876            .route_id("test-route")
1877            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1878            .retry(1)
1879            .end_on_exception()
1880            .to("mock:out")
1881            .build()
1882            .expect("route should build");
1883
1884        let cfg = definition
1885            .error_handler_config()
1886            .expect("error handler should be set");
1887        assert!(cfg.dlc_uri.is_none());
1888        assert_eq!(cfg.policies.len(), 1);
1889    }
1890
1891    #[test]
1892    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1893        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1894        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1895
1896        let definition = RouteBuilder::from("direct:start")
1897            .route_id("test-route")
1898            .error_handler(first)
1899            .error_handler(second)
1900            .to("mock:out")
1901            .build()
1902            .expect("route should build");
1903
1904        let cfg = definition
1905            .error_handler_config()
1906            .expect("error handler should be set");
1907        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1908    }
1909
1910    // --- Splitter builder tests ---
1911
1912    #[test]
1913    fn test_split_builder_typestate() {
1914        use camel_api::splitter::{SplitterConfig, split_body_lines};
1915
1916        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
1917        let definition = RouteBuilder::from("timer:test?period=1000")
1918            .route_id("test-route")
1919            .split(SplitterConfig::new(split_body_lines()))
1920            .to("mock:per-fragment")
1921            .end_split()
1922            .to("mock:final")
1923            .build()
1924            .unwrap();
1925
1926        // Should have 2 top-level steps: Split + To("mock:final")
1927        assert_eq!(definition.steps().len(), 2);
1928    }
1929
1930    #[test]
1931    fn test_split_builder_steps_collected() {
1932        use camel_api::splitter::{SplitterConfig, split_body_lines};
1933
1934        let definition = RouteBuilder::from("timer:test?period=1000")
1935            .route_id("test-route")
1936            .split(SplitterConfig::new(split_body_lines()))
1937            .set_header("fragment", Value::String("yes".into()))
1938            .to("mock:per-fragment")
1939            .end_split()
1940            .build()
1941            .unwrap();
1942
1943        // Should have 1 top-level step: Split (containing 2 sub-steps)
1944        assert_eq!(definition.steps().len(), 1);
1945        match &definition.steps()[0] {
1946            BuilderStep::Split { steps, .. } => {
1947                assert_eq!(steps.len(), 2); // SetHeader + To
1948            }
1949            other => panic!("Expected Split, got {:?}", other),
1950        }
1951    }
1952
1953    #[test]
1954    fn test_split_builder_config_propagated() {
1955        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1956
1957        let definition = RouteBuilder::from("timer:test?period=1000")
1958            .route_id("test-route")
1959            .split(
1960                SplitterConfig::new(split_body_lines())
1961                    .parallel(true)
1962                    .parallel_limit(4)
1963                    .aggregation(AggregationStrategy::CollectAll),
1964            )
1965            .to("mock:per-fragment")
1966            .end_split()
1967            .build()
1968            .unwrap();
1969
1970        match &definition.steps()[0] {
1971            BuilderStep::Split { config, .. } => {
1972                assert!(config.parallel);
1973                assert_eq!(config.parallel_limit, Some(4));
1974                assert!(matches!(
1975                    config.aggregation,
1976                    AggregationStrategy::CollectAll
1977                ));
1978            }
1979            other => panic!("Expected Split, got {:?}", other),
1980        }
1981    }
1982
1983    #[test]
1984    fn test_aggregate_builder_adds_step() {
1985        use camel_api::aggregator::AggregatorConfig;
1986        use camel_core::route::BuilderStep;
1987
1988        let definition = RouteBuilder::from("timer:tick")
1989            .route_id("test-route")
1990            .aggregate(
1991                AggregatorConfig::correlate_by("key")
1992                    .complete_when_size(2)
1993                    .build(),
1994            )
1995            .build()
1996            .unwrap();
1997
1998        assert_eq!(definition.steps().len(), 1);
1999        assert!(matches!(
2000            definition.steps()[0],
2001            BuilderStep::Aggregate { .. }
2002        ));
2003    }
2004
2005    #[test]
2006    fn test_aggregate_in_split_builder() {
2007        use camel_api::aggregator::AggregatorConfig;
2008        use camel_api::splitter::{SplitterConfig, split_body_lines};
2009        use camel_core::route::BuilderStep;
2010
2011        let definition = RouteBuilder::from("timer:tick")
2012            .route_id("test-route")
2013            .split(SplitterConfig::new(split_body_lines()))
2014            .aggregate(
2015                AggregatorConfig::correlate_by("key")
2016                    .complete_when_size(1)
2017                    .build(),
2018            )
2019            .end_split()
2020            .build()
2021            .unwrap();
2022
2023        assert_eq!(definition.steps().len(), 1);
2024        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2025            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2026        } else {
2027            panic!("expected Split step");
2028        }
2029    }
2030
2031    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
2032
2033    #[test]
2034    fn test_builder_set_body_static_adds_processor() {
2035        let definition = RouteBuilder::from("timer:tick")
2036            .route_id("test-route")
2037            .set_body("fixed")
2038            .build()
2039            .unwrap();
2040        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2041    }
2042
2043    #[test]
2044    fn test_builder_set_body_fn_adds_processor() {
2045        let definition = RouteBuilder::from("timer:tick")
2046            .route_id("test-route")
2047            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2048            .build()
2049            .unwrap();
2050        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2051    }
2052
2053    #[test]
2054    fn transform_alias_produces_same_as_set_body() {
2055        let route_transform = RouteBuilder::from("timer:tick")
2056            .route_id("test-route")
2057            .transform("hello")
2058            .build()
2059            .unwrap();
2060
2061        let route_set_body = RouteBuilder::from("timer:tick")
2062            .route_id("test-route")
2063            .set_body("hello")
2064            .build()
2065            .unwrap();
2066
2067        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2068    }
2069
2070    #[test]
2071    fn test_builder_set_header_fn_adds_processor() {
2072        let definition = RouteBuilder::from("timer:tick")
2073            .route_id("test-route")
2074            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2075            .build()
2076            .unwrap();
2077        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2078    }
2079
2080    #[tokio::test]
2081    async fn test_set_body_static_processor_works() {
2082        use camel_core::route::compose_pipeline;
2083        let def = RouteBuilder::from("t:t")
2084            .route_id("test-route")
2085            .set_body("replaced")
2086            .build()
2087            .unwrap();
2088        let pipeline = compose_pipeline(
2089            def.steps()
2090                .iter()
2091                .filter_map(|s| {
2092                    if let BuilderStep::Processor(p) = s {
2093                        Some(p.clone())
2094                    } else {
2095                        None
2096                    }
2097                })
2098                .collect(),
2099        );
2100        let exchange = Exchange::new(Message::new("original"));
2101        let result = pipeline.oneshot(exchange).await.unwrap();
2102        assert_eq!(result.input.body.as_text(), Some("replaced"));
2103    }
2104
2105    #[tokio::test]
2106    async fn test_set_body_fn_processor_works() {
2107        use camel_core::route::compose_pipeline;
2108        let def = RouteBuilder::from("t:t")
2109            .route_id("test-route")
2110            .set_body_fn(|ex: &Exchange| {
2111                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2112            })
2113            .build()
2114            .unwrap();
2115        let pipeline = compose_pipeline(
2116            def.steps()
2117                .iter()
2118                .filter_map(|s| {
2119                    if let BuilderStep::Processor(p) = s {
2120                        Some(p.clone())
2121                    } else {
2122                        None
2123                    }
2124                })
2125                .collect(),
2126        );
2127        let exchange = Exchange::new(Message::new("hello"));
2128        let result = pipeline.oneshot(exchange).await.unwrap();
2129        assert_eq!(result.input.body.as_text(), Some("HELLO"));
2130    }
2131
2132    #[tokio::test]
2133    async fn test_set_header_fn_processor_works() {
2134        use camel_core::route::compose_pipeline;
2135        let def = RouteBuilder::from("t:t")
2136            .route_id("test-route")
2137            .set_header_fn("echo", |ex: &Exchange| {
2138                ex.input
2139                    .body
2140                    .as_text()
2141                    .map(|t| Value::String(t.into()))
2142                    .unwrap_or(Value::Null)
2143            })
2144            .build()
2145            .unwrap();
2146        let pipeline = compose_pipeline(
2147            def.steps()
2148                .iter()
2149                .filter_map(|s| {
2150                    if let BuilderStep::Processor(p) = s {
2151                        Some(p.clone())
2152                    } else {
2153                        None
2154                    }
2155                })
2156                .collect(),
2157        );
2158        let exchange = Exchange::new(Message::new("ping"));
2159        let result = pipeline.oneshot(exchange).await.unwrap();
2160        assert_eq!(
2161            result.input.header("echo"),
2162            Some(&Value::String("ping".into()))
2163        );
2164    }
2165
2166    // ── FilterBuilder typestate tests ─────────────────────────────────────
2167
2168    #[test]
2169    fn test_filter_builder_typestate() {
2170        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2171            .route_id("test-route")
2172            .filter(|_ex| true)
2173            .to("mock:inner")
2174            .end_filter()
2175            .to("mock:outer")
2176            .build();
2177        assert!(result.is_ok());
2178    }
2179
2180    #[test]
2181    fn test_filter_builder_steps_collected() {
2182        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2183            .route_id("test-route")
2184            .filter(|_ex| true)
2185            .to("mock:inner")
2186            .end_filter()
2187            .build()
2188            .unwrap();
2189
2190        assert_eq!(definition.steps().len(), 1);
2191        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2192    }
2193
2194    #[test]
2195    fn test_wire_tap_builder_adds_step() {
2196        let definition = RouteBuilder::from("timer:tick")
2197            .route_id("test-route")
2198            .wire_tap("mock:tap")
2199            .to("mock:result")
2200            .build()
2201            .unwrap();
2202
2203        assert_eq!(definition.steps().len(), 2);
2204        assert!(
2205            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2206        );
2207        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2208    }
2209
2210    // ── MulticastBuilder typestate tests ─────────────────────────────────────
2211
2212    #[test]
2213    fn test_multicast_builder_typestate() {
2214        let definition = RouteBuilder::from("timer:tick")
2215            .route_id("test-route")
2216            .multicast()
2217            .to("direct:a")
2218            .to("direct:b")
2219            .end_multicast()
2220            .to("mock:result")
2221            .build()
2222            .unwrap();
2223
2224        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
2225    }
2226
2227    #[test]
2228    fn test_multicast_builder_steps_collected() {
2229        let definition = RouteBuilder::from("timer:tick")
2230            .route_id("test-route")
2231            .multicast()
2232            .to("direct:a")
2233            .to("direct:b")
2234            .end_multicast()
2235            .build()
2236            .unwrap();
2237
2238        match &definition.steps()[0] {
2239            BuilderStep::Multicast { steps, .. } => {
2240                assert_eq!(steps.len(), 2);
2241            }
2242            other => panic!("Expected Multicast, got {:?}", other),
2243        }
2244    }
2245
2246    // ── Concurrency builder tests ─────────────────────────────────────
2247
2248    #[test]
2249    fn test_builder_concurrent_sets_concurrency() {
2250        use camel_component_api::ConcurrencyModel;
2251
2252        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2253            .route_id("test-route")
2254            .concurrent(16)
2255            .to("log:info")
2256            .build()
2257            .unwrap();
2258
2259        assert_eq!(
2260            definition.concurrency_override(),
2261            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2262        );
2263    }
2264
2265    #[test]
2266    fn test_builder_concurrent_zero_means_unbounded() {
2267        use camel_component_api::ConcurrencyModel;
2268
2269        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2270            .route_id("test-route")
2271            .concurrent(0)
2272            .to("log:info")
2273            .build()
2274            .unwrap();
2275
2276        assert_eq!(
2277            definition.concurrency_override(),
2278            Some(&ConcurrencyModel::Concurrent { max: None })
2279        );
2280    }
2281
2282    #[test]
2283    fn test_builder_sequential_sets_concurrency() {
2284        use camel_component_api::ConcurrencyModel;
2285
2286        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2287            .route_id("test-route")
2288            .sequential()
2289            .to("log:info")
2290            .build()
2291            .unwrap();
2292
2293        assert_eq!(
2294            definition.concurrency_override(),
2295            Some(&ConcurrencyModel::Sequential)
2296        );
2297    }
2298
2299    #[test]
2300    fn test_builder_default_concurrency_is_none() {
2301        let definition = RouteBuilder::from("timer:tick")
2302            .route_id("test-route")
2303            .to("log:info")
2304            .build()
2305            .unwrap();
2306
2307        assert_eq!(definition.concurrency_override(), None);
2308    }
2309
2310    // ── Route lifecycle builder tests ─────────────────────────────────────
2311
2312    #[test]
2313    fn test_builder_route_id_sets_id() {
2314        let definition = RouteBuilder::from("timer:tick")
2315            .route_id("my-route")
2316            .build()
2317            .unwrap();
2318
2319        assert_eq!(definition.route_id(), "my-route");
2320    }
2321
2322    #[test]
2323    fn test_build_without_route_id_fails() {
2324        let result = RouteBuilder::from("timer:tick?period=1000")
2325            .to("log:info")
2326            .build();
2327        let err = match result {
2328            Err(e) => e.to_string(),
2329            Ok(_) => panic!("build() should fail without route_id"),
2330        };
2331        assert!(
2332            err.contains("route_id"),
2333            "error should mention route_id, got: {}",
2334            err
2335        );
2336    }
2337
2338    #[test]
2339    fn test_builder_auto_startup_false() {
2340        let definition = RouteBuilder::from("timer:tick")
2341            .route_id("test-route")
2342            .auto_startup(false)
2343            .build()
2344            .unwrap();
2345
2346        assert!(!definition.auto_startup());
2347    }
2348
2349    #[test]
2350    fn test_builder_startup_order_custom() {
2351        let definition = RouteBuilder::from("timer:tick")
2352            .route_id("test-route")
2353            .startup_order(50)
2354            .build()
2355            .unwrap();
2356
2357        assert_eq!(definition.startup_order(), 50);
2358    }
2359
2360    #[test]
2361    fn test_builder_defaults() {
2362        let definition = RouteBuilder::from("timer:tick")
2363            .route_id("test-route")
2364            .build()
2365            .unwrap();
2366
2367        assert_eq!(definition.route_id(), "test-route");
2368        assert!(definition.auto_startup());
2369        assert_eq!(definition.startup_order(), 1000);
2370    }
2371
2372    // ── Choice typestate tests ──────────────────────────────────────────────────
2373
2374    #[test]
2375    fn test_choice_builder_single_when() {
2376        let definition = RouteBuilder::from("timer:tick")
2377            .route_id("test-route")
2378            .choice()
2379            .when(|ex: &Exchange| ex.input.header("type").is_some())
2380            .to("mock:typed")
2381            .end_when()
2382            .end_choice()
2383            .build()
2384            .unwrap();
2385        assert_eq!(definition.steps().len(), 1);
2386        assert!(
2387            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2388            if whens.len() == 1 && otherwise.is_none())
2389        );
2390    }
2391
2392    #[test]
2393    fn test_choice_builder_when_otherwise() {
2394        let definition = RouteBuilder::from("timer:tick")
2395            .route_id("test-route")
2396            .choice()
2397            .when(|ex: &Exchange| ex.input.header("a").is_some())
2398            .to("mock:a")
2399            .end_when()
2400            .otherwise()
2401            .to("mock:fallback")
2402            .end_otherwise()
2403            .end_choice()
2404            .build()
2405            .unwrap();
2406        assert!(
2407            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2408            if whens.len() == 1 && otherwise.is_some())
2409        );
2410    }
2411
2412    #[test]
2413    fn test_choice_builder_multiple_whens() {
2414        let definition = RouteBuilder::from("timer:tick")
2415            .route_id("test-route")
2416            .choice()
2417            .when(|ex: &Exchange| ex.input.header("a").is_some())
2418            .to("mock:a")
2419            .end_when()
2420            .when(|ex: &Exchange| ex.input.header("b").is_some())
2421            .to("mock:b")
2422            .end_when()
2423            .end_choice()
2424            .build()
2425            .unwrap();
2426        assert!(
2427            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2428            if whens.len() == 2)
2429        );
2430    }
2431
2432    #[test]
2433    fn test_choice_step_after_choice() {
2434        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2435        let definition = RouteBuilder::from("timer:tick")
2436            .route_id("test-route")
2437            .choice()
2438            .when(|_ex: &Exchange| true)
2439            .to("mock:inner")
2440            .end_when()
2441            .end_choice()
2442            .to("mock:outer") // must be step[1], not inside choice
2443            .build()
2444            .unwrap();
2445        assert_eq!(definition.steps().len(), 2);
2446        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2447    }
2448
2449    // ── Throttle typestate tests ──────────────────────────────────────────────────
2450
2451    #[test]
2452    fn test_throttle_builder_typestate() {
2453        let definition = RouteBuilder::from("timer:tick")
2454            .route_id("test-route")
2455            .throttle(10, std::time::Duration::from_secs(1))
2456            .to("mock:result")
2457            .end_throttle()
2458            .build()
2459            .unwrap();
2460
2461        assert_eq!(definition.steps().len(), 1);
2462        assert!(matches!(
2463            &definition.steps()[0],
2464            BuilderStep::Throttle { .. }
2465        ));
2466    }
2467
2468    #[test]
2469    fn test_throttle_builder_with_strategy() {
2470        let definition = RouteBuilder::from("timer:tick")
2471            .route_id("test-route")
2472            .throttle(10, std::time::Duration::from_secs(1))
2473            .strategy(ThrottleStrategy::Reject)
2474            .to("mock:result")
2475            .end_throttle()
2476            .build()
2477            .unwrap();
2478
2479        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2480            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2481        } else {
2482            panic!("Expected Throttle step");
2483        }
2484    }
2485
2486    #[test]
2487    fn test_throttle_builder_steps_collected() {
2488        let definition = RouteBuilder::from("timer:tick")
2489            .route_id("test-route")
2490            .throttle(5, std::time::Duration::from_secs(1))
2491            .set_header("throttled", Value::Bool(true))
2492            .to("mock:throttled")
2493            .end_throttle()
2494            .build()
2495            .unwrap();
2496
2497        match &definition.steps()[0] {
2498            BuilderStep::Throttle { steps, .. } => {
2499                assert_eq!(steps.len(), 2); // SetHeader + To
2500            }
2501            other => panic!("Expected Throttle, got {:?}", other),
2502        }
2503    }
2504
2505    #[test]
2506    fn test_throttle_step_after_throttle() {
2507        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2508        let definition = RouteBuilder::from("timer:tick")
2509            .route_id("test-route")
2510            .throttle(10, std::time::Duration::from_secs(1))
2511            .to("mock:inner")
2512            .end_throttle()
2513            .to("mock:outer")
2514            .build()
2515            .unwrap();
2516
2517        assert_eq!(definition.steps().len(), 2);
2518        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2519    }
2520
2521    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2522
2523    #[test]
2524    fn test_load_balance_builder_typestate() {
2525        let definition = RouteBuilder::from("timer:tick")
2526            .route_id("test-route")
2527            .load_balance()
2528            .round_robin()
2529            .to("mock:a")
2530            .to("mock:b")
2531            .end_load_balance()
2532            .build()
2533            .unwrap();
2534
2535        assert_eq!(definition.steps().len(), 1);
2536        assert!(matches!(
2537            &definition.steps()[0],
2538            BuilderStep::LoadBalance { .. }
2539        ));
2540    }
2541
2542    #[test]
2543    fn test_load_balance_builder_with_strategy() {
2544        let definition = RouteBuilder::from("timer:tick")
2545            .route_id("test-route")
2546            .load_balance()
2547            .random()
2548            .to("mock:result")
2549            .end_load_balance()
2550            .build()
2551            .unwrap();
2552
2553        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2554            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2555        } else {
2556            panic!("Expected LoadBalance step");
2557        }
2558    }
2559
2560    #[test]
2561    fn test_load_balance_builder_steps_collected() {
2562        let definition = RouteBuilder::from("timer:tick")
2563            .route_id("test-route")
2564            .load_balance()
2565            .set_header("lb", Value::Bool(true))
2566            .to("mock:a")
2567            .end_load_balance()
2568            .build()
2569            .unwrap();
2570
2571        match &definition.steps()[0] {
2572            BuilderStep::LoadBalance { steps, .. } => {
2573                assert_eq!(steps.len(), 2); // SetHeader + To
2574            }
2575            other => panic!("Expected LoadBalance, got {:?}", other),
2576        }
2577    }
2578
2579    #[test]
2580    fn test_load_balance_step_after_load_balance() {
2581        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
2582        let definition = RouteBuilder::from("timer:tick")
2583            .route_id("test-route")
2584            .load_balance()
2585            .to("mock:inner")
2586            .end_load_balance()
2587            .to("mock:outer")
2588            .build()
2589            .unwrap();
2590
2591        assert_eq!(definition.steps().len(), 2);
2592        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2593    }
2594
2595    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2596
2597    #[test]
2598    fn test_dynamic_router_builder() {
2599        let definition = RouteBuilder::from("timer:tick")
2600            .route_id("test-route")
2601            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2602            .build()
2603            .unwrap();
2604
2605        assert_eq!(definition.steps().len(), 1);
2606        assert!(matches!(
2607            &definition.steps()[0],
2608            BuilderStep::DynamicRouter { .. }
2609        ));
2610    }
2611
2612    #[test]
2613    fn test_dynamic_router_builder_with_config() {
2614        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2615            .max_iterations(100)
2616            .cache_size(500);
2617
2618        let definition = RouteBuilder::from("timer:tick")
2619            .route_id("test-route")
2620            .dynamic_router_with_config(config)
2621            .build()
2622            .unwrap();
2623
2624        assert_eq!(definition.steps().len(), 1);
2625        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2626            assert_eq!(config.max_iterations, 100);
2627            assert_eq!(config.cache_size, 500);
2628        } else {
2629            panic!("Expected DynamicRouter step");
2630        }
2631    }
2632
2633    #[test]
2634    fn test_dynamic_router_step_after_router() {
2635        // Steps after dynamic_router() are added to the outer pipeline.
2636        let definition = RouteBuilder::from("timer:tick")
2637            .route_id("test-route")
2638            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2639            .to("mock:outer")
2640            .build()
2641            .unwrap();
2642
2643        assert_eq!(definition.steps().len(), 2);
2644        assert!(matches!(
2645            &definition.steps()[0],
2646            BuilderStep::DynamicRouter { .. }
2647        ));
2648        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2649    }
2650
2651    #[test]
2652    fn routing_slip_builder_creates_step() {
2653        use camel_api::RoutingSlipExpression;
2654
2655        let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2656
2657        let route = RouteBuilder::from("direct:start")
2658            .route_id("routing-slip-test")
2659            .routing_slip(expression)
2660            .build()
2661            .unwrap();
2662
2663        assert!(
2664            matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2665            "Expected RoutingSlip step"
2666        );
2667    }
2668
2669    #[test]
2670    fn routing_slip_with_config_builder_creates_step() {
2671        use camel_api::RoutingSlipConfig;
2672
2673        let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2674            .uri_delimiter("|")
2675            .cache_size(50)
2676            .ignore_invalid_endpoints(true);
2677
2678        let route = RouteBuilder::from("direct:start")
2679            .route_id("routing-slip-config-test")
2680            .routing_slip_with_config(config)
2681            .build()
2682            .unwrap();
2683
2684        if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2685            assert_eq!(config.uri_delimiter, "|");
2686            assert_eq!(config.cache_size, 50);
2687            assert!(config.ignore_invalid_endpoints);
2688        } else {
2689            panic!("Expected RoutingSlip step");
2690        }
2691    }
2692
2693    #[test]
2694    fn test_builder_marshal_adds_processor_step() {
2695        let definition = RouteBuilder::from("timer:tick")
2696            .route_id("test-route")
2697            .marshal("json")
2698            .build()
2699            .unwrap();
2700        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2701    }
2702
2703    #[test]
2704    fn test_builder_unmarshal_adds_processor_step() {
2705        let definition = RouteBuilder::from("timer:tick")
2706            .route_id("test-route")
2707            .unmarshal("json")
2708            .build()
2709            .unwrap();
2710        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2711    }
2712
2713    #[test]
2714    fn test_builder_stream_cache_adds_processor_step() {
2715        let definition = RouteBuilder::from("timer:tick")
2716            .route_id("test-route")
2717            .stream_cache(1024)
2718            .build()
2719            .unwrap();
2720        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2721    }
2722
2723    #[test]
2724    fn validate_adds_to_step_with_validator_uri() {
2725        let def = RouteBuilder::from("direct:in")
2726            .route_id("test")
2727            .validate("schemas/order.xsd")
2728            .build()
2729            .unwrap();
2730        let steps = def.steps();
2731        assert_eq!(steps.len(), 1);
2732        assert!(
2733            matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2734            "got: {:?}",
2735            steps[0]
2736        );
2737    }
2738
2739    #[test]
2740    #[should_panic(expected = "unknown data format: 'csv'")]
2741    fn test_builder_marshal_panics_on_unknown_format() {
2742        let _ = RouteBuilder::from("timer:tick")
2743            .route_id("test-route")
2744            .marshal("csv")
2745            .build();
2746    }
2747
2748    #[test]
2749    #[should_panic(expected = "unknown data format: 'csv'")]
2750    fn test_builder_unmarshal_panics_on_unknown_format() {
2751        let _ = RouteBuilder::from("timer:tick")
2752            .route_id("test-route")
2753            .unmarshal("csv")
2754            .build();
2755    }
2756
2757    #[test]
2758    fn test_builder_recipient_list_creates_step() {
2759        let route = RouteBuilder::from("direct:start")
2760            .route_id("recipient-list-test")
2761            .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
2762            .build()
2763            .unwrap();
2764
2765        assert!(matches!(
2766            &route.steps()[0],
2767            BuilderStep::RecipientList { .. }
2768        ));
2769    }
2770
2771    #[test]
2772    fn test_builder_recipient_list_with_config_creates_step() {
2773        let config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));
2774
2775        let route = RouteBuilder::from("direct:start")
2776            .route_id("recipient-list-config-test")
2777            .recipient_list_with_config(config)
2778            .build()
2779            .unwrap();
2780
2781        assert!(matches!(
2782            &route.steps()[0],
2783            BuilderStep::RecipientList { .. }
2784        ));
2785    }
2786
2787    #[test]
2788    fn test_builder_script_adds_script_step() {
2789        let route = RouteBuilder::from("direct:start")
2790            .route_id("script-test")
2791            .script("rhai", "headers[\"x\"] = \"y\"")
2792            .build()
2793            .unwrap();
2794
2795        assert!(matches!(
2796            &route.steps()[0],
2797            BuilderStep::Script { language, script }
2798            if language == "rhai" && script == "headers[\"x\"] = \"y\""
2799        ));
2800    }
2801
2802    #[test]
2803    fn test_builder_delay_and_delay_with_header_add_steps() {
2804        let route = RouteBuilder::from("direct:start")
2805            .route_id("delay-test")
2806            .delay(Duration::from_millis(250))
2807            .delay_with_header(Duration::from_millis(500), "x-delay")
2808            .build()
2809            .unwrap();
2810
2811        assert_eq!(route.steps().len(), 2);
2812        assert!(matches!(&route.steps()[0], BuilderStep::Delay { .. }));
2813        assert!(matches!(&route.steps()[1], BuilderStep::Delay { .. }));
2814    }
2815
2816    #[test]
2817    fn test_builder_log_and_stop_add_steps_in_order() {
2818        let route = RouteBuilder::from("direct:start")
2819            .route_id("log-stop-test")
2820            .log("hello", LogLevel::Info)
2821            .stop()
2822            .to("mock:after")
2823            .build()
2824            .unwrap();
2825
2826        assert_eq!(route.steps().len(), 3);
2827        assert!(matches!(
2828            &route.steps()[0],
2829            BuilderStep::Log { message, .. } if message == "hello"
2830        ));
2831        assert!(matches!(&route.steps()[1], BuilderStep::Stop));
2832        assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
2833    }
2834
2835    #[test]
2836    fn test_builder_stream_cache_default_adds_processor_step() {
2837        let route = RouteBuilder::from("direct:start")
2838            .route_id("stream-cache-default-test")
2839            .stream_cache_default()
2840            .build()
2841            .unwrap();
2842
2843        assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
2844    }
2845
2846    #[test]
2847    fn test_validate_preserves_existing_validator_prefix() {
2848        let route = RouteBuilder::from("direct:in")
2849            .route_id("validate-prefix-test")
2850            .validate("validator:schemas/order.xsd")
2851            .build()
2852            .unwrap();
2853
2854        assert!(matches!(
2855            &route.steps()[0],
2856            BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
2857        ));
2858    }
2859
2860    #[test]
2861    fn test_load_balance_builder_weighted_failover_parallel_config() {
2862        let route = RouteBuilder::from("direct:start")
2863            .route_id("lb-weighted-failover-parallel")
2864            .load_balance()
2865            .weighted(vec![
2866                ("direct:a".to_string(), 3),
2867                ("direct:b".to_string(), 1),
2868            ])
2869            .failover()
2870            .parallel(true)
2871            .to("mock:result")
2872            .end_load_balance()
2873            .build()
2874            .unwrap();
2875
2876        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
2877            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
2878            assert!(config.parallel);
2879        } else {
2880            panic!("Expected LoadBalance step");
2881        }
2882    }
2883
2884    #[test]
2885    fn test_multicast_builder_all_config_setters() {
2886        let route = RouteBuilder::from("direct:start")
2887            .route_id("multicast-config-test")
2888            .multicast()
2889            .parallel(true)
2890            .parallel_limit(4)
2891            .stop_on_exception(true)
2892            .timeout(Duration::from_millis(300))
2893            .aggregation(MulticastStrategy::Original)
2894            .to("mock:a")
2895            .end_multicast()
2896            .build()
2897            .unwrap();
2898
2899        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
2900            assert!(config.parallel);
2901            assert_eq!(config.parallel_limit, Some(4));
2902            assert!(config.stop_on_exception);
2903            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
2904            assert!(matches!(config.aggregation, MulticastStrategy::Original));
2905        } else {
2906            panic!("Expected Multicast step");
2907        }
2908    }
2909
2910    #[test]
2911    fn test_build_canonical_rejects_unsupported_processor_step() {
2912        let err = RouteBuilder::from("direct:start")
2913            .route_id("canonical-reject")
2914            .set_header("k", Value::String("v".into()))
2915            .build_canonical()
2916            .unwrap_err();
2917
2918        assert!(format!("{err}").contains("does not support step `processor`"));
2919    }
2920}