Skip to main content

camel_builder/
lib.rs

1use camel_api::DelayConfig;
2use camel_api::aggregator::{
3    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
4};
5use camel_api::body::Body;
6use camel_api::body_converter::BodyType;
7use camel_api::circuit_breaker::CircuitBreakerConfig;
8use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
9use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
10use camel_api::load_balancer::LoadBalancerConfig;
11use camel_api::loop_eip::{LoopConfig, LoopMode};
12use camel_api::multicast::{MulticastConfig, MulticastStrategy};
13use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
14use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
15use camel_api::splitter::SplitterConfig;
16use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
17use camel_api::{
18    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
19    ProcessorFn, Value,
20    runtime::{
21        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
22        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
23        CanonicalWhenSpec,
24    },
25};
26use camel_component_api::ConcurrencyModel;
27use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
28use camel_processor::{
29    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
30    StreamCacheService, UnmarshalService, builtin_data_format,
31};
32
33/// Shared step-accumulation methods for all builder types.
34///
35/// Implementors provide `steps_mut()` and get step-adding methods for free.
36/// `filter()` and other branching methods are NOT included — they return
37/// different types per builder and stay as per-builder methods.
38pub trait StepAccumulator: Sized {
39    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
40
41    fn to(mut self, endpoint: impl Into<String>) -> Self {
42        self.steps_mut().push(BuilderStep::To(endpoint.into()));
43        self
44    }
45
46    fn process<F, Fut>(mut self, f: F) -> Self
47    where
48        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
49        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
50    {
51        let svc = ProcessorFn::new(f);
52        self.steps_mut()
53            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
54        self
55    }
56
57    fn process_fn(mut self, processor: BoxProcessor) -> Self {
58        self.steps_mut().push(BuilderStep::Processor(processor));
59        self
60    }
61
62    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
63        let svc = SetHeader::new(IdentityProcessor, key, value);
64        self.steps_mut()
65            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
66        self
67    }
68
69    fn map_body<F>(mut self, mapper: F) -> Self
70    where
71        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
72    {
73        let svc = MapBody::new(IdentityProcessor, mapper);
74        self.steps_mut()
75            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
76        self
77    }
78
79    fn set_body<B>(mut self, body: B) -> Self
80    where
81        B: Into<Body> + Clone + Send + Sync + 'static,
82    {
83        let body: Body = body.into();
84        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
85        self.steps_mut()
86            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
87        self
88    }
89
90    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
91    ///
92    /// Transforms the message body using the given value. Semantically identical
93    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
94    fn transform<B>(self, body: B) -> Self
95    where
96        B: Into<Body> + Clone + Send + Sync + 'static,
97    {
98        self.set_body(body)
99    }
100
101    fn set_body_fn<F>(mut self, expr: F) -> Self
102    where
103        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
104    {
105        let svc = SetBody::new(IdentityProcessor, expr);
106        self.steps_mut()
107            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
108        self
109    }
110
111    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
112    where
113        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
114    {
115        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
116        self.steps_mut()
117            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
118        self
119    }
120
121    fn aggregate(mut self, config: AggregatorConfig) -> Self {
122        self.steps_mut().push(BuilderStep::Aggregate { config });
123        self
124    }
125
126    /// Stop processing this exchange immediately. No further steps in the
127    /// current pipeline will run.
128    ///
129    /// Can be used at any point in the route: directly on RouteBuilder,
130    /// inside `.filter()`, inside `.split()`, etc.
131    fn stop(mut self) -> Self {
132        self.steps_mut().push(BuilderStep::Stop);
133        self
134    }
135
136    fn delay(mut self, duration: std::time::Duration) -> Self {
137        self.steps_mut().push(BuilderStep::Delay {
138            config: DelayConfig::from_duration(duration),
139        });
140        self
141    }
142
143    fn delay_with_header(
144        mut self,
145        duration: std::time::Duration,
146        header: impl Into<String>,
147    ) -> Self {
148        self.steps_mut().push(BuilderStep::Delay {
149            config: DelayConfig::from_duration_with_header(duration, header),
150        });
151        self
152    }
153
154    /// Log a message at the specified level.
155    ///
156    /// The message will be logged when an exchange passes through this step.
157    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
158        self.steps_mut().push(BuilderStep::Log {
159            level,
160            message: message.into(),
161        });
162        self
163    }
164
165    /// Convert the message body to the target type.
166    ///
167    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
168    /// Returns `TypeConversionFailed` if conversion is not possible.
169    ///
170    /// # Example
171    /// ```ignore
172    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
173    ///      .convert_body_to(BodyType::Json)
174    ///      .to("direct:next")
175    /// ```
176    fn convert_body_to(mut self, target: BodyType) -> Self {
177        let svc = ConvertBodyTo::new(IdentityProcessor, target);
178        self.steps_mut()
179            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
180        self
181    }
182
183    fn stream_cache(mut self, threshold: usize) -> Self {
184        let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
185        let svc = StreamCacheService::new(IdentityProcessor, config);
186        self.steps_mut()
187            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
188        self
189    }
190
191    /// Materialize `Body::Stream` into `Body::Bytes` using the default threshold (128 KB).
192    ///
193    /// Equivalent to `.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)`.
194    fn stream_cache_default(self) -> Self {
195        self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
196    }
197
198    /// Marshal the message body using the specified data format.
199    ///
200    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
201    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
202    ///
203    /// # Example
204    /// ```ignore
205    /// route.marshal("json").to("direct:next")
206    /// ```
207    fn marshal(mut self, format: impl Into<String>) -> Self {
208        let name = format.into();
209        let df =
210            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
211        let svc = MarshalService::new(IdentityProcessor, df);
212        self.steps_mut()
213            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
214        self
215    }
216
217    /// Unmarshal the message body using the specified data format.
218    ///
219    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
220    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
221    ///
222    /// # Example
223    /// ```ignore
224    /// route.unmarshal("json").to("direct:next")
225    /// ```
226    fn unmarshal(mut self, format: impl Into<String>) -> Self {
227        let name = format.into();
228        let df =
229            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
230        let svc = UnmarshalService::new(IdentityProcessor, df);
231        self.steps_mut()
232            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
233        self
234    }
235
236    /// Validate the exchange body against a schema file.
237    ///
238    /// Shorthand for `to("validator:path")`. Supports XSD, JSON Schema, and YAML.
239    ///
240    /// # Example
241    /// ```ignore
242    /// route.validate("schemas/order.xsd").to("direct:out")
243    /// ```
244    fn validate(mut self, schema_path: impl Into<String>) -> Self {
245        let path = schema_path.into();
246        let uri = if path.starts_with("validator:") {
247            path
248        } else {
249            format!("validator:{path}")
250        };
251        self.steps_mut().push(BuilderStep::To(uri));
252        self
253    }
254
255    /// Execute a script that can modify the exchange (headers, properties, body).
256    ///
257    /// The script has access to `headers`, `properties`, and `body` variables
258    /// and can modify them with assignment syntax: `headers["k"] = v`.
259    ///
260    /// # Example
261    /// ```ignore
262    /// // ignore: requires full CamelContext setup with registered language
263    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
264    /// ```
265    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
266        self.steps_mut().push(BuilderStep::Script {
267            language: language.into(),
268            script: script.into(),
269        });
270        self
271    }
272}
273
/// A fluent builder for constructing routes.
///
/// Start with [`RouteBuilder::from`], chain configuration and step methods,
/// then finish with `build()` (or `build_canonical()`). Both finishers return
/// an error unless `.route_id(..)` was called, and scopes opened with
/// `.filter(..)` are closed with `.end_filter()` before building.
///
/// # Example
///
/// ```ignore
/// let definition = RouteBuilder::from("timer:tick?period=1000")
///     .route_id("timer-to-log")
///     .set_header("source", Value::String("timer".into()))
///     .filter(|ex| ex.input.body.as_text().is_some())
///         .to("log:info?showHeaders=true")
///     .end_filter()
///     .build()?;
/// ```
pub struct RouteBuilder {
    /// Source endpoint URI given to `from`; `build()` rejects an empty value.
    from_uri: String,
    /// Steps accumulated so far, in the order they were added.
    steps: Vec<BuilderStep>,
    /// Explicit error handler set through `error_handler(config)`.
    error_handler: Option<ErrorHandlerConfig>,
    /// Records which error-handling API style(s) have been used so far.
    error_handler_mode: ErrorHandlerMode,
    /// Circuit breaker set through `circuit_breaker(config)`, if any.
    circuit_breaker_config: Option<CircuitBreakerConfig>,
    /// Concurrency override set through `concurrent(..)` / `sequential()`.
    concurrency: Option<ConcurrencyModel>,
    /// Route ID; required by `build()` and `build_canonical()`.
    route_id: Option<String>,
    /// Auto-startup flag; only applied to the definition when set.
    auto_startup: Option<bool>,
    /// Startup order; only applied to the definition when set.
    startup_order: Option<i32>,
}
296
/// Tracks which style of error-handler configuration a `RouteBuilder` has
/// seen, so that `build()` can reject combinations of the two styles.
#[derive(Default)]
enum ErrorHandlerMode {
    /// No error-handler method has been called yet.
    #[default]
    None,
    /// Only `error_handler(config)` has been used.
    ExplicitConfig,
    /// Only the shorthand methods (`dead_letter_channel`, `on_exception`)
    /// have been used; `build()` assembles the final config from these parts.
    Shorthand {
        /// Dead letter channel URI, if `dead_letter_channel` was called.
        dlc_uri: Option<String>,
        /// Exception policies recorded by `end_on_exception`, in call order.
        specs: Vec<OnExceptionSpec>,
    },
    /// Both styles were used; `build()` returns an error in this state.
    Mixed,
}
308
/// One shorthand exception policy collected by `OnExceptionBuilder` and
/// turned into part of the error handler config by `RouteBuilder::build`.
#[derive(Clone)]
struct OnExceptionSpec {
    /// Predicate deciding whether this policy applies to a given error.
    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
    /// Redelivery settings, present only if `retry(..)` was called.
    retry: Option<RedeliveryPolicy>,
    /// Endpoint URI passed to `handled_by(..)`, if any.
    handled_by: Option<String>,
}
315
316impl RouteBuilder {
317    /// Start building a route from the given source endpoint URI.
318    pub fn from(endpoint: &str) -> Self {
319        Self {
320            from_uri: endpoint.to_string(),
321            steps: Vec::new(),
322            error_handler: None,
323            error_handler_mode: ErrorHandlerMode::None,
324            circuit_breaker_config: None,
325            concurrency: None,
326            route_id: None,
327            auto_startup: None,
328            startup_order: None,
329        }
330    }
331
332    /// Open a filter scope. Only exchanges matching `predicate` will be processed
333    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
334    /// and continue to steps after `.end_filter()`.
335    pub fn filter<F>(self, predicate: F) -> FilterBuilder
336    where
337        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
338    {
339        FilterBuilder {
340            parent: self,
341            predicate: std::sync::Arc::new(predicate),
342            steps: vec![],
343        }
344    }
345
346    /// Open a choice scope for content-based routing.
347    ///
348    /// Within the choice, you can define multiple `.when()` clauses and an
349    /// optional `.otherwise()` clause. The first matching `when` predicate
350    /// determines which sub-pipeline executes.
351    pub fn choice(self) -> ChoiceBuilder {
352        ChoiceBuilder {
353            parent: self,
354            whens: vec![],
355            _otherwise: None,
356        }
357    }
358
359    /// Add a WireTap step that sends a clone of the exchange to the given
360    /// endpoint URI (fire-and-forget). The original exchange continues
361    /// downstream unchanged.
362    pub fn wire_tap(mut self, endpoint: &str) -> Self {
363        self.steps.push(BuilderStep::WireTap {
364            uri: endpoint.to_string(),
365        });
366        self
367    }
368
369    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
370    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
371        self.error_handler_mode = match self.error_handler_mode {
372            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
373                ErrorHandlerMode::ExplicitConfig
374            }
375            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
376        };
377        self.error_handler = Some(config);
378        self
379    }
380
381    /// Set a dead letter channel URI for shorthand error handler mode.
382    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
383        let uri = uri.into();
384        self.error_handler_mode = match self.error_handler_mode {
385            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
386                dlc_uri: Some(uri),
387                specs: Vec::new(),
388            },
389            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
390                dlc_uri: Some(uri),
391                specs,
392            },
393            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
394        };
395        self
396    }
397
398    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
399    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
400    where
401        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
402    {
403        self.error_handler_mode = match self.error_handler_mode {
404            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
405                dlc_uri: None,
406                specs: Vec::new(),
407            },
408            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
409            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
410        };
411
412        OnExceptionBuilder {
413            parent: self,
414            policy: OnExceptionSpec {
415                matches: std::sync::Arc::new(matches),
416                retry: None,
417                handled_by: None,
418            },
419        }
420    }
421
422    /// Set a circuit breaker for this route.
423    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
424        self.circuit_breaker_config = Some(config);
425        self
426    }
427
428    /// Override the consumer's default concurrency model.
429    ///
430    /// When set, the pipeline spawns a task per exchange, processing them
431    /// concurrently. `max` limits the number of simultaneously active
432    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
433    ///
434    /// # Example
435    /// ```ignore
436    /// RouteBuilder::from("http://0.0.0.0:8080/api")
437    ///     .concurrent(16)  // max 16 in-flight pipeline executions
438    ///     .process(handle_request)
439    ///     .build()
440    /// ```
441    pub fn concurrent(mut self, max: usize) -> Self {
442        let max = if max == 0 { None } else { Some(max) };
443        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
444        self
445    }
446
447    /// Force sequential processing, overriding a concurrent-capable consumer.
448    ///
449    /// Useful for HTTP routes that mutate shared state and need ordering
450    /// guarantees.
451    pub fn sequential(mut self) -> Self {
452        self.concurrency = Some(ConcurrencyModel::Sequential);
453        self
454    }
455
456    /// Set the route ID for this route.
457    ///
458    /// If not set, the route will be assigned an auto-generated ID.
459    pub fn route_id(mut self, id: impl Into<String>) -> Self {
460        self.route_id = Some(id.into());
461        self
462    }
463
464    /// Set whether this route should automatically start when the context starts.
465    ///
466    /// Default is `true`.
467    pub fn auto_startup(mut self, auto: bool) -> Self {
468        self.auto_startup = Some(auto);
469        self
470    }
471
472    /// Set the startup order for this route.
473    ///
474    /// Routes with lower values start first. Default is 1000.
475    pub fn startup_order(mut self, order: i32) -> Self {
476        self.startup_order = Some(order);
477        self
478    }
479
480    /// Begin a Splitter sub-pipeline. Steps added after this call (until
481    /// `.end_split()`) will be executed per-fragment.
482    ///
483    /// Returns a `SplitBuilder` — you cannot call `.build()` until
484    /// `.end_split()` closes the split scope (enforced by the type system).
485    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
486        SplitBuilder {
487            parent: self,
488            config,
489            steps: Vec::new(),
490        }
491    }
492
493    /// Begin a Multicast sub-pipeline. Steps added after this call (until
494    /// `.end_multicast()`) will each receive a copy of the exchange.
495    ///
496    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
497    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
498    pub fn multicast(self) -> MulticastBuilder {
499        MulticastBuilder {
500            parent: self,
501            steps: Vec::new(),
502            config: MulticastConfig::new(),
503        }
504    }
505
506    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
507    /// `max_requests` per `period`. Steps inside the throttle scope are only
508    /// executed when the rate limit allows.
509    ///
510    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
511    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
512    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
513        ThrottleBuilder {
514            parent: self,
515            config: ThrottlerConfig::new(max_requests, period),
516            steps: Vec::new(),
517        }
518    }
519
520    /// Begin a Loop sub-pipeline that iterates a fixed number of times.
521    pub fn loop_count(self, count: usize) -> LoopBuilder {
522        LoopBuilder {
523            parent: self,
524            config: LoopConfig {
525                mode: LoopMode::Count(count),
526            },
527            steps: vec![],
528        }
529    }
530
531    /// Begin a Loop sub-pipeline that iterates while a predicate is true.
532    pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
533    where
534        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
535    {
536        LoopBuilder {
537            parent: self,
538            config: LoopConfig {
539                mode: LoopMode::While(std::sync::Arc::new(predicate)),
540            },
541            steps: vec![],
542        }
543    }
544
545    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
546    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
547    ///
548    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
549    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
550    pub fn load_balance(self) -> LoadBalancerBuilder {
551        LoadBalancerBuilder {
552            parent: self,
553            config: LoadBalancerConfig::round_robin(),
554            steps: Vec::new(),
555        }
556    }
557
558    /// Add a dynamic router step that routes exchanges dynamically based on
559    /// expression evaluation at runtime.
560    ///
561    /// The expression receives the exchange and returns `Some(uri)` to route to
562    /// the next endpoint, or `None` to stop routing.
563    ///
564    /// # Example
565    /// ```ignore
566    /// RouteBuilder::from("timer:tick")
567    ///     .route_id("test-route")
568    ///     .dynamic_router(|ex| {
569    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
570    ///     })
571    ///     .build()
572    /// ```
573    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
574        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
575    }
576
577    /// Add a dynamic router step with full configuration.
578    ///
579    /// Allows customization of URI delimiter, cache size, timeout, and other options.
580    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
581        self.steps.push(BuilderStep::DynamicRouter { config });
582        self
583    }
584
585    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
586        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
587    }
588
589    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
590        self.steps.push(BuilderStep::RoutingSlip { config });
591        self
592    }
593
594    pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
595        self.recipient_list_with_config(RecipientListConfig::new(expression))
596    }
597
598    pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
599        self.steps.push(BuilderStep::RecipientList { config });
600        self
601    }
602
603    /// Consume the builder and produce a [`RouteDefinition`].
604    pub fn build(self) -> Result<RouteDefinition, CamelError> {
605        if self.from_uri.is_empty() {
606            return Err(CamelError::RouteError(
607                "route must have a 'from' URI".to_string(),
608            ));
609        }
610        let route_id = self.route_id.ok_or_else(|| {
611            CamelError::RouteError(
612                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
613                    .to_string(),
614            )
615        })?;
616        let resolved_error_handler = match self.error_handler_mode {
617            ErrorHandlerMode::None => self.error_handler,
618            ErrorHandlerMode::ExplicitConfig => self.error_handler,
619            ErrorHandlerMode::Mixed => {
620                return Err(CamelError::RouteError(
621                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
622                ));
623            }
624            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
625                let mut config = if let Some(uri) = dlc_uri {
626                    ErrorHandlerConfig::dead_letter_channel(uri)
627                } else {
628                    ErrorHandlerConfig::log_only()
629                };
630
631                for spec in specs {
632                    let matcher = spec.matches.clone();
633                    let mut builder = config.on_exception(move |e| matcher(e));
634
635                    if let Some(retry) = spec.retry {
636                        builder = builder.retry(retry.max_attempts).with_backoff(
637                            retry.initial_delay,
638                            retry.multiplier,
639                            retry.max_delay,
640                        );
641                        if retry.jitter_factor > 0.0 {
642                            builder = builder.with_jitter(retry.jitter_factor);
643                        }
644                    }
645
646                    if let Some(uri) = spec.handled_by {
647                        builder = builder.handled_by(uri);
648                    }
649
650                    config = builder.build();
651                }
652
653                Some(config)
654            }
655        };
656
657        let definition = RouteDefinition::new(self.from_uri, self.steps);
658        let definition = if let Some(eh) = resolved_error_handler {
659            definition.with_error_handler(eh)
660        } else {
661            definition
662        };
663        let definition = if let Some(cb) = self.circuit_breaker_config {
664            definition.with_circuit_breaker(cb)
665        } else {
666            definition
667        };
668        let definition = if let Some(concurrency) = self.concurrency {
669            definition.with_concurrency(concurrency)
670        } else {
671            definition
672        };
673        let definition = definition.with_route_id(route_id);
674        let definition = if let Some(auto) = self.auto_startup {
675            definition.with_auto_startup(auto)
676        } else {
677            definition
678        };
679        let definition = if let Some(order) = self.startup_order {
680            definition.with_startup_order(order)
681        } else {
682            definition
683        };
684        Ok(definition)
685    }
686
687    /// Compile this builder route into canonical v1 spec.
688    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
689        if self.from_uri.is_empty() {
690            return Err(CamelError::RouteError(
691                "route must have a 'from' URI".to_string(),
692            ));
693        }
694        let route_id = self.route_id.ok_or_else(|| {
695            CamelError::RouteError(
696                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
697                    .to_string(),
698            )
699        })?;
700
701        let steps = canonicalize_steps(self.steps)?;
702        let circuit_breaker = self
703            .circuit_breaker_config
704            .map(canonicalize_circuit_breaker);
705
706        let spec = CanonicalRouteSpec {
707            route_id,
708            from: self.from_uri,
709            steps,
710            circuit_breaker,
711            version: camel_api::CANONICAL_CONTRACT_VERSION,
712        };
713        spec.validate_contract()?;
714        Ok(spec)
715    }
716}
717
/// Scope builder returned by `RouteBuilder::on_exception`.
///
/// Collects the settings of one exception policy and hands the route builder
/// back from `end_on_exception()`.
pub struct OnExceptionBuilder {
    /// Route builder that opened this scope.
    parent: RouteBuilder,
    /// Policy being assembled.
    policy: OnExceptionSpec,
}
722
723impl OnExceptionBuilder {
724    pub fn retry(mut self, max_attempts: u32) -> Self {
725        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
726        self
727    }
728
729    pub fn with_backoff(
730        mut self,
731        initial: std::time::Duration,
732        multiplier: f64,
733        max: std::time::Duration,
734    ) -> Self {
735        if let Some(ref mut retry) = self.policy.retry {
736            retry.initial_delay = initial;
737            retry.multiplier = multiplier;
738            retry.max_delay = max;
739        }
740        self
741    }
742
743    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
744        if let Some(ref mut retry) = self.policy.retry {
745            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
746        }
747        self
748    }
749
750    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
751        self.policy.handled_by = Some(uri.into());
752        self
753    }
754
755    pub fn end_on_exception(mut self) -> RouteBuilder {
756        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
757            specs.push(self.policy);
758        }
759        self.parent
760    }
761}
762
763fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
764    let mut canonical = Vec::with_capacity(steps.len());
765    for step in steps {
766        canonical.push(canonicalize_step(step)?);
767    }
768    Ok(canonical)
769}
770
771fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
772    match step {
773        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
774        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
775        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
776        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
777        BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
778            delay_ms: config.delay_ms,
779            dynamic_header: config.dynamic_header,
780        }),
781        BuilderStep::DeclarativeScript { expression } => {
782            Ok(CanonicalStepSpec::Script { expression })
783        }
784        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
785            predicate,
786            steps: canonicalize_steps(steps)?,
787        }),
788        BuilderStep::DeclarativeChoice { whens, otherwise } => {
789            let mut canonical_whens = Vec::with_capacity(whens.len());
790            for DeclarativeWhenStep { predicate, steps } in whens {
791                canonical_whens.push(CanonicalWhenSpec {
792                    predicate,
793                    steps: canonicalize_steps(steps)?,
794                });
795            }
796            let otherwise = match otherwise {
797                Some(steps) => Some(canonicalize_steps(steps)?),
798                None => None,
799            };
800            Ok(CanonicalStepSpec::Choice {
801                whens: canonical_whens,
802                otherwise,
803            })
804        }
805        BuilderStep::DeclarativeSplit {
806            expression,
807            aggregation,
808            parallel,
809            parallel_limit,
810            stop_on_exception,
811            steps,
812        } => Ok(CanonicalStepSpec::Split {
813            expression: CanonicalSplitExpressionSpec::Language(expression),
814            aggregation: canonicalize_split_aggregation(aggregation)?,
815            parallel,
816            parallel_limit,
817            stop_on_exception,
818            steps: canonicalize_steps(steps)?,
819        }),
820        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
821            config: canonicalize_aggregate(config)?,
822        }),
823        other => {
824            let step_name = canonical_step_name(&other);
825            let detail = camel_api::canonical_contract_rejection_reason(step_name)
826                .unwrap_or("not included in canonical v1");
827            Err(CamelError::RouteError(format!(
828                "canonical v1 does not support step `{step_name}`: {detail}"
829            )))
830        }
831    }
832}
833
834fn canonicalize_split_aggregation(
835    strategy: camel_api::splitter::AggregationStrategy,
836) -> Result<CanonicalSplitAggregationSpec, CamelError> {
837    match strategy {
838        camel_api::splitter::AggregationStrategy::LastWins => {
839            Ok(CanonicalSplitAggregationSpec::LastWins)
840        }
841        camel_api::splitter::AggregationStrategy::CollectAll => {
842            Ok(CanonicalSplitAggregationSpec::CollectAll)
843        }
844        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
845            "canonical v1 does not support custom split aggregation".to_string(),
846        )),
847        camel_api::splitter::AggregationStrategy::Original => {
848            Ok(CanonicalSplitAggregationSpec::Original)
849        }
850    }
851}
852
853fn extract_completion_fields(
854    mode: &CompletionMode,
855) -> Result<(Option<usize>, Option<u64>), CamelError> {
856    match mode {
857        CompletionMode::Single(cond) => match cond {
858            CompletionCondition::Size(n) => Ok((Some(*n), None)),
859            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
860            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
861                "canonical v1 does not support aggregate predicate completion".to_string(),
862            )),
863        },
864        CompletionMode::Any(conds) => {
865            let mut size = None;
866            let mut timeout_ms = None;
867            for cond in conds {
868                match cond {
869                    CompletionCondition::Size(n) => size = Some(*n),
870                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
871                    CompletionCondition::Predicate(_) => {
872                        return Err(CamelError::RouteError(
873                            "canonical v1 does not support aggregate predicate completion"
874                                .to_string(),
875                        ));
876                    }
877                }
878            }
879            Ok((size, timeout_ms))
880        }
881    }
882}
883
884fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
885    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
886
887    let header = match &config.correlation {
888        CorrelationStrategy::HeaderName(h) => h.clone(),
889        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
890        CorrelationStrategy::Fn(_) => {
891            return Err(CamelError::RouteError(
892                "canonical v1 does not support Fn correlation strategy".to_string(),
893            ));
894        }
895    };
896
897    let correlation_key = match &config.correlation {
898        CorrelationStrategy::HeaderName(_) => None,
899        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
900        CorrelationStrategy::Fn(_) => unreachable!(),
901    };
902
903    let strategy = match config.strategy {
904        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
905        AggregationStrategy::Custom(_) => {
906            return Err(CamelError::RouteError(
907                "canonical v1 does not support custom aggregate strategy".to_string(),
908            ));
909        }
910    };
911    let bucket_ttl_ms = config
912        .bucket_ttl
913        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
914
915    Ok(CanonicalAggregateSpec {
916        header,
917        completion_size,
918        completion_timeout_ms,
919        correlation_key,
920        force_completion_on_stop: if config.force_completion_on_stop {
921            Some(true)
922        } else {
923            None
924        },
925        discard_on_timeout: if config.discard_on_timeout {
926            Some(true)
927        } else {
928            None
929        },
930        strategy,
931        max_buckets: config.max_buckets,
932        bucket_ttl_ms,
933    })
934}
935
936fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
937    CanonicalCircuitBreakerSpec {
938        failure_threshold: config.failure_threshold,
939        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
940    }
941}
942
943fn canonical_step_name(step: &BuilderStep) -> &'static str {
944    match step {
945        BuilderStep::Processor(_) => "processor",
946        BuilderStep::To(_) => "to",
947        BuilderStep::Stop => "stop",
948        BuilderStep::Log { .. } => "log",
949        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
950        BuilderStep::DeclarativeSetBody { .. } => "set_body",
951        BuilderStep::DeclarativeFilter { .. } => "filter",
952        BuilderStep::DeclarativeChoice { .. } => "choice",
953        BuilderStep::DeclarativeScript { .. } => "script",
954        BuilderStep::DeclarativeSplit { .. } => "split",
955        BuilderStep::Split { .. } => "split",
956        BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
957        BuilderStep::Aggregate { .. } => "aggregate",
958        BuilderStep::Filter { .. } => "filter",
959        BuilderStep::Choice { .. } => "choice",
960        BuilderStep::WireTap { .. } => "wire_tap",
961        BuilderStep::Delay { .. } => "delay",
962        BuilderStep::Multicast { .. } => "multicast",
963        BuilderStep::DeclarativeLog { .. } => "log",
964        BuilderStep::Bean { .. } => "bean",
965        BuilderStep::Script { .. } => "script",
966        BuilderStep::Throttle { .. } => "throttle",
967        BuilderStep::LoadBalance { .. } => "load_balancer",
968        BuilderStep::DynamicRouter { .. } => "dynamic_router",
969        BuilderStep::RoutingSlip { .. } => "routing_slip",
970        BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
971        BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
972        BuilderStep::RecipientList { .. } => "recipient_list",
973        BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
974    }
975}
976
977impl StepAccumulator for RouteBuilder {
978    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
979        &mut self.steps
980    }
981}
982
983/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
984///
985/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
986/// but NOT `.build()` and NOT `.split()` (no nested splits).
987///
988/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
989/// and returns the parent `RouteBuilder`.
990pub struct SplitBuilder {
991    parent: RouteBuilder,
992    config: SplitterConfig,
993    steps: Vec<BuilderStep>,
994}
995
996impl SplitBuilder {
997    /// Open a filter scope within the split sub-pipeline.
998    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
999    where
1000        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1001    {
1002        FilterInSplitBuilder {
1003            parent: self,
1004            predicate: std::sync::Arc::new(predicate),
1005            steps: vec![],
1006        }
1007    }
1008
1009    /// Close the split scope. Packages the accumulated sub-steps into a
1010    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
1011    pub fn end_split(mut self) -> RouteBuilder {
1012        let split_step = BuilderStep::Split {
1013            config: self.config,
1014            steps: self.steps,
1015        };
1016        self.parent.steps.push(split_step);
1017        self.parent
1018    }
1019}
1020
1021impl StepAccumulator for SplitBuilder {
1022    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1023        &mut self.steps
1024    }
1025}
1026
1027/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
1028pub struct FilterBuilder {
1029    parent: RouteBuilder,
1030    predicate: FilterPredicate,
1031    steps: Vec<BuilderStep>,
1032}
1033
1034impl FilterBuilder {
1035    /// Close the filter scope. Packages the accumulated sub-steps into a
1036    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
1037    pub fn end_filter(mut self) -> RouteBuilder {
1038        let step = BuilderStep::Filter {
1039            predicate: self.predicate,
1040            steps: self.steps,
1041        };
1042        self.parent.steps.push(step);
1043        self.parent
1044    }
1045}
1046
1047impl StepAccumulator for FilterBuilder {
1048    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1049        &mut self.steps
1050    }
1051}
1052
1053/// Builder for a filter scope nested inside a `.split()` block.
1054pub struct FilterInSplitBuilder {
1055    parent: SplitBuilder,
1056    predicate: FilterPredicate,
1057    steps: Vec<BuilderStep>,
1058}
1059
1060impl FilterInSplitBuilder {
1061    /// Close the filter scope and return the parent `SplitBuilder`.
1062    pub fn end_filter(mut self) -> SplitBuilder {
1063        let step = BuilderStep::Filter {
1064            predicate: self.predicate,
1065            steps: self.steps,
1066        };
1067        self.parent.steps.push(step);
1068        self.parent
1069    }
1070}
1071
1072impl StepAccumulator for FilterInSplitBuilder {
1073    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1074        &mut self.steps
1075    }
1076}
1077
1078// ── Choice/When/Otherwise builders ─────────────────────────────────────────
1079
1080/// Builder for a `.choice()` ... `.end_choice()` block.
1081///
1082/// Accumulates `when` clauses and an optional `otherwise` clause.
1083/// Cannot call `.build()` until `.end_choice()` is called.
1084pub struct ChoiceBuilder {
1085    parent: RouteBuilder,
1086    whens: Vec<WhenStep>,
1087    _otherwise: Option<Vec<BuilderStep>>,
1088}
1089
1090impl ChoiceBuilder {
1091    /// Open a `when` clause. Only exchanges matching `predicate` will be
1092    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
1093    pub fn when<F>(self, predicate: F) -> WhenBuilder
1094    where
1095        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1096    {
1097        WhenBuilder {
1098            parent: self,
1099            predicate: std::sync::Arc::new(predicate),
1100            steps: vec![],
1101        }
1102    }
1103
1104    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1105    ///
1106    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1107    pub fn otherwise(self) -> OtherwiseBuilder {
1108        OtherwiseBuilder {
1109            parent: self,
1110            steps: vec![],
1111        }
1112    }
1113
1114    /// Close the choice scope. Packages all accumulated `when` clauses and
1115    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1116    /// parent `RouteBuilder`.
1117    pub fn end_choice(mut self) -> RouteBuilder {
1118        let step = BuilderStep::Choice {
1119            whens: self.whens,
1120            otherwise: self._otherwise,
1121        };
1122        self.parent.steps.push(step);
1123        self.parent
1124    }
1125}
1126
1127/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1128pub struct WhenBuilder {
1129    parent: ChoiceBuilder,
1130    predicate: camel_api::FilterPredicate,
1131    steps: Vec<BuilderStep>,
1132}
1133
1134impl WhenBuilder {
1135    /// Close the when scope. Packages the accumulated sub-steps into a
1136    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1137    pub fn end_when(mut self) -> ChoiceBuilder {
1138        self.parent.whens.push(WhenStep {
1139            predicate: self.predicate,
1140            steps: self.steps,
1141        });
1142        self.parent
1143    }
1144}
1145
1146impl StepAccumulator for WhenBuilder {
1147    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1148        &mut self.steps
1149    }
1150}
1151
1152/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1153pub struct OtherwiseBuilder {
1154    parent: ChoiceBuilder,
1155    steps: Vec<BuilderStep>,
1156}
1157
1158impl OtherwiseBuilder {
1159    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1160    pub fn end_otherwise(self) -> ChoiceBuilder {
1161        let OtherwiseBuilder { mut parent, steps } = self;
1162        parent._otherwise = Some(steps);
1163        parent
1164    }
1165}
1166
1167impl StepAccumulator for OtherwiseBuilder {
1168    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1169        &mut self.steps
1170    }
1171}
1172
1173/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1174///
1175/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1176/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1177///
1178/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1179/// and returns the parent `RouteBuilder`.
1180pub struct MulticastBuilder {
1181    parent: RouteBuilder,
1182    steps: Vec<BuilderStep>,
1183    config: MulticastConfig,
1184}
1185
1186impl MulticastBuilder {
1187    pub fn parallel(mut self, parallel: bool) -> Self {
1188        self.config = self.config.parallel(parallel);
1189        self
1190    }
1191
1192    pub fn parallel_limit(mut self, limit: usize) -> Self {
1193        self.config = self.config.parallel_limit(limit);
1194        self
1195    }
1196
1197    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1198        self.config = self.config.stop_on_exception(stop);
1199        self
1200    }
1201
1202    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1203        self.config = self.config.timeout(duration);
1204        self
1205    }
1206
1207    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1208        self.config = self.config.aggregation(strategy);
1209        self
1210    }
1211
1212    pub fn end_multicast(mut self) -> RouteBuilder {
1213        let step = BuilderStep::Multicast {
1214            steps: self.steps,
1215            config: self.config,
1216        };
1217        self.parent.steps.push(step);
1218        self.parent
1219    }
1220}
1221
1222impl StepAccumulator for MulticastBuilder {
1223    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1224        &mut self.steps
1225    }
1226}
1227
1228/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1229///
1230/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1231/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1232///
1233/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1234/// and returns the parent `RouteBuilder`.
1235pub struct ThrottleBuilder {
1236    parent: RouteBuilder,
1237    config: ThrottlerConfig,
1238    steps: Vec<BuilderStep>,
1239}
1240
1241impl ThrottleBuilder {
1242    /// Set the throttle strategy. Default is `Delay`.
1243    ///
1244    /// - `Delay`: Queue messages until capacity available
1245    /// - `Reject`: Return error immediately when throttled
1246    /// - `Drop`: Silently discard excess messages
1247    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1248        self.config = self.config.strategy(strategy);
1249        self
1250    }
1251
1252    /// Close the throttle scope. Packages the accumulated sub-steps into a
1253    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1254    pub fn end_throttle(mut self) -> RouteBuilder {
1255        let step = BuilderStep::Throttle {
1256            config: self.config,
1257            steps: self.steps,
1258        };
1259        self.parent.steps.push(step);
1260        self.parent
1261    }
1262}
1263
1264impl StepAccumulator for ThrottleBuilder {
1265    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1266        &mut self.steps
1267    }
1268}
1269
1270/// Builder for the sub-pipeline within a `.loop_count()` / `.loop_while()` ... `.end_loop()` block.
1271pub struct LoopBuilder {
1272    parent: RouteBuilder,
1273    config: LoopConfig,
1274    steps: Vec<BuilderStep>,
1275}
1276
1277impl LoopBuilder {
1278    pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1279        LoopInLoopBuilder {
1280            parent: self,
1281            config: LoopConfig {
1282                mode: LoopMode::Count(count),
1283            },
1284            steps: vec![],
1285        }
1286    }
1287
1288    pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1289    where
1290        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1291    {
1292        LoopInLoopBuilder {
1293            parent: self,
1294            config: LoopConfig {
1295                mode: LoopMode::While(std::sync::Arc::new(predicate)),
1296            },
1297            steps: vec![],
1298        }
1299    }
1300
1301    pub fn end_loop(mut self) -> RouteBuilder {
1302        let step = BuilderStep::Loop {
1303            config: self.config,
1304            steps: self.steps,
1305        };
1306        self.parent.steps.push(step);
1307        self.parent
1308    }
1309}
1310
1311impl StepAccumulator for LoopBuilder {
1312    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1313        &mut self.steps
1314    }
1315}
1316
1317pub struct LoopInLoopBuilder {
1318    parent: LoopBuilder,
1319    config: LoopConfig,
1320    steps: Vec<BuilderStep>,
1321}
1322
1323impl LoopInLoopBuilder {
1324    pub fn end_loop(mut self) -> LoopBuilder {
1325        let step = BuilderStep::Loop {
1326            config: self.config,
1327            steps: self.steps,
1328        };
1329        self.parent.steps.push(step);
1330        self.parent
1331    }
1332}
1333
1334impl StepAccumulator for LoopInLoopBuilder {
1335    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1336        &mut self.steps
1337    }
1338}
1339
1340/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1341///
1342/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1343/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1344///
1345/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1346/// and returns the parent `RouteBuilder`.
1347pub struct LoadBalancerBuilder {
1348    parent: RouteBuilder,
1349    config: LoadBalancerConfig,
1350    steps: Vec<BuilderStep>,
1351}
1352
1353impl LoadBalancerBuilder {
1354    /// Set the load balance strategy to round-robin (default).
1355    pub fn round_robin(mut self) -> Self {
1356        self.config = LoadBalancerConfig::round_robin();
1357        self
1358    }
1359
1360    /// Set the load balance strategy to random selection.
1361    pub fn random(mut self) -> Self {
1362        self.config = LoadBalancerConfig::random();
1363        self
1364    }
1365
1366    /// Set the load balance strategy to weighted selection.
1367    ///
1368    /// Each endpoint is assigned a weight that determines its probability
1369    /// of being selected.
1370    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1371        self.config = LoadBalancerConfig::weighted(weights);
1372        self
1373    }
1374
1375    /// Set the load balance strategy to failover.
1376    ///
1377    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1378    /// is tried.
1379    pub fn failover(mut self) -> Self {
1380        self.config = LoadBalancerConfig::failover();
1381        self
1382    }
1383
1384    /// Enable or disable parallel execution of endpoints.
1385    ///
1386    /// When enabled, all endpoints receive the exchange simultaneously.
1387    /// When disabled (default), only one endpoint is selected per exchange.
1388    pub fn parallel(mut self, parallel: bool) -> Self {
1389        self.config = self.config.parallel(parallel);
1390        self
1391    }
1392
1393    /// Close the load balance scope. Packages the accumulated sub-steps into a
1394    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1395    pub fn end_load_balance(mut self) -> RouteBuilder {
1396        let step = BuilderStep::LoadBalance {
1397            config: self.config,
1398            steps: self.steps,
1399        };
1400        self.parent.steps.push(step);
1401        self.parent
1402    }
1403}
1404
1405impl StepAccumulator for LoadBalancerBuilder {
1406    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1407        &mut self.steps
1408    }
1409}
1410
1411// ---------------------------------------------------------------------------
1412// Tests
1413// ---------------------------------------------------------------------------
1414
1415#[cfg(test)]
1416mod tests {
1417    use super::*;
1418    use camel_api::error_handler::ErrorHandlerConfig;
1419    use camel_api::load_balancer::LoadBalanceStrategy;
1420    use camel_api::{Exchange, Message};
1421    use camel_core::route::BuilderStep;
1422    use std::sync::Arc;
1423    use std::time::Duration;
1424    use tower::{Service, ServiceExt};
1425
1426    #[test]
1427    fn test_builder_from_creates_definition() {
1428        let definition = RouteBuilder::from("timer:tick")
1429            .route_id("test-route")
1430            .build()
1431            .unwrap();
1432        assert_eq!(definition.from_uri(), "timer:tick");
1433    }
1434
1435    #[test]
1436    fn test_builder_empty_from_uri_errors() {
1437        let result = RouteBuilder::from("").route_id("test-route").build();
1438        assert!(result.is_err());
1439    }
1440
1441    #[test]
1442    fn test_builder_to_adds_step() {
1443        let definition = RouteBuilder::from("timer:tick")
1444            .route_id("test-route")
1445            .to("log:info")
1446            .build()
1447            .unwrap();
1448
1449        assert_eq!(definition.from_uri(), "timer:tick");
1450        // We can verify steps were added by checking the structure
1451        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1452    }
1453
1454    #[test]
1455    fn test_builder_filter_adds_filter_step() {
1456        let definition = RouteBuilder::from("timer:tick")
1457            .route_id("test-route")
1458            .filter(|_ex| true)
1459            .to("mock:result")
1460            .end_filter()
1461            .build()
1462            .unwrap();
1463
1464        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1465    }
1466
1467    #[test]
1468    fn test_builder_set_header_adds_processor_step() {
1469        let definition = RouteBuilder::from("timer:tick")
1470            .route_id("test-route")
1471            .set_header("key", Value::String("value".into()))
1472            .build()
1473            .unwrap();
1474
1475        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1476    }
1477
1478    #[test]
1479    fn test_builder_map_body_adds_processor_step() {
1480        let definition = RouteBuilder::from("timer:tick")
1481            .route_id("test-route")
1482            .map_body(|body| body)
1483            .build()
1484            .unwrap();
1485
1486        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1487    }
1488
1489    #[test]
1490    fn test_builder_process_adds_processor_step() {
1491        let definition = RouteBuilder::from("timer:tick")
1492            .route_id("test-route")
1493            .process(|ex| async move { Ok(ex) })
1494            .build()
1495            .unwrap();
1496
1497        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1498    }
1499
1500    #[test]
1501    fn test_builder_chain_multiple_steps() {
1502        let definition = RouteBuilder::from("timer:tick")
1503            .route_id("test-route")
1504            .set_header("source", Value::String("timer".into()))
1505            .filter(|ex| ex.input.header("source").is_some())
1506            .to("log:info")
1507            .end_filter()
1508            .to("mock:result")
1509            .build()
1510            .unwrap();
1511
1512        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1513        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1514        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1515        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1516    }
1517
1518    #[test]
1519    fn test_loop_count_builder() {
1520        use camel_api::loop_eip::LoopMode;
1521
1522        let def = RouteBuilder::from("direct:start")
1523            .route_id("loop-test")
1524            .loop_count(3)
1525            .to("mock:inside")
1526            .end_loop()
1527            .to("mock:after")
1528            .build()
1529            .unwrap();
1530
1531        assert_eq!(def.steps().len(), 2);
1532        match &def.steps()[0] {
1533            BuilderStep::Loop { config, steps } => {
1534                assert!(matches!(config.mode, LoopMode::Count(3)));
1535                assert_eq!(steps.len(), 1);
1536            }
1537            other => panic!("Expected Loop, got {:?}", other),
1538        }
1539        assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1540    }
1541
1542    #[test]
1543    fn test_loop_while_builder() {
1544        use camel_api::loop_eip::LoopMode;
1545
1546        let def = RouteBuilder::from("direct:start")
1547            .route_id("loop-while-test")
1548            .loop_while(|_ex| true)
1549            .to("mock:retry")
1550            .end_loop()
1551            .build()
1552            .unwrap();
1553
1554        assert_eq!(def.steps().len(), 1);
1555        match &def.steps()[0] {
1556            BuilderStep::Loop { config, steps } => {
1557                assert!(matches!(config.mode, LoopMode::While(_)));
1558                assert_eq!(steps.len(), 1);
1559            }
1560            other => panic!("Expected Loop, got {:?}", other),
1561        }
1562    }
1563
1564    #[test]
1565    fn test_nested_loop_builder() {
1566        use camel_api::loop_eip::LoopMode;
1567
1568        let def = RouteBuilder::from("direct:start")
1569            .route_id("nested-loop-test")
1570            .loop_count(2)
1571            .to("mock:outer")
1572            .loop_count(3)
1573            .to("mock:inner")
1574            .end_loop()
1575            .end_loop()
1576            .to("mock:after")
1577            .build()
1578            .unwrap();
1579
1580        assert_eq!(def.steps().len(), 2);
1581        match &def.steps()[0] {
1582            BuilderStep::Loop { steps, .. } => {
1583                assert_eq!(steps.len(), 2);
1584                match &steps[1] {
1585                    BuilderStep::Loop {
1586                        config,
1587                        steps: inner_steps,
1588                    } => {
1589                        assert!(matches!(config.mode, LoopMode::Count(3)));
1590                        assert_eq!(inner_steps.len(), 1);
1591                    }
1592                    other => panic!("Expected nested Loop, got {:?}", other),
1593                }
1594            }
1595            other => panic!("Expected outer Loop, got {:?}", other),
1596        }
1597    }
1598
1599    // -----------------------------------------------------------------------
1600    // Processor behavior tests — exercise the real Tower services directly
1601    // -----------------------------------------------------------------------
1602
1603    #[tokio::test]
1604    async fn test_set_header_processor_works() {
1605        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1606        let exchange = Exchange::new(Message::new("test"));
1607        let result = svc.call(exchange).await.unwrap();
1608        assert_eq!(
1609            result.input.header("greeting"),
1610            Some(&Value::String("hello".into()))
1611        );
1612    }
1613
1614    #[tokio::test]
1615    async fn test_filter_processor_passes() {
1616        use camel_api::BoxProcessorExt;
1617        use camel_processor::FilterService;
1618
1619        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1620        let mut svc =
1621            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1622        let exchange = Exchange::new(Message::new("pass"));
1623        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1624        assert_eq!(result.input.body.as_text(), Some("pass"));
1625    }
1626
1627    #[tokio::test]
1628    async fn test_filter_processor_blocks() {
1629        use camel_api::BoxProcessorExt;
1630        use camel_processor::FilterService;
1631
1632        let sub = BoxProcessor::from_fn(|_ex| {
1633            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1634        });
1635        let mut svc =
1636            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1637        let exchange = Exchange::new(Message::new("reject"));
1638        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1639        assert_eq!(result.input.body.as_text(), Some("reject"));
1640    }
1641
1642    #[tokio::test]
1643    async fn test_map_body_processor_works() {
1644        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1645            if let Some(text) = body.as_text() {
1646                Body::Text(text.to_uppercase())
1647            } else {
1648                body
1649            }
1650        });
1651        let exchange = Exchange::new(Message::new("hello"));
1652        let result = mapper.oneshot(exchange).await.unwrap();
1653        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1654    }
1655
1656    #[tokio::test]
1657    async fn test_process_custom_processor_works() {
1658        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1659            ex.set_property("custom", Value::Bool(true));
1660            Ok(ex)
1661        });
1662        let exchange = Exchange::new(Message::default());
1663        let result = processor.oneshot(exchange).await.unwrap();
1664        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1665    }
1666
1667    // -----------------------------------------------------------------------
1668    // Sequential pipeline test
1669    // -----------------------------------------------------------------------
1670
1671    #[tokio::test]
1672    async fn test_compose_pipeline_runs_steps_in_order() {
1673        use camel_core::route::compose_pipeline;
1674
1675        let processors = vec![
1676            BoxProcessor::new(SetHeader::new(
1677                IdentityProcessor,
1678                "step",
1679                Value::String("one".into()),
1680            )),
1681            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1682                if let Some(text) = body.as_text() {
1683                    Body::Text(format!("{}-processed", text))
1684                } else {
1685                    body
1686                }
1687            })),
1688        ];
1689
1690        let pipeline = compose_pipeline(processors);
1691        let exchange = Exchange::new(Message::new("hello"));
1692        let result = pipeline.oneshot(exchange).await.unwrap();
1693
1694        assert_eq!(
1695            result.input.header("step"),
1696            Some(&Value::String("one".into()))
1697        );
1698        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1699    }
1700
1701    #[tokio::test]
1702    async fn test_compose_pipeline_empty_is_identity() {
1703        use camel_core::route::compose_pipeline;
1704
1705        let pipeline = compose_pipeline(vec![]);
1706        let exchange = Exchange::new(Message::new("unchanged"));
1707        let result = pipeline.oneshot(exchange).await.unwrap();
1708        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1709    }
1710
1711    // -----------------------------------------------------------------------
1712    // Circuit breaker builder tests
1713    // -----------------------------------------------------------------------
1714
1715    #[test]
1716    fn test_builder_circuit_breaker_sets_config() {
1717        use camel_api::circuit_breaker::CircuitBreakerConfig;
1718
1719        let config = CircuitBreakerConfig::new().failure_threshold(5);
1720        let definition = RouteBuilder::from("timer:tick")
1721            .route_id("test-route")
1722            .circuit_breaker(config)
1723            .build()
1724            .unwrap();
1725
1726        let cb = definition
1727            .circuit_breaker_config()
1728            .expect("circuit breaker should be set");
1729        assert_eq!(cb.failure_threshold, 5);
1730    }
1731
1732    #[test]
1733    fn test_builder_circuit_breaker_with_error_handler() {
1734        use camel_api::circuit_breaker::CircuitBreakerConfig;
1735        use camel_api::error_handler::ErrorHandlerConfig;
1736
1737        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1738        let eh_config = ErrorHandlerConfig::log_only();
1739
1740        let definition = RouteBuilder::from("timer:tick")
1741            .route_id("test-route")
1742            .to("log:info")
1743            .circuit_breaker(cb_config)
1744            .error_handler(eh_config)
1745            .build()
1746            .unwrap();
1747
1748        assert!(
1749            definition.circuit_breaker_config().is_some(),
1750            "circuit breaker config should be set"
1751        );
1752        // Route definition was built successfully with both configs.
1753    }
1754
1755    #[test]
1756    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1757        let definition = RouteBuilder::from("direct:start")
1758            .route_id("test-route")
1759            .dead_letter_channel("log:dlc")
1760            .on_exception(|e| matches!(e, CamelError::Io(_)))
1761            .retry(3)
1762            .handled_by("log:io")
1763            .end_on_exception()
1764            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1765            .retry(1)
1766            .end_on_exception()
1767            .to("mock:out")
1768            .build()
1769            .expect("route should build");
1770
1771        let cfg = definition
1772            .error_handler_config()
1773            .expect("error handler should be set");
1774        assert_eq!(cfg.policies.len(), 2);
1775        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1776        assert_eq!(
1777            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1778            Some(3)
1779        );
1780        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1781        assert_eq!(
1782            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1783            Some(1)
1784        );
1785    }
1786
1787    #[test]
1788    fn test_builder_on_exception_mixed_mode_rejected() {
1789        let result = RouteBuilder::from("direct:start")
1790            .route_id("test-route")
1791            .error_handler(ErrorHandlerConfig::log_only())
1792            .on_exception(|_e| true)
1793            .end_on_exception()
1794            .to("mock:out")
1795            .build();
1796
1797        let err = result.err().expect("mixed mode should fail with an error");
1798
1799        assert!(
1800            format!("{err}").contains("mixed error handler modes"),
1801            "unexpected error: {err}"
1802        );
1803    }
1804
1805    #[test]
1806    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1807        let definition = RouteBuilder::from("direct:start")
1808            .route_id("test-route")
1809            .on_exception(|_e| true)
1810            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1811            .with_jitter(0.5)
1812            .end_on_exception()
1813            .to("mock:out")
1814            .build()
1815            .expect("route should build");
1816
1817        let cfg = definition
1818            .error_handler_config()
1819            .expect("error handler should be set");
1820        assert_eq!(cfg.policies.len(), 1);
1821        assert!(cfg.policies[0].retry.is_none());
1822    }
1823
1824    #[test]
1825    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1826        let definition = RouteBuilder::from("direct:start")
1827            .route_id("test-route")
1828            .dead_letter_channel("log:dlc")
1829            .to("mock:out")
1830            .build()
1831            .expect("route should build");
1832
1833        let cfg = definition
1834            .error_handler_config()
1835            .expect("error handler should be set");
1836        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1837        assert!(cfg.policies.is_empty());
1838    }
1839
1840    #[test]
1841    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1842        let definition = RouteBuilder::from("direct:start")
1843            .route_id("test-route")
1844            .dead_letter_channel("log:first")
1845            .on_exception(|e| matches!(e, CamelError::Io(_)))
1846            .retry(2)
1847            .end_on_exception()
1848            .dead_letter_channel("log:second")
1849            .to("mock:out")
1850            .build()
1851            .expect("route should build");
1852
1853        let cfg = definition
1854            .error_handler_config()
1855            .expect("error handler should be set");
1856        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1857        assert_eq!(cfg.policies.len(), 1);
1858        assert_eq!(
1859            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1860            Some(2)
1861        );
1862    }
1863
1864    #[test]
1865    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1866        let definition = RouteBuilder::from("direct:start")
1867            .route_id("test-route")
1868            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1869            .retry(1)
1870            .end_on_exception()
1871            .to("mock:out")
1872            .build()
1873            .expect("route should build");
1874
1875        let cfg = definition
1876            .error_handler_config()
1877            .expect("error handler should be set");
1878        assert!(cfg.dlc_uri.is_none());
1879        assert_eq!(cfg.policies.len(), 1);
1880    }
1881
1882    #[test]
1883    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1884        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1885        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1886
1887        let definition = RouteBuilder::from("direct:start")
1888            .route_id("test-route")
1889            .error_handler(first)
1890            .error_handler(second)
1891            .to("mock:out")
1892            .build()
1893            .expect("route should build");
1894
1895        let cfg = definition
1896            .error_handler_config()
1897            .expect("error handler should be set");
1898        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1899    }
1900
1901    // --- Splitter builder tests ---
1902
1903    #[test]
1904    fn test_split_builder_typestate() {
1905        use camel_api::splitter::{SplitterConfig, split_body_lines};
1906
1907        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
1908        let definition = RouteBuilder::from("timer:test?period=1000")
1909            .route_id("test-route")
1910            .split(SplitterConfig::new(split_body_lines()))
1911            .to("mock:per-fragment")
1912            .end_split()
1913            .to("mock:final")
1914            .build()
1915            .unwrap();
1916
1917        // Should have 2 top-level steps: Split + To("mock:final")
1918        assert_eq!(definition.steps().len(), 2);
1919    }
1920
1921    #[test]
1922    fn test_split_builder_steps_collected() {
1923        use camel_api::splitter::{SplitterConfig, split_body_lines};
1924
1925        let definition = RouteBuilder::from("timer:test?period=1000")
1926            .route_id("test-route")
1927            .split(SplitterConfig::new(split_body_lines()))
1928            .set_header("fragment", Value::String("yes".into()))
1929            .to("mock:per-fragment")
1930            .end_split()
1931            .build()
1932            .unwrap();
1933
1934        // Should have 1 top-level step: Split (containing 2 sub-steps)
1935        assert_eq!(definition.steps().len(), 1);
1936        match &definition.steps()[0] {
1937            BuilderStep::Split { steps, .. } => {
1938                assert_eq!(steps.len(), 2); // SetHeader + To
1939            }
1940            other => panic!("Expected Split, got {:?}", other),
1941        }
1942    }
1943
1944    #[test]
1945    fn test_split_builder_config_propagated() {
1946        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1947
1948        let definition = RouteBuilder::from("timer:test?period=1000")
1949            .route_id("test-route")
1950            .split(
1951                SplitterConfig::new(split_body_lines())
1952                    .parallel(true)
1953                    .parallel_limit(4)
1954                    .aggregation(AggregationStrategy::CollectAll),
1955            )
1956            .to("mock:per-fragment")
1957            .end_split()
1958            .build()
1959            .unwrap();
1960
1961        match &definition.steps()[0] {
1962            BuilderStep::Split { config, .. } => {
1963                assert!(config.parallel);
1964                assert_eq!(config.parallel_limit, Some(4));
1965                assert!(matches!(
1966                    config.aggregation,
1967                    AggregationStrategy::CollectAll
1968                ));
1969            }
1970            other => panic!("Expected Split, got {:?}", other),
1971        }
1972    }
1973
1974    #[test]
1975    fn test_aggregate_builder_adds_step() {
1976        use camel_api::aggregator::AggregatorConfig;
1977        use camel_core::route::BuilderStep;
1978
1979        let definition = RouteBuilder::from("timer:tick")
1980            .route_id("test-route")
1981            .aggregate(
1982                AggregatorConfig::correlate_by("key")
1983                    .complete_when_size(2)
1984                    .build(),
1985            )
1986            .build()
1987            .unwrap();
1988
1989        assert_eq!(definition.steps().len(), 1);
1990        assert!(matches!(
1991            definition.steps()[0],
1992            BuilderStep::Aggregate { .. }
1993        ));
1994    }
1995
1996    #[test]
1997    fn test_aggregate_in_split_builder() {
1998        use camel_api::aggregator::AggregatorConfig;
1999        use camel_api::splitter::{SplitterConfig, split_body_lines};
2000        use camel_core::route::BuilderStep;
2001
2002        let definition = RouteBuilder::from("timer:tick")
2003            .route_id("test-route")
2004            .split(SplitterConfig::new(split_body_lines()))
2005            .aggregate(
2006                AggregatorConfig::correlate_by("key")
2007                    .complete_when_size(1)
2008                    .build(),
2009            )
2010            .end_split()
2011            .build()
2012            .unwrap();
2013
2014        assert_eq!(definition.steps().len(), 1);
2015        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2016            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2017        } else {
2018            panic!("expected Split step");
2019        }
2020    }
2021
2022    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
2023
2024    #[test]
2025    fn test_builder_set_body_static_adds_processor() {
2026        let definition = RouteBuilder::from("timer:tick")
2027            .route_id("test-route")
2028            .set_body("fixed")
2029            .build()
2030            .unwrap();
2031        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2032    }
2033
2034    #[test]
2035    fn test_builder_set_body_fn_adds_processor() {
2036        let definition = RouteBuilder::from("timer:tick")
2037            .route_id("test-route")
2038            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2039            .build()
2040            .unwrap();
2041        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2042    }
2043
2044    #[test]
2045    fn transform_alias_produces_same_as_set_body() {
2046        let route_transform = RouteBuilder::from("timer:tick")
2047            .route_id("test-route")
2048            .transform("hello")
2049            .build()
2050            .unwrap();
2051
2052        let route_set_body = RouteBuilder::from("timer:tick")
2053            .route_id("test-route")
2054            .set_body("hello")
2055            .build()
2056            .unwrap();
2057
2058        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2059    }
2060
2061    #[test]
2062    fn test_builder_set_header_fn_adds_processor() {
2063        let definition = RouteBuilder::from("timer:tick")
2064            .route_id("test-route")
2065            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2066            .build()
2067            .unwrap();
2068        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2069    }
2070
2071    #[tokio::test]
2072    async fn test_set_body_static_processor_works() {
2073        use camel_core::route::compose_pipeline;
2074        let def = RouteBuilder::from("t:t")
2075            .route_id("test-route")
2076            .set_body("replaced")
2077            .build()
2078            .unwrap();
2079        let pipeline = compose_pipeline(
2080            def.steps()
2081                .iter()
2082                .filter_map(|s| {
2083                    if let BuilderStep::Processor(p) = s {
2084                        Some(p.clone())
2085                    } else {
2086                        None
2087                    }
2088                })
2089                .collect(),
2090        );
2091        let exchange = Exchange::new(Message::new("original"));
2092        let result = pipeline.oneshot(exchange).await.unwrap();
2093        assert_eq!(result.input.body.as_text(), Some("replaced"));
2094    }
2095
2096    #[tokio::test]
2097    async fn test_set_body_fn_processor_works() {
2098        use camel_core::route::compose_pipeline;
2099        let def = RouteBuilder::from("t:t")
2100            .route_id("test-route")
2101            .set_body_fn(|ex: &Exchange| {
2102                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2103            })
2104            .build()
2105            .unwrap();
2106        let pipeline = compose_pipeline(
2107            def.steps()
2108                .iter()
2109                .filter_map(|s| {
2110                    if let BuilderStep::Processor(p) = s {
2111                        Some(p.clone())
2112                    } else {
2113                        None
2114                    }
2115                })
2116                .collect(),
2117        );
2118        let exchange = Exchange::new(Message::new("hello"));
2119        let result = pipeline.oneshot(exchange).await.unwrap();
2120        assert_eq!(result.input.body.as_text(), Some("HELLO"));
2121    }
2122
2123    #[tokio::test]
2124    async fn test_set_header_fn_processor_works() {
2125        use camel_core::route::compose_pipeline;
2126        let def = RouteBuilder::from("t:t")
2127            .route_id("test-route")
2128            .set_header_fn("echo", |ex: &Exchange| {
2129                ex.input
2130                    .body
2131                    .as_text()
2132                    .map(|t| Value::String(t.into()))
2133                    .unwrap_or(Value::Null)
2134            })
2135            .build()
2136            .unwrap();
2137        let pipeline = compose_pipeline(
2138            def.steps()
2139                .iter()
2140                .filter_map(|s| {
2141                    if let BuilderStep::Processor(p) = s {
2142                        Some(p.clone())
2143                    } else {
2144                        None
2145                    }
2146                })
2147                .collect(),
2148        );
2149        let exchange = Exchange::new(Message::new("ping"));
2150        let result = pipeline.oneshot(exchange).await.unwrap();
2151        assert_eq!(
2152            result.input.header("echo"),
2153            Some(&Value::String("ping".into()))
2154        );
2155    }
2156
2157    // ── FilterBuilder typestate tests ─────────────────────────────────────
2158
2159    #[test]
2160    fn test_filter_builder_typestate() {
2161        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2162            .route_id("test-route")
2163            .filter(|_ex| true)
2164            .to("mock:inner")
2165            .end_filter()
2166            .to("mock:outer")
2167            .build();
2168        assert!(result.is_ok());
2169    }
2170
2171    #[test]
2172    fn test_filter_builder_steps_collected() {
2173        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2174            .route_id("test-route")
2175            .filter(|_ex| true)
2176            .to("mock:inner")
2177            .end_filter()
2178            .build()
2179            .unwrap();
2180
2181        assert_eq!(definition.steps().len(), 1);
2182        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2183    }
2184
2185    #[test]
2186    fn test_wire_tap_builder_adds_step() {
2187        let definition = RouteBuilder::from("timer:tick")
2188            .route_id("test-route")
2189            .wire_tap("mock:tap")
2190            .to("mock:result")
2191            .build()
2192            .unwrap();
2193
2194        assert_eq!(definition.steps().len(), 2);
2195        assert!(
2196            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2197        );
2198        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2199    }
2200
2201    // ── MulticastBuilder typestate tests ─────────────────────────────────────
2202
2203    #[test]
2204    fn test_multicast_builder_typestate() {
2205        let definition = RouteBuilder::from("timer:tick")
2206            .route_id("test-route")
2207            .multicast()
2208            .to("direct:a")
2209            .to("direct:b")
2210            .end_multicast()
2211            .to("mock:result")
2212            .build()
2213            .unwrap();
2214
2215        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
2216    }
2217
2218    #[test]
2219    fn test_multicast_builder_steps_collected() {
2220        let definition = RouteBuilder::from("timer:tick")
2221            .route_id("test-route")
2222            .multicast()
2223            .to("direct:a")
2224            .to("direct:b")
2225            .end_multicast()
2226            .build()
2227            .unwrap();
2228
2229        match &definition.steps()[0] {
2230            BuilderStep::Multicast { steps, .. } => {
2231                assert_eq!(steps.len(), 2);
2232            }
2233            other => panic!("Expected Multicast, got {:?}", other),
2234        }
2235    }
2236
2237    // ── Concurrency builder tests ─────────────────────────────────────
2238
2239    #[test]
2240    fn test_builder_concurrent_sets_concurrency() {
2241        use camel_component_api::ConcurrencyModel;
2242
2243        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2244            .route_id("test-route")
2245            .concurrent(16)
2246            .to("log:info")
2247            .build()
2248            .unwrap();
2249
2250        assert_eq!(
2251            definition.concurrency_override(),
2252            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2253        );
2254    }
2255
2256    #[test]
2257    fn test_builder_concurrent_zero_means_unbounded() {
2258        use camel_component_api::ConcurrencyModel;
2259
2260        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2261            .route_id("test-route")
2262            .concurrent(0)
2263            .to("log:info")
2264            .build()
2265            .unwrap();
2266
2267        assert_eq!(
2268            definition.concurrency_override(),
2269            Some(&ConcurrencyModel::Concurrent { max: None })
2270        );
2271    }
2272
2273    #[test]
2274    fn test_builder_sequential_sets_concurrency() {
2275        use camel_component_api::ConcurrencyModel;
2276
2277        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2278            .route_id("test-route")
2279            .sequential()
2280            .to("log:info")
2281            .build()
2282            .unwrap();
2283
2284        assert_eq!(
2285            definition.concurrency_override(),
2286            Some(&ConcurrencyModel::Sequential)
2287        );
2288    }
2289
2290    #[test]
2291    fn test_builder_default_concurrency_is_none() {
2292        let definition = RouteBuilder::from("timer:tick")
2293            .route_id("test-route")
2294            .to("log:info")
2295            .build()
2296            .unwrap();
2297
2298        assert_eq!(definition.concurrency_override(), None);
2299    }
2300
2301    // ── Route lifecycle builder tests ─────────────────────────────────────
2302
2303    #[test]
2304    fn test_builder_route_id_sets_id() {
2305        let definition = RouteBuilder::from("timer:tick")
2306            .route_id("my-route")
2307            .build()
2308            .unwrap();
2309
2310        assert_eq!(definition.route_id(), "my-route");
2311    }
2312
2313    #[test]
2314    fn test_build_without_route_id_fails() {
2315        let result = RouteBuilder::from("timer:tick?period=1000")
2316            .to("log:info")
2317            .build();
2318        let err = match result {
2319            Err(e) => e.to_string(),
2320            Ok(_) => panic!("build() should fail without route_id"),
2321        };
2322        assert!(
2323            err.contains("route_id"),
2324            "error should mention route_id, got: {}",
2325            err
2326        );
2327    }
2328
2329    #[test]
2330    fn test_builder_auto_startup_false() {
2331        let definition = RouteBuilder::from("timer:tick")
2332            .route_id("test-route")
2333            .auto_startup(false)
2334            .build()
2335            .unwrap();
2336
2337        assert!(!definition.auto_startup());
2338    }
2339
2340    #[test]
2341    fn test_builder_startup_order_custom() {
2342        let definition = RouteBuilder::from("timer:tick")
2343            .route_id("test-route")
2344            .startup_order(50)
2345            .build()
2346            .unwrap();
2347
2348        assert_eq!(definition.startup_order(), 50);
2349    }
2350
2351    #[test]
2352    fn test_builder_defaults() {
2353        let definition = RouteBuilder::from("timer:tick")
2354            .route_id("test-route")
2355            .build()
2356            .unwrap();
2357
2358        assert_eq!(definition.route_id(), "test-route");
2359        assert!(definition.auto_startup());
2360        assert_eq!(definition.startup_order(), 1000);
2361    }
2362
2363    // ── Choice typestate tests ──────────────────────────────────────────────────
2364
2365    #[test]
2366    fn test_choice_builder_single_when() {
2367        let definition = RouteBuilder::from("timer:tick")
2368            .route_id("test-route")
2369            .choice()
2370            .when(|ex: &Exchange| ex.input.header("type").is_some())
2371            .to("mock:typed")
2372            .end_when()
2373            .end_choice()
2374            .build()
2375            .unwrap();
2376        assert_eq!(definition.steps().len(), 1);
2377        assert!(
2378            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2379            if whens.len() == 1 && otherwise.is_none())
2380        );
2381    }
2382
2383    #[test]
2384    fn test_choice_builder_when_otherwise() {
2385        let definition = RouteBuilder::from("timer:tick")
2386            .route_id("test-route")
2387            .choice()
2388            .when(|ex: &Exchange| ex.input.header("a").is_some())
2389            .to("mock:a")
2390            .end_when()
2391            .otherwise()
2392            .to("mock:fallback")
2393            .end_otherwise()
2394            .end_choice()
2395            .build()
2396            .unwrap();
2397        assert!(
2398            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2399            if whens.len() == 1 && otherwise.is_some())
2400        );
2401    }
2402
2403    #[test]
2404    fn test_choice_builder_multiple_whens() {
2405        let definition = RouteBuilder::from("timer:tick")
2406            .route_id("test-route")
2407            .choice()
2408            .when(|ex: &Exchange| ex.input.header("a").is_some())
2409            .to("mock:a")
2410            .end_when()
2411            .when(|ex: &Exchange| ex.input.header("b").is_some())
2412            .to("mock:b")
2413            .end_when()
2414            .end_choice()
2415            .build()
2416            .unwrap();
2417        assert!(
2418            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2419            if whens.len() == 2)
2420        );
2421    }
2422
2423    #[test]
2424    fn test_choice_step_after_choice() {
2425        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2426        let definition = RouteBuilder::from("timer:tick")
2427            .route_id("test-route")
2428            .choice()
2429            .when(|_ex: &Exchange| true)
2430            .to("mock:inner")
2431            .end_when()
2432            .end_choice()
2433            .to("mock:outer") // must be step[1], not inside choice
2434            .build()
2435            .unwrap();
2436        assert_eq!(definition.steps().len(), 2);
2437        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2438    }
2439
2440    // ── Throttle typestate tests ──────────────────────────────────────────────────
2441
2442    #[test]
2443    fn test_throttle_builder_typestate() {
2444        let definition = RouteBuilder::from("timer:tick")
2445            .route_id("test-route")
2446            .throttle(10, std::time::Duration::from_secs(1))
2447            .to("mock:result")
2448            .end_throttle()
2449            .build()
2450            .unwrap();
2451
2452        assert_eq!(definition.steps().len(), 1);
2453        assert!(matches!(
2454            &definition.steps()[0],
2455            BuilderStep::Throttle { .. }
2456        ));
2457    }
2458
2459    #[test]
2460    fn test_throttle_builder_with_strategy() {
2461        let definition = RouteBuilder::from("timer:tick")
2462            .route_id("test-route")
2463            .throttle(10, std::time::Duration::from_secs(1))
2464            .strategy(ThrottleStrategy::Reject)
2465            .to("mock:result")
2466            .end_throttle()
2467            .build()
2468            .unwrap();
2469
2470        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2471            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2472        } else {
2473            panic!("Expected Throttle step");
2474        }
2475    }
2476
2477    #[test]
2478    fn test_throttle_builder_steps_collected() {
2479        let definition = RouteBuilder::from("timer:tick")
2480            .route_id("test-route")
2481            .throttle(5, std::time::Duration::from_secs(1))
2482            .set_header("throttled", Value::Bool(true))
2483            .to("mock:throttled")
2484            .end_throttle()
2485            .build()
2486            .unwrap();
2487
2488        match &definition.steps()[0] {
2489            BuilderStep::Throttle { steps, .. } => {
2490                assert_eq!(steps.len(), 2); // SetHeader + To
2491            }
2492            other => panic!("Expected Throttle, got {:?}", other),
2493        }
2494    }
2495
2496    #[test]
2497    fn test_throttle_step_after_throttle() {
2498        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2499        let definition = RouteBuilder::from("timer:tick")
2500            .route_id("test-route")
2501            .throttle(10, std::time::Duration::from_secs(1))
2502            .to("mock:inner")
2503            .end_throttle()
2504            .to("mock:outer")
2505            .build()
2506            .unwrap();
2507
2508        assert_eq!(definition.steps().len(), 2);
2509        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2510    }
2511
2512    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2513
2514    #[test]
2515    fn test_load_balance_builder_typestate() {
2516        let definition = RouteBuilder::from("timer:tick")
2517            .route_id("test-route")
2518            .load_balance()
2519            .round_robin()
2520            .to("mock:a")
2521            .to("mock:b")
2522            .end_load_balance()
2523            .build()
2524            .unwrap();
2525
2526        assert_eq!(definition.steps().len(), 1);
2527        assert!(matches!(
2528            &definition.steps()[0],
2529            BuilderStep::LoadBalance { .. }
2530        ));
2531    }
2532
2533    #[test]
2534    fn test_load_balance_builder_with_strategy() {
2535        let definition = RouteBuilder::from("timer:tick")
2536            .route_id("test-route")
2537            .load_balance()
2538            .random()
2539            .to("mock:result")
2540            .end_load_balance()
2541            .build()
2542            .unwrap();
2543
2544        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2545            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2546        } else {
2547            panic!("Expected LoadBalance step");
2548        }
2549    }
2550
2551    #[test]
2552    fn test_load_balance_builder_steps_collected() {
2553        let definition = RouteBuilder::from("timer:tick")
2554            .route_id("test-route")
2555            .load_balance()
2556            .set_header("lb", Value::Bool(true))
2557            .to("mock:a")
2558            .end_load_balance()
2559            .build()
2560            .unwrap();
2561
2562        match &definition.steps()[0] {
2563            BuilderStep::LoadBalance { steps, .. } => {
2564                assert_eq!(steps.len(), 2); // SetHeader + To
2565            }
2566            other => panic!("Expected LoadBalance, got {:?}", other),
2567        }
2568    }
2569
/// A step added after `end_load_balance()` belongs to the outer pipeline,
/// not to the load-balance scope.
#[test]
fn test_load_balance_step_after_load_balance() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .to("mock:inner")
        .end_load_balance()
        .to("mock:outer")
        .build()
        .unwrap();

    let outer = route.steps();
    assert_eq!(outer.len(), 2);

    let is_outer_to = matches!(&outer[1], BuilderStep::To(uri) if uri == "mock:outer");
    assert!(is_outer_to);
}
2585
2586    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2587
/// `dynamic_router()` must append exactly one `DynamicRouter` step.
#[test]
fn test_dynamic_router_builder() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some(String::from("mock:result"))))
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::DynamicRouter { .. }));
}
2602
/// A pre-built `DynamicRouterConfig` passed to `dynamic_router_with_config()`
/// must keep its `max_iterations` and `cache_size` values on the step.
#[test]
fn test_dynamic_router_builder_with_config() {
    let router_config = DynamicRouterConfig::new(Arc::new(|_| Some(String::from("mock:a"))))
        .max_iterations(100)
        .cache_size(500);

    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router_with_config(router_config)
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);

    match &steps[0] {
        BuilderStep::DynamicRouter { config } => {
            assert_eq!(config.max_iterations, 100);
            assert_eq!(config.cache_size, 500);
        }
        _ => panic!("Expected DynamicRouter step"),
    }
}
2623
/// A step chained after `dynamic_router()` is added to the outer pipeline,
/// right behind the `DynamicRouter` step.
#[test]
fn test_dynamic_router_step_after_router() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some(String::from("mock:inner"))))
        .to("mock:outer")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);

    let router_first = matches!(&steps[0], BuilderStep::DynamicRouter { .. });
    assert!(router_first);

    let to_second = matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer");
    assert!(to_second);
}
2641
/// `routing_slip()` must produce a `RoutingSlip` step as the first step.
#[test]
fn routing_slip_builder_creates_step() {
    use camel_api::RoutingSlipExpression;

    // The annotation coerces the closure into the slip expression type.
    let slip: RoutingSlipExpression = Arc::new(|_| Some(String::from("direct:a,direct:b")));

    let definition = RouteBuilder::from("direct:start")
        .route_id("routing-slip-test")
        .routing_slip(slip)
        .build()
        .unwrap();

    let is_routing_slip = matches!(definition.steps()[0], BuilderStep::RoutingSlip { .. });
    assert!(is_routing_slip, "Expected RoutingSlip step");
}
2659
/// A `RoutingSlipConfig` passed to `routing_slip_with_config()` must keep its
/// delimiter, cache size and invalid-endpoint flag on the step.
#[test]
fn routing_slip_with_config_builder_creates_step() {
    use camel_api::RoutingSlipConfig;

    let slip_config = RoutingSlipConfig::new(Arc::new(|_| Some(String::from("mock:a"))))
        .uri_delimiter("|")
        .cache_size(50)
        .ignore_invalid_endpoints(true);

    let definition = RouteBuilder::from("direct:start")
        .route_id("routing-slip-config-test")
        .routing_slip_with_config(slip_config)
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::RoutingSlip { config } => {
            assert_eq!(config.uri_delimiter, "|");
            assert_eq!(config.cache_size, 50);
            assert!(config.ignore_invalid_endpoints);
        }
        _ => panic!("Expected RoutingSlip step"),
    }
}
2683
/// `marshal("json")` must add a `Processor` step as the first step.
#[test]
fn test_builder_marshal_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .marshal("json")
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2693
/// `unmarshal("json")` must add a `Processor` step as the first step.
#[test]
fn test_builder_unmarshal_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .unmarshal("json")
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2703
/// `stream_cache(1024)` must add a `Processor` step as the first step.
#[test]
fn test_builder_stream_cache_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .stream_cache(1024)
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2713
/// `validate()` with a bare schema path must yield a single `To` step whose
/// URI carries the `validator:` prefix.
#[test]
fn validate_adds_to_step_with_validator_uri() {
    let route = RouteBuilder::from("direct:in")
        .route_id("test")
        .validate("schemas/order.xsd")
        .build()
        .unwrap();

    assert_eq!(route.steps().len(), 1);

    let first = &route.steps()[0];
    let targets_validator =
        matches!(first, BuilderStep::To(uri) if uri == "validator:schemas/order.xsd");
    assert!(targets_validator, "got: {:?}", first);
}
2729
/// Marshalling with an unknown data format name is expected to panic with
/// a message naming the format.
#[test]
#[should_panic(expected = "unknown data format: 'csv'")]
fn test_builder_marshal_panics_on_unknown_format() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let _ = builder.marshal("csv").build();
}
2738
/// Unmarshalling with an unknown data format name is expected to panic with
/// a message naming the format.
#[test]
#[should_panic(expected = "unknown data format: 'csv'")]
fn test_builder_unmarshal_panics_on_unknown_format() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let _ = builder.unmarshal("csv").build();
}
2747
/// `recipient_list()` must produce a `RecipientList` step as the first step.
#[test]
fn test_builder_recipient_list_creates_step() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("recipient-list-test")
        .recipient_list(Arc::new(|_| String::from("direct:a,direct:b")))
        .build()
        .unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::RecipientList { .. }));
}
2761
/// `recipient_list_with_config()` must produce a `RecipientList` step as
/// the first step.
#[test]
fn test_builder_recipient_list_with_config_creates_step() {
    let list_config = RecipientListConfig::new(Arc::new(|_| String::from("mock:a")));

    let definition = RouteBuilder::from("direct:start")
        .route_id("recipient-list-config-test")
        .recipient_list_with_config(list_config)
        .build()
        .unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::RecipientList { .. }));
}
2777
/// `script()` must record both the language and the script text verbatim
/// on the resulting `Script` step.
#[test]
fn test_builder_script_adds_script_step() {
    // Raw literal avoids escaping the embedded quotes.
    let source = r#"headers["x"] = "y""#;

    let definition = RouteBuilder::from("direct:start")
        .route_id("script-test")
        .script("rhai", source)
        .build()
        .unwrap();

    let first = &definition.steps()[0];
    let recorded = matches!(
        first,
        BuilderStep::Script { language, script } if language == "rhai" && script == source
    );
    assert!(recorded);
}
2792
/// Both `delay()` and `delay_with_header()` must each add one `Delay` step.
#[test]
fn test_builder_delay_and_delay_with_header_add_steps() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("delay-test")
        .delay(Duration::from_millis(250))
        .delay_with_header(Duration::from_millis(500), "x-delay")
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);
    // With the length pinned to two, checking every step covers both delay calls.
    assert!(
        steps
            .iter()
            .all(|step| matches!(step, BuilderStep::Delay { .. }))
    );
}
2806
/// `log()`, `stop()` and `to()` must appear in the built route in the
/// order they were chained.
#[test]
fn test_builder_log_and_stop_add_steps_in_order() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("log-stop-test")
        .log("hello", LogLevel::Info)
        .stop()
        .to("mock:after")
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 3);

    let log_first = matches!(&steps[0], BuilderStep::Log { message, .. } if message == "hello");
    assert!(log_first);

    let stop_second = matches!(&steps[1], BuilderStep::Stop);
    assert!(stop_second);

    let to_third = matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:after");
    assert!(to_third);
}
2825
/// `stream_cache_default()` must add a `Processor` step as the first step.
#[test]
fn test_builder_stream_cache_default_adds_processor_step() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("stream-cache-default-test")
        .stream_cache_default()
        .build()
        .unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2836
/// A URI that already starts with `validator:` must be passed through by
/// `validate()` unchanged (no doubled prefix).
#[test]
fn test_validate_preserves_existing_validator_prefix() {
    let definition = RouteBuilder::from("direct:in")
        .route_id("validate-prefix-test")
        .validate("validator:schemas/order.xsd")
        .build()
        .unwrap();

    let first = &definition.steps()[0];
    let prefix_kept =
        matches!(first, BuilderStep::To(uri) if uri == "validator:schemas/order.xsd");
    assert!(prefix_kept);
}
2850
/// After chaining `weighted(..)`, `failover()` and `parallel(true)`, the
/// step's config must report the `Failover` strategy with `parallel` set.
#[test]
fn test_load_balance_builder_weighted_failover_parallel_config() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("lb-weighted-failover-parallel")
        .load_balance()
        .weighted(vec![
            (String::from("direct:a"), 3),
            (String::from("direct:b"), 1),
        ])
        .failover()
        .parallel(true)
        .to("mock:result")
        .end_load_balance()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
            assert!(config.parallel);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
2874
/// Every setter on the multicast builder must be reflected in the config
/// of the resulting `Multicast` step.
#[test]
fn test_multicast_builder_all_config_setters() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("multicast-config-test")
        .multicast()
        .parallel(true)
        .parallel_limit(4)
        .stop_on_exception(true)
        .timeout(Duration::from_millis(300))
        .aggregation(MulticastStrategy::Original)
        .to("mock:a")
        .end_multicast()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(config.parallel);
            assert_eq!(config.parallel_limit, Some(4));
            assert!(config.stop_on_exception);
            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
            assert!(matches!(config.aggregation, MulticastStrategy::Original));
        }
        _ => panic!("Expected Multicast step"),
    }
}
2900
/// `build_canonical()` must fail for a route containing `set_header`, with
/// an error message that names the unsupported `processor` step.
#[test]
fn test_build_canonical_rejects_unsupported_processor_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-reject")
        .set_header("k", Value::String("v".into()))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `processor`"));
}
2911}