Skip to main content

camel_builder/
lib.rs

1//! Fluent builder API for constructing Camel routes programmatically with EIP patterns.
2//!
3//! Main types: `RouteBuilder`, `StepAccumulator`, `SplitBuilder`, `ChoiceBuilder`, `MulticastBuilder`,
4//! `ThrottleBuilder`, `LoopBuilder`, `LoadBalancerBuilder`, `OnExceptionBuilder`.
5
6use camel_api::DelayConfig;
7use camel_api::aggregator::{
8    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24    ProcessorFn, Value,
25    runtime::{
26        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28        CanonicalWhenSpec,
29    },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35    StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38/// Shared step-accumulation methods for all builder types.
39///
40/// Implementors provide `steps_mut()` and get step-adding methods for free.
41/// `filter()` and other branching methods are NOT included — they return
42/// different types per builder and stay as per-builder methods.
43pub trait StepAccumulator: Sized {
44    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
45
46    fn to(mut self, endpoint: impl Into<String>) -> Self {
47        self.steps_mut().push(BuilderStep::To(endpoint.into()));
48        self
49    }
50
51    fn process<F, Fut>(mut self, f: F) -> Self
52    where
53        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
54        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
55    {
56        let svc = ProcessorFn::new(f);
57        self.steps_mut()
58            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
59        self
60    }
61
62    fn process_fn(mut self, processor: BoxProcessor) -> Self {
63        self.steps_mut().push(BuilderStep::Processor(processor));
64        self
65    }
66
67    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
68        let svc = SetHeader::new(IdentityProcessor, key, value);
69        self.steps_mut()
70            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71        self
72    }
73
74    fn map_body<F>(mut self, mapper: F) -> Self
75    where
76        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
77    {
78        let svc = MapBody::new(IdentityProcessor, mapper);
79        self.steps_mut()
80            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
81        self
82    }
83
84    fn set_body<B>(mut self, body: B) -> Self
85    where
86        B: Into<Body> + Clone + Send + Sync + 'static,
87    {
88        let body: Body = body.into();
89        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
90        self.steps_mut()
91            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
92        self
93    }
94
95    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
96    ///
97    /// Transforms the message body using the given value. Semantically identical
98    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
99    fn transform<B>(self, body: B) -> Self
100    where
101        B: Into<Body> + Clone + Send + Sync + 'static,
102    {
103        self.set_body(body)
104    }
105
106    fn set_body_fn<F>(mut self, expr: F) -> Self
107    where
108        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
109    {
110        let svc = SetBody::new(IdentityProcessor, expr);
111        self.steps_mut()
112            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
113        self
114    }
115
116    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
117    where
118        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
119    {
120        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
121        self.steps_mut()
122            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
123        self
124    }
125
126    fn aggregate(mut self, config: AggregatorConfig) -> Self {
127        self.steps_mut().push(BuilderStep::Aggregate { config });
128        self
129    }
130
131    /// Stop processing this exchange immediately. No further steps in the
132    /// current pipeline will run.
133    ///
134    /// Can be used at any point in the route: directly on RouteBuilder,
135    /// inside `.filter()`, inside `.split()`, etc.
136    fn stop(mut self) -> Self {
137        self.steps_mut().push(BuilderStep::Stop);
138        self
139    }
140
141    fn delay(mut self, duration: std::time::Duration) -> Self {
142        self.steps_mut().push(BuilderStep::Delay {
143            config: DelayConfig::from_duration(duration),
144        });
145        self
146    }
147
148    fn delay_with_header(
149        mut self,
150        duration: std::time::Duration,
151        header: impl Into<String>,
152    ) -> Self {
153        self.steps_mut().push(BuilderStep::Delay {
154            config: DelayConfig::from_duration_with_header(duration, header),
155        });
156        self
157    }
158
159    /// Log a message at the specified level.
160    ///
161    /// The message will be logged when an exchange passes through this step.
162    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
163        self.steps_mut().push(BuilderStep::Log {
164            level,
165            message: message.into(),
166        });
167        self
168    }
169
170    /// Convert the message body to the target type.
171    ///
172    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
173    /// Returns `TypeConversionFailed` if conversion is not possible.
174    ///
175    /// # Example
176    /// ```ignore
177    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
178    ///      .convert_body_to(BodyType::Json)
179    ///      .to("direct:next")
180    /// ```
181    fn convert_body_to(mut self, target: BodyType) -> Self {
182        let svc = ConvertBodyTo::new(IdentityProcessor, target);
183        self.steps_mut()
184            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
185        self
186    }
187
188    fn stream_cache(mut self, threshold: usize) -> Self {
189        let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
190        let svc = StreamCacheService::new(IdentityProcessor, config);
191        self.steps_mut()
192            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
193        self
194    }
195
196    /// Materialize `Body::Stream` into `Body::Bytes` using the default threshold (128 KB).
197    ///
198    /// Equivalent to `.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)`.
199    fn stream_cache_default(self) -> Self {
200        self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
201    }
202
203    /// Marshal the message body using the specified data format.
204    ///
205    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
206    /// the format name is unknown.
207    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
208    ///
209    /// # Example
210    /// ```ignore
211    /// route.marshal("json")?.to("direct:next")
212    /// ```
213    fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
214        let name = format.into();
215        let df = builtin_data_format(&name)
216            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
217        let svc = MarshalService::new(IdentityProcessor, df);
218        self.steps_mut()
219            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
220        Ok(self)
221    }
222
223    /// Unmarshal the message body using the specified data format.
224    ///
225    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
226    /// the format name is unknown.
227    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
228    ///
229    /// # Example
230    /// ```ignore
231    /// route.unmarshal("json")?.to("direct:next")
232    /// ```
233    fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
234        let name = format.into();
235        let df = builtin_data_format(&name)
236            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
237        let svc = UnmarshalService::new(IdentityProcessor, df);
238        self.steps_mut()
239            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
240        Ok(self)
241    }
242
243    /// Validate the exchange body against a schema file.
244    ///
245    /// Shorthand for `to("validator:path")`. Supports XSD, JSON Schema, and YAML.
246    ///
247    /// # Example
248    /// ```ignore
249    /// route.validate("schemas/order.xsd").to("direct:out")
250    /// ```
251    fn validate(mut self, schema_path: impl Into<String>) -> Self {
252        let path = schema_path.into();
253        let uri = if path.starts_with("validator:") {
254            path
255        } else {
256            format!("validator:{path}")
257        };
258        self.steps_mut().push(BuilderStep::To(uri));
259        self
260    }
261
262    /// Execute a script that can modify the exchange (headers, properties, body).
263    ///
264    /// The script has access to `headers`, `properties`, and `body` variables
265    /// and can modify them with assignment syntax: `headers["k"] = v`.
266    ///
267    /// # Example
268    /// ```ignore
269    /// // ignore: requires full CamelContext setup with registered language
270    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
271    /// ```
272    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
273        self.steps_mut().push(BuilderStep::Script {
274            language: language.into(),
275            script: script.into(),
276        });
277        self
278    }
279
280    fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
281        self.steps_mut().push(BuilderStep::Bean {
282            name: name.into(),
283            method: method.into(),
284        });
285        self
286    }
287}
288
/// A fluent builder for constructing routes.
///
/// A route starts from a source endpoint URI (see `RouteBuilder::from`),
/// accumulates steps, and is finished with `build()` or `build_canonical()`.
/// Both finishers require a non-empty route ID set via `.route_id(...)`.
///
/// # Example
///
/// ```ignore
/// let definition = RouteBuilder::from("timer:tick?period=1000")
///     .route_id("timer-route") // required: `build()` rejects a missing or blank ID
///     .set_header("source", Value::String("timer".into()))
///     .filter(|ex| ex.input.body.as_text().is_some())
///     .to("log:info?showHeaders=true")
///     .build()?;
/// ```
// TODO(BUILDER-003): RouteBuilder does not support clone-and-reuse semantics.
// Once built, the builder is consumed. Add `Clone` or a `fork()` method to allow
// reusing a partially-built route as a template for multiple routes.
pub struct RouteBuilder {
    /// Source endpoint URI the route consumes from.
    from_uri: String,
    /// Steps collected so far, in the order they were added.
    steps: Vec<BuilderStep>,
    /// Explicit per-route error handler set via `.error_handler(config)`, if any.
    error_handler: Option<ErrorHandlerConfig>,
    /// Which error-handling style (explicit config vs. shorthand) has been used.
    error_handler_mode: ErrorHandlerMode,
    /// Circuit breaker set via `.circuit_breaker(config)`, if any.
    circuit_breaker_config: Option<CircuitBreakerConfig>,
    /// Concurrency override set via `.concurrent()` / `.sequential()`, if any.
    concurrency: Option<ConcurrencyModel>,
    /// Route ID set via `.route_id(...)`; must be non-empty when building.
    route_id: Option<String>,
    /// Auto-startup flag set via `.auto_startup(...)`, if any.
    auto_startup: Option<bool>,
    /// Startup order set via `.startup_order(...)`, if any.
    startup_order: Option<i32>,
}
314
/// Records which error-handling style has been used on a `RouteBuilder`.
///
/// `.error_handler(config)` is the explicit style; `.dead_letter_channel()` and
/// `.on_exception()` are the shorthand style. Combining both leads to `Mixed`,
/// which `RouteBuilder::build` rejects with a `RouteError`.
#[derive(Default)]
enum ErrorHandlerMode {
    /// Initial state: no error-handling method has been called.
    #[default]
    None,
    /// Only `.error_handler(config)` has been used.
    ExplicitConfig,
    /// Only shorthand methods have been used.
    Shorthand {
        /// Dead letter channel URI, when `.dead_letter_channel()` was called.
        dlc_uri: Option<String>,
        /// Exception policies pushed by `OnExceptionBuilder::end_on_exception`.
        specs: Vec<OnExceptionSpec>,
    },
    /// Explicit config and shorthand methods were combined.
    Mixed,
}
326
/// One shorthand exception policy collected by an `.on_exception()` scope.
#[derive(Clone)]
struct OnExceptionSpec {
    /// Predicate deciding whether this policy applies to a given error.
    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
    /// Redelivery policy, present only when `.retry(...)` was called.
    retry: Option<RedeliveryPolicy>,
    /// Endpoint URI set via `.handled_by(...)`, if any.
    handled_by: Option<String>,
}
333
334impl RouteBuilder {
335    /// Start building a route from the given source endpoint URI.
336    pub fn from(endpoint: &str) -> Self {
337        Self {
338            from_uri: endpoint.to_string(),
339            steps: Vec::new(),
340            error_handler: None,
341            error_handler_mode: ErrorHandlerMode::None,
342            circuit_breaker_config: None,
343            concurrency: None,
344            route_id: None,
345            auto_startup: None,
346            startup_order: None,
347        }
348    }
349
350    /// Open a filter scope. Only exchanges matching `predicate` will be processed
351    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
352    /// and continue to steps after `.end_filter()`.
353    pub fn filter<F>(self, predicate: F) -> FilterBuilder
354    where
355        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
356    {
357        FilterBuilder {
358            parent: self,
359            predicate: std::sync::Arc::new(predicate),
360            steps: vec![],
361        }
362    }
363
364    /// Open a choice scope for content-based routing.
365    ///
366    /// Within the choice, you can define multiple `.when()` clauses and an
367    /// optional `.otherwise()` clause. The first matching `when` predicate
368    /// determines which sub-pipeline executes.
369    pub fn choice(self) -> ChoiceBuilder {
370        ChoiceBuilder {
371            parent: self,
372            whens: vec![],
373            _otherwise: None,
374        }
375    }
376
377    /// Add a WireTap step that sends a clone of the exchange to the given
378    /// endpoint URI (fire-and-forget). The original exchange continues
379    /// downstream unchanged.
380    pub fn wire_tap(mut self, endpoint: &str) -> Self {
381        self.steps.push(BuilderStep::WireTap {
382            uri: endpoint.to_string(),
383        });
384        self
385    }
386
387    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
388    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
389        self.error_handler_mode = match self.error_handler_mode {
390            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
391                ErrorHandlerMode::ExplicitConfig
392            }
393            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
394        };
395        self.error_handler = Some(config);
396        self
397    }
398
399    /// Set a dead letter channel URI for shorthand error handler mode.
400    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
401        let uri = uri.into();
402        self.error_handler_mode = match self.error_handler_mode {
403            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
404                dlc_uri: Some(uri),
405                specs: Vec::new(),
406            },
407            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
408                dlc_uri: Some(uri),
409                specs,
410            },
411            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
412        };
413        self
414    }
415
416    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
417    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
418    where
419        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
420    {
421        self.error_handler_mode = match self.error_handler_mode {
422            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
423                dlc_uri: None,
424                specs: Vec::new(),
425            },
426            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
427            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
428        };
429
430        OnExceptionBuilder {
431            parent: self,
432            policy: OnExceptionSpec {
433                matches: std::sync::Arc::new(matches),
434                retry: None,
435                handled_by: None,
436            },
437        }
438    }
439
440    /// Set a circuit breaker for this route.
441    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
442        self.circuit_breaker_config = Some(config);
443        self
444    }
445
446    /// Override the consumer's default concurrency model.
447    ///
448    /// When set, the pipeline spawns a task per exchange, processing them
449    /// concurrently. `max` limits the number of simultaneously active
450    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
451    ///
452    /// # Example
453    /// ```ignore
454    /// RouteBuilder::from("http://0.0.0.0:8080/api")
455    ///     .concurrent(16)  // max 16 in-flight pipeline executions
456    ///     .process(handle_request)
457    ///     .build()
458    /// ```
459    pub fn concurrent(mut self, max: usize) -> Self {
460        let max = if max == 0 { None } else { Some(max) };
461        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
462        self
463    }
464
465    /// Force sequential processing, overriding a concurrent-capable consumer.
466    ///
467    /// Useful for HTTP routes that mutate shared state and need ordering
468    /// guarantees.
469    pub fn sequential(mut self) -> Self {
470        self.concurrency = Some(ConcurrencyModel::Sequential);
471        self
472    }
473
474    /// Set the route ID for this route.
475    ///
476    /// If not set, the route will be assigned an auto-generated ID.
477    pub fn route_id(mut self, id: impl Into<String>) -> Self {
478        self.route_id = Some(id.into());
479        self
480    }
481
482    /// Set whether this route should automatically start when the context starts.
483    ///
484    /// Default is `true`.
485    pub fn auto_startup(mut self, auto: bool) -> Self {
486        self.auto_startup = Some(auto);
487        self
488    }
489
490    /// Set the startup order for this route.
491    ///
492    /// Routes with lower values start first. Default is 1000.
493    pub fn startup_order(mut self, order: i32) -> Self {
494        self.startup_order = Some(order);
495        self
496    }
497
498    /// Begin a Splitter sub-pipeline. Steps added after this call (until
499    /// `.end_split()`) will be executed per-fragment.
500    ///
501    /// Returns a `SplitBuilder` — you cannot call `.build()` until
502    /// `.end_split()` closes the split scope (enforced by the type system).
503    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
504        SplitBuilder {
505            parent: self,
506            config,
507            steps: Vec::new(),
508        }
509    }
510
511    /// Begin a Multicast sub-pipeline. Steps added after this call (until
512    /// `.end_multicast()`) will each receive a copy of the exchange.
513    ///
514    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
515    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
516    pub fn multicast(self) -> MulticastBuilder {
517        MulticastBuilder {
518            parent: self,
519            steps: Vec::new(),
520            config: MulticastConfig::new(),
521        }
522    }
523
524    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
525    /// `max_requests` per `period`. Steps inside the throttle scope are only
526    /// executed when the rate limit allows.
527    ///
528    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
529    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
530    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
531        ThrottleBuilder {
532            parent: self,
533            config: ThrottlerConfig::new(max_requests, period),
534            steps: Vec::new(),
535        }
536    }
537
538    /// Begin a Loop sub-pipeline that iterates a fixed number of times.
539    pub fn loop_count(self, count: usize) -> LoopBuilder {
540        LoopBuilder {
541            parent: self,
542            config: LoopConfig {
543                mode: LoopMode::Count(count),
544            },
545            steps: vec![],
546        }
547    }
548
549    /// Begin a Loop sub-pipeline that iterates while a predicate is true.
550    pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
551    where
552        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
553    {
554        LoopBuilder {
555            parent: self,
556            config: LoopConfig {
557                mode: LoopMode::While(std::sync::Arc::new(predicate)),
558            },
559            steps: vec![],
560        }
561    }
562
563    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
564    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
565    ///
566    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
567    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
568    pub fn load_balance(self) -> LoadBalancerBuilder {
569        LoadBalancerBuilder {
570            parent: self,
571            config: LoadBalancerConfig::round_robin(),
572            steps: Vec::new(),
573        }
574    }
575
576    /// Add a dynamic router step that routes exchanges dynamically based on
577    /// expression evaluation at runtime.
578    ///
579    /// The expression receives the exchange and returns `Some(uri)` to route to
580    /// the next endpoint, or `None` to stop routing.
581    ///
582    /// # Example
583    /// ```ignore
584    /// RouteBuilder::from("timer:tick")
585    ///     .route_id("test-route")
586    ///     .dynamic_router(|ex| {
587    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
588    ///     })
589    ///     .build()
590    /// ```
591    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
592        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
593    }
594
595    /// Add a dynamic router step with full configuration.
596    ///
597    /// Allows customization of URI delimiter, cache size, timeout, and other options.
598    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
599        self.steps.push(BuilderStep::DynamicRouter { config });
600        self
601    }
602
603    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
604        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
605    }
606
607    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
608        self.steps.push(BuilderStep::RoutingSlip { config });
609        self
610    }
611
612    pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
613        self.recipient_list_with_config(RecipientListConfig::new(expression))
614    }
615
616    pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
617        self.steps.push(BuilderStep::RecipientList { config });
618        self
619    }
620
621    /// Consume the builder and produce a [`RouteDefinition`].
622    // TODO(BUILDER-006): Validate duplicate route IDs. When a route with the same
623    // ID is already registered in the context, return `Err(CamelError::RouteError)`.
624    // Currently, duplicate IDs are silently accepted; detection should happen at
625    // `CamelContext::add_route_definition` time.
626    pub fn build(self) -> Result<RouteDefinition, CamelError> {
627        validate_uri(&self.from_uri)?;
628        let route_id = self
629            .route_id
630            .filter(|s| !s.trim().is_empty())
631            .ok_or_else(|| {
632                CamelError::RouteError(
633                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
634                        .to_string(),
635                )
636            })?;
637        let resolved_error_handler = match self.error_handler_mode {
638            ErrorHandlerMode::None => self.error_handler,
639            ErrorHandlerMode::ExplicitConfig => self.error_handler,
640            ErrorHandlerMode::Mixed => {
641                return Err(CamelError::RouteError(
642                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
643                ));
644            }
645            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
646                let mut config = if let Some(uri) = dlc_uri {
647                    ErrorHandlerConfig::dead_letter_channel(uri)
648                } else {
649                    ErrorHandlerConfig::log_only()
650                };
651
652                for spec in specs {
653                    let matcher = spec.matches.clone();
654                    let mut builder = config.on_exception(move |e| matcher(e));
655
656                    if let Some(retry) = spec.retry {
657                        builder = builder.retry(retry.max_attempts).with_backoff(
658                            retry.initial_delay,
659                            retry.multiplier,
660                            retry.max_delay,
661                        );
662                        if retry.jitter_factor > 0.0 {
663                            builder = builder.with_jitter(retry.jitter_factor);
664                        }
665                    }
666
667                    if let Some(uri) = spec.handled_by {
668                        builder = builder.handled_by(uri);
669                    }
670
671                    config = builder.build();
672                }
673
674                Some(config)
675            }
676        };
677
678        let definition = RouteDefinition::new(self.from_uri, self.steps);
679        let definition = if let Some(eh) = resolved_error_handler {
680            definition.with_error_handler(eh)
681        } else {
682            definition
683        };
684        let definition = if let Some(cb) = self.circuit_breaker_config {
685            definition.with_circuit_breaker(cb)
686        } else {
687            definition
688        };
689        let definition = if let Some(concurrency) = self.concurrency {
690            definition.with_concurrency(concurrency)
691        } else {
692            definition
693        };
694        let definition = definition.with_route_id(route_id);
695        let definition = if let Some(auto) = self.auto_startup {
696            definition.with_auto_startup(auto)
697        } else {
698            definition
699        };
700        let definition = if let Some(order) = self.startup_order {
701            definition.with_startup_order(order)
702        } else {
703            definition
704        };
705        Ok(definition)
706    }
707
708    /// Compile this builder route into canonical v1 spec.
709    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
710        validate_uri(&self.from_uri)?;
711        let route_id = self
712            .route_id
713            .filter(|s| !s.trim().is_empty())
714            .ok_or_else(|| {
715                CamelError::RouteError(
716                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
717                        .to_string(),
718                )
719            })?;
720
721        let steps = canonicalize_steps(self.steps)?;
722        let circuit_breaker = self
723            .circuit_breaker_config
724            .map(canonicalize_circuit_breaker);
725
726        let spec = CanonicalRouteSpec {
727            route_id,
728            from: self.from_uri,
729            steps,
730            circuit_breaker,
731            version: camel_api::CANONICAL_CONTRACT_VERSION,
732        };
733        spec.validate_contract()?;
734        Ok(spec)
735    }
736}
737
/// Scope builder returned by `RouteBuilder::on_exception`.
///
/// Collects one exception policy and hands control back to the route builder
/// through `end_on_exception()`.
pub struct OnExceptionBuilder {
    /// Route builder that opened this scope; returned by `end_on_exception()`.
    parent: RouteBuilder,
    /// Policy being configured inside this scope.
    policy: OnExceptionSpec,
}
742
743impl OnExceptionBuilder {
744    pub fn retry(mut self, max_attempts: u32) -> Self {
745        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
746        self
747    }
748
749    pub fn with_backoff(
750        mut self,
751        initial: std::time::Duration,
752        multiplier: f64,
753        max: std::time::Duration,
754    ) -> Self {
755        if let Some(ref mut retry) = self.policy.retry {
756            retry.initial_delay = initial;
757            retry.multiplier = multiplier;
758            retry.max_delay = max;
759        } else {
760            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
761        }
762        self
763    }
764
765    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
766        if let Some(ref mut retry) = self.policy.retry {
767            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
768        } else {
769            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
770        }
771        self
772    }
773
774    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
775        self.policy.handled_by = Some(uri.into());
776        self
777    }
778
779    pub fn end_on_exception(mut self) -> RouteBuilder {
780        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
781            specs.push(self.policy);
782        }
783        self.parent
784    }
785}
786
787/// Validate that a URI is non-empty and contains a scheme component.
788fn validate_uri(uri: &str) -> Result<(), CamelError> {
789    let trimmed = uri.trim();
790    if trimmed.is_empty() {
791        return Err(CamelError::RouteError(
792            "route must have a 'from' URI".to_string(),
793        ));
794    }
795    if !trimmed.contains(':') {
796        return Err(CamelError::RouteError(
797            "URI must have a scheme (e.g. 'timer:tick')".to_string(),
798        ));
799    }
800    let scheme = trimmed.split(':').next().unwrap_or("");
801    if scheme.trim().is_empty() {
802        return Err(CamelError::RouteError(
803            "URI scheme must not be empty".to_string(),
804        ));
805    }
806    Ok(())
807}
808
809fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
810    let mut canonical = Vec::with_capacity(steps.len());
811    for step in steps {
812        canonical.push(canonicalize_step(step)?);
813    }
814    Ok(canonical)
815}
816
817fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
818    match step {
819        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
820        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
821        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
822        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
823        BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
824            delay_ms: config.delay_ms,
825            dynamic_header: config.dynamic_header,
826        }),
827        BuilderStep::DeclarativeScript { expression } => {
828            Ok(CanonicalStepSpec::Script { expression })
829        }
830        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
831            predicate,
832            steps: canonicalize_steps(steps)?,
833        }),
834        BuilderStep::DeclarativeChoice { whens, otherwise } => {
835            let mut canonical_whens = Vec::with_capacity(whens.len());
836            for DeclarativeWhenStep { predicate, steps } in whens {
837                canonical_whens.push(CanonicalWhenSpec {
838                    predicate,
839                    steps: canonicalize_steps(steps)?,
840                });
841            }
842            let otherwise = match otherwise {
843                Some(steps) => Some(canonicalize_steps(steps)?),
844                None => None,
845            };
846            Ok(CanonicalStepSpec::Choice {
847                whens: canonical_whens,
848                otherwise,
849            })
850        }
851        BuilderStep::DeclarativeSplit {
852            expression,
853            aggregation,
854            parallel,
855            parallel_limit,
856            stop_on_exception,
857            steps,
858        } => Ok(CanonicalStepSpec::Split {
859            expression: CanonicalSplitExpressionSpec::Language(expression),
860            aggregation: canonicalize_split_aggregation(aggregation)?,
861            parallel,
862            parallel_limit,
863            stop_on_exception,
864            steps: canonicalize_steps(steps)?,
865        }),
866        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
867            canonicalize_aggregate(config)?,
868        )),
869        other => {
870            let step_name = canonical_step_name(&other);
871            let detail = camel_api::canonical_contract_rejection_reason(step_name)
872                .unwrap_or("not included in canonical v1");
873            Err(CamelError::RouteError(format!(
874                "canonical v1 does not support step `{step_name}`: {detail}"
875            )))
876        }
877    }
878}
879
880fn canonicalize_split_aggregation(
881    strategy: camel_api::splitter::AggregationStrategy,
882) -> Result<CanonicalSplitAggregationSpec, CamelError> {
883    match strategy {
884        camel_api::splitter::AggregationStrategy::LastWins => {
885            Ok(CanonicalSplitAggregationSpec::LastWins)
886        }
887        camel_api::splitter::AggregationStrategy::CollectAll => {
888            Ok(CanonicalSplitAggregationSpec::CollectAll)
889        }
890        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
891            "canonical v1 does not support custom split aggregation".to_string(),
892        )),
893        camel_api::splitter::AggregationStrategy::Original => {
894            Ok(CanonicalSplitAggregationSpec::Original)
895        }
896    }
897}
898
899fn extract_completion_fields(
900    mode: &CompletionMode,
901) -> Result<(Option<usize>, Option<u64>), CamelError> {
902    match mode {
903        CompletionMode::Single(cond) => match cond {
904            CompletionCondition::Size(n) => Ok((Some(*n), None)),
905            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
906            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
907                "canonical v1 does not support aggregate predicate completion".to_string(),
908            )),
909        },
910        CompletionMode::Any(conds) => {
911            let mut size = None;
912            let mut timeout_ms = None;
913            for cond in conds {
914                match cond {
915                    CompletionCondition::Size(n) => size = Some(*n),
916                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
917                    CompletionCondition::Predicate(_) => {
918                        return Err(CamelError::RouteError(
919                            "canonical v1 does not support aggregate predicate completion"
920                                .to_string(),
921                        ));
922                    }
923                }
924            }
925            Ok((size, timeout_ms))
926        }
927    }
928}
929
930fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
931    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
932
933    let header = match &config.correlation {
934        CorrelationStrategy::HeaderName(h) => h.clone(),
935        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
936        CorrelationStrategy::Fn(_) => {
937            return Err(CamelError::RouteError(
938                "canonical v1 does not support Fn correlation strategy".to_string(),
939            ));
940        }
941    };
942
943    let correlation_key = match &config.correlation {
944        CorrelationStrategy::HeaderName(_) => None,
945        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
946        CorrelationStrategy::Fn(_) => unreachable!(),
947    };
948
949    let strategy = match config.strategy {
950        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
951        AggregationStrategy::Custom(_) => {
952            return Err(CamelError::RouteError(
953                "canonical v1 does not support custom aggregate strategy".to_string(),
954            ));
955        }
956    };
957    let bucket_ttl_ms = config
958        .bucket_ttl
959        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
960
961    Ok(CanonicalAggregateSpec {
962        header,
963        completion_size,
964        completion_timeout_ms,
965        correlation_key,
966        force_completion_on_stop: if config.force_completion_on_stop {
967            Some(true)
968        } else {
969            None
970        },
971        discard_on_timeout: if config.discard_on_timeout {
972            Some(true)
973        } else {
974            None
975        },
976        strategy,
977        max_buckets: config.max_buckets,
978        bucket_ttl_ms,
979    })
980}
981
982fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
983    CanonicalCircuitBreakerSpec {
984        failure_threshold: config.failure_threshold,
985        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
986    }
987}
988
989fn canonical_step_name(step: &BuilderStep) -> &'static str {
990    match step {
991        BuilderStep::Processor(_) => "processor",
992        BuilderStep::To(_) => "to",
993        BuilderStep::Stop => "stop",
994        BuilderStep::Log { .. } => "log",
995        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
996        BuilderStep::DeclarativeSetBody { .. } => "set_body",
997        BuilderStep::DeclarativeFilter { .. } => "filter",
998        BuilderStep::DeclarativeChoice { .. } => "choice",
999        BuilderStep::DeclarativeScript { .. } => "script",
1000        BuilderStep::DeclarativeFunction { .. } => "function",
1001        BuilderStep::DeclarativeSplit { .. } => "split",
1002        BuilderStep::Split { .. } => "split",
1003        BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1004        BuilderStep::Aggregate { .. } => "aggregate",
1005        BuilderStep::Filter { .. } => "filter",
1006        BuilderStep::Choice { .. } => "choice",
1007        BuilderStep::WireTap { .. } => "wire_tap",
1008        BuilderStep::Delay { .. } => "delay",
1009        BuilderStep::Multicast { .. } => "multicast",
1010        BuilderStep::DeclarativeLog { .. } => "log",
1011        BuilderStep::Bean { .. } => "bean",
1012        BuilderStep::Script { .. } => "script",
1013        BuilderStep::Throttle { .. } => "throttle",
1014        BuilderStep::LoadBalance { .. } => "load_balancer",
1015        BuilderStep::DynamicRouter { .. } => "dynamic_router",
1016        BuilderStep::RoutingSlip { .. } => "routing_slip",
1017        BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1018        BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1019        BuilderStep::RecipientList { .. } => "recipient_list",
1020        BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1021        BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1022    }
1023}
1024
1025impl StepAccumulator for RouteBuilder {
1026    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1027        &mut self.steps
1028    }
1029}
1030
1031/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
1032///
1033/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1034/// but NOT `.build()` and NOT `.split()` (no nested splits).
1035///
1036/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
1037/// and returns the parent `RouteBuilder`.
1038pub struct SplitBuilder {
1039    parent: RouteBuilder,
1040    config: SplitterConfig,
1041    steps: Vec<BuilderStep>,
1042}
1043
1044impl SplitBuilder {
1045    /// Open a filter scope within the split sub-pipeline.
1046    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1047    where
1048        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1049    {
1050        FilterInSplitBuilder {
1051            parent: self,
1052            predicate: std::sync::Arc::new(predicate),
1053            steps: vec![],
1054        }
1055    }
1056
1057    /// Close the split scope. Packages the accumulated sub-steps into a
1058    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
1059    pub fn end_split(mut self) -> RouteBuilder {
1060        let split_step = BuilderStep::Split {
1061            config: self.config,
1062            steps: self.steps,
1063        };
1064        self.parent.steps.push(split_step);
1065        self.parent
1066    }
1067}
1068
1069impl StepAccumulator for SplitBuilder {
1070    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1071        &mut self.steps
1072    }
1073}
1074
1075/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
1076pub struct FilterBuilder {
1077    parent: RouteBuilder,
1078    predicate: FilterPredicate,
1079    steps: Vec<BuilderStep>,
1080}
1081
1082impl FilterBuilder {
1083    /// Close the filter scope. Packages the accumulated sub-steps into a
1084    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
1085    pub fn end_filter(mut self) -> RouteBuilder {
1086        let step = BuilderStep::Filter {
1087            predicate: self.predicate,
1088            steps: self.steps,
1089        };
1090        self.parent.steps.push(step);
1091        self.parent
1092    }
1093}
1094
1095impl StepAccumulator for FilterBuilder {
1096    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1097        &mut self.steps
1098    }
1099}
1100
1101/// Builder for a filter scope nested inside a `.split()` block.
1102pub struct FilterInSplitBuilder {
1103    parent: SplitBuilder,
1104    predicate: FilterPredicate,
1105    steps: Vec<BuilderStep>,
1106}
1107
1108impl FilterInSplitBuilder {
1109    /// Close the filter scope and return the parent `SplitBuilder`.
1110    pub fn end_filter(mut self) -> SplitBuilder {
1111        let step = BuilderStep::Filter {
1112            predicate: self.predicate,
1113            steps: self.steps,
1114        };
1115        self.parent.steps.push(step);
1116        self.parent
1117    }
1118}
1119
1120impl StepAccumulator for FilterInSplitBuilder {
1121    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1122        &mut self.steps
1123    }
1124}
1125
1126// ── Choice/When/Otherwise builders ─────────────────────────────────────────
1127
1128/// Builder for a `.choice()` ... `.end_choice()` block.
1129///
1130/// Accumulates `when` clauses and an optional `otherwise` clause.
1131/// Cannot call `.build()` until `.end_choice()` is called.
1132pub struct ChoiceBuilder {
1133    parent: RouteBuilder,
1134    whens: Vec<WhenStep>,
1135    _otherwise: Option<Vec<BuilderStep>>,
1136}
1137
1138impl ChoiceBuilder {
1139    /// Open a `when` clause. Only exchanges matching `predicate` will be
1140    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
1141    pub fn when<F>(self, predicate: F) -> WhenBuilder
1142    where
1143        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1144    {
1145        WhenBuilder {
1146            parent: self,
1147            predicate: std::sync::Arc::new(predicate),
1148            steps: vec![],
1149        }
1150    }
1151
1152    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1153    ///
1154    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1155    pub fn otherwise(self) -> OtherwiseBuilder {
1156        OtherwiseBuilder {
1157            parent: self,
1158            steps: vec![],
1159        }
1160    }
1161
1162    /// Close the choice scope. Packages all accumulated `when` clauses and
1163    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1164    /// parent `RouteBuilder`.
1165    pub fn end_choice(mut self) -> RouteBuilder {
1166        let step = BuilderStep::Choice {
1167            whens: self.whens,
1168            otherwise: self._otherwise,
1169        };
1170        self.parent.steps.push(step);
1171        self.parent
1172    }
1173}
1174
1175/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1176pub struct WhenBuilder {
1177    parent: ChoiceBuilder,
1178    predicate: camel_api::FilterPredicate,
1179    steps: Vec<BuilderStep>,
1180}
1181
1182impl WhenBuilder {
1183    /// Close the when scope. Packages the accumulated sub-steps into a
1184    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1185    pub fn end_when(mut self) -> ChoiceBuilder {
1186        self.parent.whens.push(WhenStep {
1187            predicate: self.predicate,
1188            steps: self.steps,
1189        });
1190        self.parent
1191    }
1192}
1193
1194impl StepAccumulator for WhenBuilder {
1195    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1196        &mut self.steps
1197    }
1198}
1199
1200/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1201pub struct OtherwiseBuilder {
1202    parent: ChoiceBuilder,
1203    steps: Vec<BuilderStep>,
1204}
1205
1206impl OtherwiseBuilder {
1207    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1208    pub fn end_otherwise(self) -> ChoiceBuilder {
1209        let OtherwiseBuilder { mut parent, steps } = self;
1210        parent._otherwise = Some(steps);
1211        parent
1212    }
1213}
1214
1215impl StepAccumulator for OtherwiseBuilder {
1216    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1217        &mut self.steps
1218    }
1219}
1220
1221/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1222///
1223/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1224/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1225///
1226/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1227/// and returns the parent `RouteBuilder`.
1228pub struct MulticastBuilder {
1229    parent: RouteBuilder,
1230    steps: Vec<BuilderStep>,
1231    config: MulticastConfig,
1232}
1233
1234impl MulticastBuilder {
1235    pub fn parallel(mut self, parallel: bool) -> Self {
1236        self.config = self.config.parallel(parallel);
1237        self
1238    }
1239
1240    pub fn parallel_limit(mut self, limit: usize) -> Self {
1241        self.config = self.config.parallel_limit(limit);
1242        self
1243    }
1244
1245    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1246        self.config = self.config.stop_on_exception(stop);
1247        self
1248    }
1249
1250    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1251        self.config = self.config.timeout(duration);
1252        self
1253    }
1254
1255    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1256        self.config = self.config.aggregation(strategy);
1257        self
1258    }
1259
1260    pub fn end_multicast(mut self) -> RouteBuilder {
1261        let step = BuilderStep::Multicast {
1262            steps: self.steps,
1263            config: self.config,
1264        };
1265        self.parent.steps.push(step);
1266        self.parent
1267    }
1268}
1269
1270impl StepAccumulator for MulticastBuilder {
1271    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1272        &mut self.steps
1273    }
1274}
1275
1276/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1277///
1278/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1279/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1280///
1281/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1282/// and returns the parent `RouteBuilder`.
1283pub struct ThrottleBuilder {
1284    parent: RouteBuilder,
1285    config: ThrottlerConfig,
1286    steps: Vec<BuilderStep>,
1287}
1288
1289impl ThrottleBuilder {
1290    /// Set the throttle strategy. Default is `Delay`.
1291    ///
1292    /// - `Delay`: Queue messages until capacity available
1293    /// - `Reject`: Return error immediately when throttled
1294    /// - `Drop`: Silently discard excess messages
1295    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1296        self.config = self.config.strategy(strategy);
1297        self
1298    }
1299
1300    /// Close the throttle scope. Packages the accumulated sub-steps into a
1301    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1302    pub fn end_throttle(mut self) -> RouteBuilder {
1303        let step = BuilderStep::Throttle {
1304            config: self.config,
1305            steps: self.steps,
1306        };
1307        self.parent.steps.push(step);
1308        self.parent
1309    }
1310}
1311
1312impl StepAccumulator for ThrottleBuilder {
1313    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1314        &mut self.steps
1315    }
1316}
1317
1318/// Builder for the sub-pipeline within a `.loop_count()` / `.loop_while()` ... `.end_loop()` block.
1319pub struct LoopBuilder {
1320    parent: RouteBuilder,
1321    config: LoopConfig,
1322    steps: Vec<BuilderStep>,
1323}
1324
1325impl LoopBuilder {
1326    pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1327        LoopInLoopBuilder {
1328            parent: self,
1329            config: LoopConfig {
1330                mode: LoopMode::Count(count),
1331            },
1332            steps: vec![],
1333        }
1334    }
1335
1336    pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1337    where
1338        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1339    {
1340        LoopInLoopBuilder {
1341            parent: self,
1342            config: LoopConfig {
1343                mode: LoopMode::While(std::sync::Arc::new(predicate)),
1344            },
1345            steps: vec![],
1346        }
1347    }
1348
1349    pub fn end_loop(mut self) -> RouteBuilder {
1350        let step = BuilderStep::Loop {
1351            config: self.config,
1352            steps: self.steps,
1353        };
1354        self.parent.steps.push(step);
1355        self.parent
1356    }
1357}
1358
1359impl StepAccumulator for LoopBuilder {
1360    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1361        &mut self.steps
1362    }
1363}
1364
1365pub struct LoopInLoopBuilder {
1366    parent: LoopBuilder,
1367    config: LoopConfig,
1368    steps: Vec<BuilderStep>,
1369}
1370
1371impl LoopInLoopBuilder {
1372    pub fn end_loop(mut self) -> LoopBuilder {
1373        let step = BuilderStep::Loop {
1374            config: self.config,
1375            steps: self.steps,
1376        };
1377        self.parent.steps.push(step);
1378        self.parent
1379    }
1380}
1381
1382impl StepAccumulator for LoopInLoopBuilder {
1383    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1384        &mut self.steps
1385    }
1386}
1387
1388/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1389///
1390/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1391/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1392///
1393/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1394/// and returns the parent `RouteBuilder`.
1395pub struct LoadBalancerBuilder {
1396    parent: RouteBuilder,
1397    config: LoadBalancerConfig,
1398    steps: Vec<BuilderStep>,
1399}
1400
1401impl LoadBalancerBuilder {
1402    /// Set the load balance strategy to round-robin (default).
1403    pub fn round_robin(mut self) -> Self {
1404        self.config = LoadBalancerConfig::round_robin();
1405        self
1406    }
1407
1408    /// Set the load balance strategy to random selection.
1409    pub fn random(mut self) -> Self {
1410        self.config = LoadBalancerConfig::random();
1411        self
1412    }
1413
1414    /// Set the load balance strategy to weighted selection.
1415    ///
1416    /// Each endpoint is assigned a weight that determines its probability
1417    /// of being selected.
1418    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1419        self.config = LoadBalancerConfig::weighted(weights);
1420        self
1421    }
1422
1423    /// Set the load balance strategy to failover.
1424    ///
1425    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1426    /// is tried.
1427    pub fn failover(mut self) -> Self {
1428        self.config = LoadBalancerConfig::failover();
1429        self
1430    }
1431
1432    /// Enable or disable parallel execution of endpoints.
1433    ///
1434    /// When enabled, all endpoints receive the exchange simultaneously.
1435    /// When disabled (default), only one endpoint is selected per exchange.
1436    pub fn parallel(mut self, parallel: bool) -> Self {
1437        self.config = self.config.parallel(parallel);
1438        self
1439    }
1440
1441    /// Close the load balance scope. Packages the accumulated sub-steps into a
1442    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1443    pub fn end_load_balance(mut self) -> RouteBuilder {
1444        let step = BuilderStep::LoadBalance {
1445            config: self.config,
1446            steps: self.steps,
1447        };
1448        self.parent.steps.push(step);
1449        self.parent
1450    }
1451}
1452
1453impl StepAccumulator for LoadBalancerBuilder {
1454    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1455        &mut self.steps
1456    }
1457}
1458
1459// ---------------------------------------------------------------------------
1460// Tests
1461// ---------------------------------------------------------------------------
1462
1463#[cfg(test)]
1464mod tests {
1465    use super::*;
1466    use camel_api::error_handler::ErrorHandlerConfig;
1467    use camel_api::load_balancer::LoadBalanceStrategy;
1468    use camel_api::{Exchange, Message};
1469    use camel_core::route::BuilderStep;
1470    use std::sync::Arc;
1471    use std::time::Duration;
1472    use tower::{Service, ServiceExt};
1473
1474    #[test]
1475    fn test_builder_from_creates_definition() {
1476        let definition = RouteBuilder::from("timer:tick")
1477            .route_id("test-route")
1478            .build()
1479            .unwrap();
1480        assert_eq!(definition.from_uri(), "timer:tick");
1481    }
1482
1483    #[test]
1484    fn test_builder_empty_from_uri_errors() {
1485        let result = RouteBuilder::from("").route_id("test-route").build();
1486        assert!(result.is_err());
1487    }
1488
1489    #[test]
1490    fn test_build_rejects_schemeless_uri() {
1491        let result = RouteBuilder::from("no-scheme-here")
1492            .route_id("test-route")
1493            .build();
1494        match result {
1495            Err(err) => {
1496                let err_msg = format!("{err}");
1497                assert!(
1498                    err_msg.contains("scheme"),
1499                    "expected scheme-related error, got: {err_msg}"
1500                );
1501            }
1502            Ok(_) => panic!("schemeless URI should fail"),
1503        }
1504    }
1505
1506    #[test]
1507    fn test_build_rejects_empty_scheme_uri() {
1508        let result = RouteBuilder::from(":missing-scheme")
1509            .route_id("test-route")
1510            .build();
1511        match result {
1512            Err(err) => {
1513                let err_msg = format!("{err}");
1514                assert!(
1515                    err_msg.contains("scheme"),
1516                    "expected scheme-related error, got: {err_msg}"
1517                );
1518            }
1519            Ok(_) => panic!("empty-scheme URI should fail"),
1520        }
1521    }
1522
1523    #[test]
1524    fn test_build_accepts_valid_uri() {
1525        let result = RouteBuilder::from("timer:tick")
1526            .route_id("test-route")
1527            .build();
1528        assert!(result.is_ok());
1529    }
1530
1531    #[test]
1532    fn test_build_canonical_rejects_schemeless_uri() {
1533        let result = RouteBuilder::from("no-scheme-here")
1534            .route_id("test-route")
1535            .build_canonical();
1536        assert!(result.is_err());
1537    }
1538
1539    #[test]
1540    fn test_builder_to_adds_step() {
1541        let definition = RouteBuilder::from("timer:tick")
1542            .route_id("test-route")
1543            .to("log:info")
1544            .build()
1545            .unwrap();
1546
1547        assert_eq!(definition.from_uri(), "timer:tick");
1548        // We can verify steps were added by checking the structure
1549        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1550    }
1551
1552    #[test]
1553    fn test_builder_filter_adds_filter_step() {
1554        let definition = RouteBuilder::from("timer:tick")
1555            .route_id("test-route")
1556            .filter(|_ex| true)
1557            .to("mock:result")
1558            .end_filter()
1559            .build()
1560            .unwrap();
1561
1562        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1563    }
1564
1565    #[test]
1566    fn test_builder_set_header_adds_processor_step() {
1567        let definition = RouteBuilder::from("timer:tick")
1568            .route_id("test-route")
1569            .set_header("key", Value::String("value".into()))
1570            .build()
1571            .unwrap();
1572
1573        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1574    }
1575
1576    #[test]
1577    fn test_builder_map_body_adds_processor_step() {
1578        let definition = RouteBuilder::from("timer:tick")
1579            .route_id("test-route")
1580            .map_body(|body| body)
1581            .build()
1582            .unwrap();
1583
1584        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1585    }
1586
1587    #[test]
1588    fn test_builder_process_adds_processor_step() {
1589        let definition = RouteBuilder::from("timer:tick")
1590            .route_id("test-route")
1591            .process(|ex| async move { Ok(ex) })
1592            .build()
1593            .unwrap();
1594
1595        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1596    }
1597
1598    #[test]
1599    fn test_builder_chain_multiple_steps() {
1600        let definition = RouteBuilder::from("timer:tick")
1601            .route_id("test-route")
1602            .set_header("source", Value::String("timer".into()))
1603            .filter(|ex| ex.input.header("source").is_some())
1604            .to("log:info")
1605            .end_filter()
1606            .to("mock:result")
1607            .build()
1608            .unwrap();
1609
1610        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1611        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1612        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1613        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1614    }
1615
1616    #[test]
1617    fn test_loop_count_builder() {
1618        use camel_api::loop_eip::LoopMode;
1619
1620        let def = RouteBuilder::from("direct:start")
1621            .route_id("loop-test")
1622            .loop_count(3)
1623            .to("mock:inside")
1624            .end_loop()
1625            .to("mock:after")
1626            .build()
1627            .unwrap();
1628
1629        assert_eq!(def.steps().len(), 2);
1630        match &def.steps()[0] {
1631            BuilderStep::Loop { config, steps } => {
1632                assert!(matches!(config.mode, LoopMode::Count(3)));
1633                assert_eq!(steps.len(), 1);
1634            }
1635            other => panic!("Expected Loop, got {:?}", other),
1636        }
1637        assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1638    }
1639
1640    #[test]
1641    fn test_loop_while_builder() {
1642        use camel_api::loop_eip::LoopMode;
1643
1644        let def = RouteBuilder::from("direct:start")
1645            .route_id("loop-while-test")
1646            .loop_while(|_ex| true)
1647            .to("mock:retry")
1648            .end_loop()
1649            .build()
1650            .unwrap();
1651
1652        assert_eq!(def.steps().len(), 1);
1653        match &def.steps()[0] {
1654            BuilderStep::Loop { config, steps } => {
1655                assert!(matches!(config.mode, LoopMode::While(_)));
1656                assert_eq!(steps.len(), 1);
1657            }
1658            other => panic!("Expected Loop, got {:?}", other),
1659        }
1660    }
1661
1662    #[test]
1663    fn test_nested_loop_builder() {
1664        use camel_api::loop_eip::LoopMode;
1665
1666        let def = RouteBuilder::from("direct:start")
1667            .route_id("nested-loop-test")
1668            .loop_count(2)
1669            .to("mock:outer")
1670            .loop_count(3)
1671            .to("mock:inner")
1672            .end_loop()
1673            .end_loop()
1674            .to("mock:after")
1675            .build()
1676            .unwrap();
1677
1678        assert_eq!(def.steps().len(), 2);
1679        match &def.steps()[0] {
1680            BuilderStep::Loop { steps, .. } => {
1681                assert_eq!(steps.len(), 2);
1682                match &steps[1] {
1683                    BuilderStep::Loop {
1684                        config,
1685                        steps: inner_steps,
1686                    } => {
1687                        assert!(matches!(config.mode, LoopMode::Count(3)));
1688                        assert_eq!(inner_steps.len(), 1);
1689                    }
1690                    other => panic!("Expected nested Loop, got {:?}", other),
1691                }
1692            }
1693            other => panic!("Expected outer Loop, got {:?}", other),
1694        }
1695    }
1696
1697    // -----------------------------------------------------------------------
1698    // Processor behavior tests — exercise the real Tower services directly
1699    // -----------------------------------------------------------------------
1700
1701    #[tokio::test]
1702    async fn test_set_header_processor_works() {
1703        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1704        let exchange = Exchange::new(Message::new("test"));
1705        let result = svc.call(exchange).await.unwrap();
1706        assert_eq!(
1707            result.input.header("greeting"),
1708            Some(&Value::String("hello".into()))
1709        );
1710    }
1711
1712    #[tokio::test]
1713    async fn test_filter_processor_passes() {
1714        use camel_api::BoxProcessorExt;
1715        use camel_processor::FilterService;
1716
1717        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1718        let mut svc =
1719            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1720        let exchange = Exchange::new(Message::new("pass"));
1721        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1722        assert_eq!(result.input.body.as_text(), Some("pass"));
1723    }
1724
1725    #[tokio::test]
1726    async fn test_filter_processor_blocks() {
1727        use camel_api::BoxProcessorExt;
1728        use camel_processor::FilterService;
1729
1730        let sub = BoxProcessor::from_fn(|_ex| {
1731            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1732        });
1733        let mut svc =
1734            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1735        let exchange = Exchange::new(Message::new("reject"));
1736        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1737        assert_eq!(result.input.body.as_text(), Some("reject"));
1738    }
1739
1740    #[tokio::test]
1741    async fn test_map_body_processor_works() {
1742        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1743            if let Some(text) = body.as_text() {
1744                Body::Text(text.to_uppercase())
1745            } else {
1746                body
1747            }
1748        });
1749        let exchange = Exchange::new(Message::new("hello"));
1750        let result = mapper.oneshot(exchange).await.unwrap();
1751        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1752    }
1753
1754    #[tokio::test]
1755    async fn test_process_custom_processor_works() {
1756        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1757            ex.set_property("custom", Value::Bool(true));
1758            Ok(ex)
1759        });
1760        let exchange = Exchange::new(Message::default());
1761        let result = processor.oneshot(exchange).await.unwrap();
1762        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1763    }
1764
1765    // -----------------------------------------------------------------------
1766    // Sequential pipeline test
1767    // -----------------------------------------------------------------------
1768
1769    #[tokio::test]
1770    async fn test_compose_pipeline_runs_steps_in_order() {
1771        use camel_core::route::compose_pipeline;
1772
1773        let processors = vec![
1774            BoxProcessor::new(SetHeader::new(
1775                IdentityProcessor,
1776                "step",
1777                Value::String("one".into()),
1778            )),
1779            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1780                if let Some(text) = body.as_text() {
1781                    Body::Text(format!("{}-processed", text))
1782                } else {
1783                    body
1784                }
1785            })),
1786        ];
1787
1788        let pipeline = compose_pipeline(processors);
1789        let exchange = Exchange::new(Message::new("hello"));
1790        let result = pipeline.oneshot(exchange).await.unwrap();
1791
1792        assert_eq!(
1793            result.input.header("step"),
1794            Some(&Value::String("one".into()))
1795        );
1796        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1797    }
1798
1799    #[tokio::test]
1800    async fn test_compose_pipeline_empty_is_identity() {
1801        use camel_core::route::compose_pipeline;
1802
1803        let pipeline = compose_pipeline(vec![]);
1804        let exchange = Exchange::new(Message::new("unchanged"));
1805        let result = pipeline.oneshot(exchange).await.unwrap();
1806        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1807    }
1808
1809    // -----------------------------------------------------------------------
1810    // Circuit breaker builder tests
1811    // -----------------------------------------------------------------------
1812
1813    #[test]
1814    fn test_builder_circuit_breaker_sets_config() {
1815        use camel_api::circuit_breaker::CircuitBreakerConfig;
1816
1817        let config = CircuitBreakerConfig::new().failure_threshold(5);
1818        let definition = RouteBuilder::from("timer:tick")
1819            .route_id("test-route")
1820            .circuit_breaker(config)
1821            .build()
1822            .unwrap();
1823
1824        let cb = definition
1825            .circuit_breaker_config()
1826            .expect("circuit breaker should be set");
1827        assert_eq!(cb.failure_threshold, 5);
1828    }
1829
1830    #[test]
1831    fn test_builder_circuit_breaker_with_error_handler() {
1832        use camel_api::circuit_breaker::CircuitBreakerConfig;
1833        use camel_api::error_handler::ErrorHandlerConfig;
1834
1835        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1836        let eh_config = ErrorHandlerConfig::log_only();
1837
1838        let definition = RouteBuilder::from("timer:tick")
1839            .route_id("test-route")
1840            .to("log:info")
1841            .circuit_breaker(cb_config)
1842            .error_handler(eh_config)
1843            .build()
1844            .unwrap();
1845
1846        assert!(
1847            definition.circuit_breaker_config().is_some(),
1848            "circuit breaker config should be set"
1849        );
1850        // Route definition was built successfully with both configs.
1851    }
1852
1853    #[test]
1854    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1855        let definition = RouteBuilder::from("direct:start")
1856            .route_id("test-route")
1857            .dead_letter_channel("log:dlc")
1858            .on_exception(|e| matches!(e, CamelError::Io(_)))
1859            .retry(3)
1860            .handled_by("log:io")
1861            .end_on_exception()
1862            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1863            .retry(1)
1864            .end_on_exception()
1865            .to("mock:out")
1866            .build()
1867            .expect("route should build");
1868
1869        let cfg = definition
1870            .error_handler_config()
1871            .expect("error handler should be set");
1872        assert_eq!(cfg.policies.len(), 2);
1873        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1874        assert_eq!(
1875            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1876            Some(3)
1877        );
1878        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1879        assert_eq!(
1880            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1881            Some(1)
1882        );
1883    }
1884
1885    #[test]
1886    fn test_builder_on_exception_mixed_mode_rejected() {
1887        let result = RouteBuilder::from("direct:start")
1888            .route_id("test-route")
1889            .error_handler(ErrorHandlerConfig::log_only())
1890            .on_exception(|_e| true)
1891            .end_on_exception()
1892            .to("mock:out")
1893            .build();
1894
1895        let err = result.err().expect("mixed mode should fail with an error");
1896
1897        assert!(
1898            format!("{err}").contains("mixed error handler modes"),
1899            "unexpected error: {err}"
1900        );
1901    }
1902
1903    #[test]
1904    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1905        let definition = RouteBuilder::from("direct:start")
1906            .route_id("test-route")
1907            .on_exception(|_e| true)
1908            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1909            .with_jitter(0.5)
1910            .end_on_exception()
1911            .to("mock:out")
1912            .build()
1913            .expect("route should build");
1914
1915        let cfg = definition
1916            .error_handler_config()
1917            .expect("error handler should be set");
1918        assert_eq!(cfg.policies.len(), 1);
1919        assert!(cfg.policies[0].retry.is_none());
1920    }
1921
1922    #[test]
1923    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1924        let definition = RouteBuilder::from("direct:start")
1925            .route_id("test-route")
1926            .dead_letter_channel("log:dlc")
1927            .to("mock:out")
1928            .build()
1929            .expect("route should build");
1930
1931        let cfg = definition
1932            .error_handler_config()
1933            .expect("error handler should be set");
1934        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1935        assert!(cfg.policies.is_empty());
1936    }
1937
1938    #[test]
1939    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1940        let definition = RouteBuilder::from("direct:start")
1941            .route_id("test-route")
1942            .dead_letter_channel("log:first")
1943            .on_exception(|e| matches!(e, CamelError::Io(_)))
1944            .retry(2)
1945            .end_on_exception()
1946            .dead_letter_channel("log:second")
1947            .to("mock:out")
1948            .build()
1949            .expect("route should build");
1950
1951        let cfg = definition
1952            .error_handler_config()
1953            .expect("error handler should be set");
1954        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1955        assert_eq!(cfg.policies.len(), 1);
1956        assert_eq!(
1957            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1958            Some(2)
1959        );
1960    }
1961
1962    #[test]
1963    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1964        let definition = RouteBuilder::from("direct:start")
1965            .route_id("test-route")
1966            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1967            .retry(1)
1968            .end_on_exception()
1969            .to("mock:out")
1970            .build()
1971            .expect("route should build");
1972
1973        let cfg = definition
1974            .error_handler_config()
1975            .expect("error handler should be set");
1976        assert!(cfg.dlc_uri.is_none());
1977        assert_eq!(cfg.policies.len(), 1);
1978    }
1979
1980    #[test]
1981    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1982        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1983        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1984
1985        let definition = RouteBuilder::from("direct:start")
1986            .route_id("test-route")
1987            .error_handler(first)
1988            .error_handler(second)
1989            .to("mock:out")
1990            .build()
1991            .expect("route should build");
1992
1993        let cfg = definition
1994            .error_handler_config()
1995            .expect("error handler should be set");
1996        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1997    }
1998
1999    // --- Splitter builder tests ---
2000
2001    #[test]
2002    fn test_split_builder_typestate() {
2003        use camel_api::splitter::{SplitterConfig, split_body_lines};
2004
2005        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
2006        let definition = RouteBuilder::from("timer:test?period=1000")
2007            .route_id("test-route")
2008            .split(SplitterConfig::new(split_body_lines()))
2009            .to("mock:per-fragment")
2010            .end_split()
2011            .to("mock:final")
2012            .build()
2013            .unwrap();
2014
2015        // Should have 2 top-level steps: Split + To("mock:final")
2016        assert_eq!(definition.steps().len(), 2);
2017    }
2018
2019    #[test]
2020    fn test_split_builder_steps_collected() {
2021        use camel_api::splitter::{SplitterConfig, split_body_lines};
2022
2023        let definition = RouteBuilder::from("timer:test?period=1000")
2024            .route_id("test-route")
2025            .split(SplitterConfig::new(split_body_lines()))
2026            .set_header("fragment", Value::String("yes".into()))
2027            .to("mock:per-fragment")
2028            .end_split()
2029            .build()
2030            .unwrap();
2031
2032        // Should have 1 top-level step: Split (containing 2 sub-steps)
2033        assert_eq!(definition.steps().len(), 1);
2034        match &definition.steps()[0] {
2035            BuilderStep::Split { steps, .. } => {
2036                assert_eq!(steps.len(), 2); // SetHeader + To
2037            }
2038            other => panic!("Expected Split, got {:?}", other),
2039        }
2040    }
2041
2042    #[test]
2043    fn test_split_builder_config_propagated() {
2044        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2045
2046        let definition = RouteBuilder::from("timer:test?period=1000")
2047            .route_id("test-route")
2048            .split(
2049                SplitterConfig::new(split_body_lines())
2050                    .parallel(true)
2051                    .parallel_limit(4)
2052                    .aggregation(AggregationStrategy::CollectAll),
2053            )
2054            .to("mock:per-fragment")
2055            .end_split()
2056            .build()
2057            .unwrap();
2058
2059        match &definition.steps()[0] {
2060            BuilderStep::Split { config, .. } => {
2061                assert!(config.parallel);
2062                assert_eq!(config.parallel_limit, Some(4));
2063                assert!(matches!(
2064                    config.aggregation,
2065                    AggregationStrategy::CollectAll
2066                ));
2067            }
2068            other => panic!("Expected Split, got {:?}", other),
2069        }
2070    }
2071
2072    #[test]
2073    fn test_aggregate_builder_adds_step() {
2074        use camel_api::aggregator::AggregatorConfig;
2075        use camel_core::route::BuilderStep;
2076
2077        let definition = RouteBuilder::from("timer:tick")
2078            .route_id("test-route")
2079            .aggregate(
2080                AggregatorConfig::correlate_by("key")
2081                    .complete_when_size(2)
2082                    .build()
2083                    .unwrap(),
2084            )
2085            .build()
2086            .unwrap();
2087
2088        assert_eq!(definition.steps().len(), 1);
2089        assert!(matches!(
2090            definition.steps()[0],
2091            BuilderStep::Aggregate { .. }
2092        ));
2093    }
2094
2095    #[test]
2096    fn test_aggregate_in_split_builder() {
2097        use camel_api::aggregator::AggregatorConfig;
2098        use camel_api::splitter::{SplitterConfig, split_body_lines};
2099        use camel_core::route::BuilderStep;
2100
2101        let definition = RouteBuilder::from("timer:tick")
2102            .route_id("test-route")
2103            .split(SplitterConfig::new(split_body_lines()))
2104            .aggregate(
2105                AggregatorConfig::correlate_by("key")
2106                    .complete_when_size(1)
2107                    .build()
2108                    .unwrap(),
2109            )
2110            .end_split()
2111            .build()
2112            .unwrap();
2113
2114        assert_eq!(definition.steps().len(), 1);
2115        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2116            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2117        } else {
2118            panic!("expected Split step");
2119        }
2120    }
2121
2122    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
2123
2124    #[test]
2125    fn test_builder_set_body_static_adds_processor() {
2126        let definition = RouteBuilder::from("timer:tick")
2127            .route_id("test-route")
2128            .set_body("fixed")
2129            .build()
2130            .unwrap();
2131        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2132    }
2133
2134    #[test]
2135    fn test_builder_set_body_fn_adds_processor() {
2136        let definition = RouteBuilder::from("timer:tick")
2137            .route_id("test-route")
2138            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2139            .build()
2140            .unwrap();
2141        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2142    }
2143
2144    #[test]
2145    fn transform_alias_produces_same_as_set_body() {
2146        let route_transform = RouteBuilder::from("timer:tick")
2147            .route_id("test-route")
2148            .transform("hello")
2149            .build()
2150            .unwrap();
2151
2152        let route_set_body = RouteBuilder::from("timer:tick")
2153            .route_id("test-route")
2154            .set_body("hello")
2155            .build()
2156            .unwrap();
2157
2158        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2159    }
2160
2161    #[test]
2162    fn test_builder_set_header_fn_adds_processor() {
2163        let definition = RouteBuilder::from("timer:tick")
2164            .route_id("test-route")
2165            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2166            .build()
2167            .unwrap();
2168        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2169    }
2170
2171    #[tokio::test]
2172    async fn test_set_body_static_processor_works() {
2173        use camel_core::route::compose_pipeline;
2174        let def = RouteBuilder::from("t:t")
2175            .route_id("test-route")
2176            .set_body("replaced")
2177            .build()
2178            .unwrap();
2179        let pipeline = compose_pipeline(
2180            def.steps()
2181                .iter()
2182                .filter_map(|s| {
2183                    if let BuilderStep::Processor(p) = s {
2184                        Some(p.clone())
2185                    } else {
2186                        None
2187                    }
2188                })
2189                .collect(),
2190        );
2191        let exchange = Exchange::new(Message::new("original"));
2192        let result = pipeline.oneshot(exchange).await.unwrap();
2193        assert_eq!(result.input.body.as_text(), Some("replaced"));
2194    }
2195
2196    #[tokio::test]
2197    async fn test_set_body_fn_processor_works() {
2198        use camel_core::route::compose_pipeline;
2199        let def = RouteBuilder::from("t:t")
2200            .route_id("test-route")
2201            .set_body_fn(|ex: &Exchange| {
2202                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2203            })
2204            .build()
2205            .unwrap();
2206        let pipeline = compose_pipeline(
2207            def.steps()
2208                .iter()
2209                .filter_map(|s| {
2210                    if let BuilderStep::Processor(p) = s {
2211                        Some(p.clone())
2212                    } else {
2213                        None
2214                    }
2215                })
2216                .collect(),
2217        );
2218        let exchange = Exchange::new(Message::new("hello"));
2219        let result = pipeline.oneshot(exchange).await.unwrap();
2220        assert_eq!(result.input.body.as_text(), Some("HELLO"));
2221    }
2222
2223    #[tokio::test]
2224    async fn test_set_header_fn_processor_works() {
2225        use camel_core::route::compose_pipeline;
2226        let def = RouteBuilder::from("t:t")
2227            .route_id("test-route")
2228            .set_header_fn("echo", |ex: &Exchange| {
2229                ex.input
2230                    .body
2231                    .as_text()
2232                    .map(|t| Value::String(t.into()))
2233                    .unwrap_or(Value::Null)
2234            })
2235            .build()
2236            .unwrap();
2237        let pipeline = compose_pipeline(
2238            def.steps()
2239                .iter()
2240                .filter_map(|s| {
2241                    if let BuilderStep::Processor(p) = s {
2242                        Some(p.clone())
2243                    } else {
2244                        None
2245                    }
2246                })
2247                .collect(),
2248        );
2249        let exchange = Exchange::new(Message::new("ping"));
2250        let result = pipeline.oneshot(exchange).await.unwrap();
2251        assert_eq!(
2252            result.input.header("echo"),
2253            Some(&Value::String("ping".into()))
2254        );
2255    }
2256
2257    // ── FilterBuilder typestate tests ─────────────────────────────────────
2258
2259    #[test]
2260    fn test_filter_builder_typestate() {
2261        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2262            .route_id("test-route")
2263            .filter(|_ex| true)
2264            .to("mock:inner")
2265            .end_filter()
2266            .to("mock:outer")
2267            .build();
2268        assert!(result.is_ok());
2269    }
2270
2271    #[test]
2272    fn test_filter_builder_steps_collected() {
2273        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2274            .route_id("test-route")
2275            .filter(|_ex| true)
2276            .to("mock:inner")
2277            .end_filter()
2278            .build()
2279            .unwrap();
2280
2281        assert_eq!(definition.steps().len(), 1);
2282        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2283    }
2284
2285    #[test]
2286    fn test_wire_tap_builder_adds_step() {
2287        let definition = RouteBuilder::from("timer:tick")
2288            .route_id("test-route")
2289            .wire_tap("mock:tap")
2290            .to("mock:result")
2291            .build()
2292            .unwrap();
2293
2294        assert_eq!(definition.steps().len(), 2);
2295        assert!(
2296            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2297        );
2298        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2299    }
2300
2301    // ── MulticastBuilder typestate tests ─────────────────────────────────────
2302
2303    #[test]
2304    fn test_multicast_builder_typestate() {
2305        let definition = RouteBuilder::from("timer:tick")
2306            .route_id("test-route")
2307            .multicast()
2308            .to("direct:a")
2309            .to("direct:b")
2310            .end_multicast()
2311            .to("mock:result")
2312            .build()
2313            .unwrap();
2314
2315        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
2316    }
2317
2318    #[test]
2319    fn test_multicast_builder_steps_collected() {
2320        let definition = RouteBuilder::from("timer:tick")
2321            .route_id("test-route")
2322            .multicast()
2323            .to("direct:a")
2324            .to("direct:b")
2325            .end_multicast()
2326            .build()
2327            .unwrap();
2328
2329        match &definition.steps()[0] {
2330            BuilderStep::Multicast { steps, .. } => {
2331                assert_eq!(steps.len(), 2);
2332            }
2333            other => panic!("Expected Multicast, got {:?}", other),
2334        }
2335    }
2336
2337    // ── Concurrency builder tests ─────────────────────────────────────
2338
2339    #[test]
2340    fn test_builder_concurrent_sets_concurrency() {
2341        use camel_component_api::ConcurrencyModel;
2342
2343        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2344            .route_id("test-route")
2345            .concurrent(16)
2346            .to("log:info")
2347            .build()
2348            .unwrap();
2349
2350        assert_eq!(
2351            definition.concurrency_override(),
2352            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2353        );
2354    }
2355
2356    #[test]
2357    fn test_builder_concurrent_zero_means_unbounded() {
2358        use camel_component_api::ConcurrencyModel;
2359
2360        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2361            .route_id("test-route")
2362            .concurrent(0)
2363            .to("log:info")
2364            .build()
2365            .unwrap();
2366
2367        assert_eq!(
2368            definition.concurrency_override(),
2369            Some(&ConcurrencyModel::Concurrent { max: None })
2370        );
2371    }
2372
2373    #[test]
2374    fn test_builder_sequential_sets_concurrency() {
2375        use camel_component_api::ConcurrencyModel;
2376
2377        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2378            .route_id("test-route")
2379            .sequential()
2380            .to("log:info")
2381            .build()
2382            .unwrap();
2383
2384        assert_eq!(
2385            definition.concurrency_override(),
2386            Some(&ConcurrencyModel::Sequential)
2387        );
2388    }
2389
2390    #[test]
2391    fn test_builder_default_concurrency_is_none() {
2392        let definition = RouteBuilder::from("timer:tick")
2393            .route_id("test-route")
2394            .to("log:info")
2395            .build()
2396            .unwrap();
2397
2398        assert_eq!(definition.concurrency_override(), None);
2399    }
2400
2401    // ── Route lifecycle builder tests ─────────────────────────────────────
2402
2403    #[test]
2404    fn test_builder_route_id_sets_id() {
2405        let definition = RouteBuilder::from("timer:tick")
2406            .route_id("my-route")
2407            .build()
2408            .unwrap();
2409
2410        assert_eq!(definition.route_id(), "my-route");
2411    }
2412
2413    #[test]
2414    fn test_build_without_route_id_fails() {
2415        let result = RouteBuilder::from("timer:tick?period=1000")
2416            .to("log:info")
2417            .build();
2418        let err = match result {
2419            Err(e) => e.to_string(),
2420            Ok(_) => panic!("build() should fail without route_id"),
2421        };
2422        assert!(
2423            err.contains("route_id"),
2424            "error should mention route_id, got: {}",
2425            err
2426        );
2427    }
2428
2429    #[test]
2430    fn test_builder_empty_route_id_rejected() {
2431        let result = RouteBuilder::from("timer:tick").route_id("").build();
2432        let err = result.err().expect("empty route_id should be rejected");
2433        assert!(matches!(err, CamelError::RouteError(_)));
2434    }
2435
2436    #[test]
2437    fn test_builder_whitespace_route_id_rejected() {
2438        let result = RouteBuilder::from("timer:tick").route_id("   ").build();
2439        assert!(result.is_err());
2440    }
2441
2442    #[test]
2443    fn test_builder_auto_startup_false() {
2444        let definition = RouteBuilder::from("timer:tick")
2445            .route_id("test-route")
2446            .auto_startup(false)
2447            .build()
2448            .unwrap();
2449
2450        assert!(!definition.auto_startup());
2451    }
2452
2453    #[test]
2454    fn test_builder_startup_order_custom() {
2455        let definition = RouteBuilder::from("timer:tick")
2456            .route_id("test-route")
2457            .startup_order(50)
2458            .build()
2459            .unwrap();
2460
2461        assert_eq!(definition.startup_order(), 50);
2462    }
2463
2464    #[test]
2465    fn test_builder_defaults() {
2466        let definition = RouteBuilder::from("timer:tick")
2467            .route_id("test-route")
2468            .build()
2469            .unwrap();
2470
2471        assert_eq!(definition.route_id(), "test-route");
2472        assert!(definition.auto_startup());
2473        assert_eq!(definition.startup_order(), 1000);
2474    }
2475
2476    // ── Choice typestate tests ──────────────────────────────────────────────────
2477
2478    #[test]
2479    fn test_choice_builder_single_when() {
2480        let definition = RouteBuilder::from("timer:tick")
2481            .route_id("test-route")
2482            .choice()
2483            .when(|ex: &Exchange| ex.input.header("type").is_some())
2484            .to("mock:typed")
2485            .end_when()
2486            .end_choice()
2487            .build()
2488            .unwrap();
2489        assert_eq!(definition.steps().len(), 1);
2490        assert!(
2491            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2492            if whens.len() == 1 && otherwise.is_none())
2493        );
2494    }
2495
2496    #[test]
2497    fn test_choice_builder_when_otherwise() {
2498        let definition = RouteBuilder::from("timer:tick")
2499            .route_id("test-route")
2500            .choice()
2501            .when(|ex: &Exchange| ex.input.header("a").is_some())
2502            .to("mock:a")
2503            .end_when()
2504            .otherwise()
2505            .to("mock:fallback")
2506            .end_otherwise()
2507            .end_choice()
2508            .build()
2509            .unwrap();
2510        assert!(
2511            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2512            if whens.len() == 1 && otherwise.is_some())
2513        );
2514    }
2515
2516    #[test]
2517    fn test_choice_builder_multiple_whens() {
2518        let definition = RouteBuilder::from("timer:tick")
2519            .route_id("test-route")
2520            .choice()
2521            .when(|ex: &Exchange| ex.input.header("a").is_some())
2522            .to("mock:a")
2523            .end_when()
2524            .when(|ex: &Exchange| ex.input.header("b").is_some())
2525            .to("mock:b")
2526            .end_when()
2527            .end_choice()
2528            .build()
2529            .unwrap();
2530        assert!(
2531            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2532            if whens.len() == 2)
2533        );
2534    }
2535
2536    #[test]
2537    fn test_choice_step_after_choice() {
2538        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2539        let definition = RouteBuilder::from("timer:tick")
2540            .route_id("test-route")
2541            .choice()
2542            .when(|_ex: &Exchange| true)
2543            .to("mock:inner")
2544            .end_when()
2545            .end_choice()
2546            .to("mock:outer") // must be step[1], not inside choice
2547            .build()
2548            .unwrap();
2549        assert_eq!(definition.steps().len(), 2);
2550        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2551    }
2552
2553    // ── Throttle typestate tests ──────────────────────────────────────────────────
2554
2555    #[test]
2556    fn test_throttle_builder_typestate() {
2557        let definition = RouteBuilder::from("timer:tick")
2558            .route_id("test-route")
2559            .throttle(10, std::time::Duration::from_secs(1))
2560            .to("mock:result")
2561            .end_throttle()
2562            .build()
2563            .unwrap();
2564
2565        assert_eq!(definition.steps().len(), 1);
2566        assert!(matches!(
2567            &definition.steps()[0],
2568            BuilderStep::Throttle { .. }
2569        ));
2570    }
2571
2572    #[test]
2573    fn test_throttle_builder_with_strategy() {
2574        let definition = RouteBuilder::from("timer:tick")
2575            .route_id("test-route")
2576            .throttle(10, std::time::Duration::from_secs(1))
2577            .strategy(ThrottleStrategy::Reject)
2578            .to("mock:result")
2579            .end_throttle()
2580            .build()
2581            .unwrap();
2582
2583        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2584            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2585        } else {
2586            panic!("Expected Throttle step");
2587        }
2588    }
2589
2590    #[test]
2591    fn test_throttle_builder_steps_collected() {
2592        let definition = RouteBuilder::from("timer:tick")
2593            .route_id("test-route")
2594            .throttle(5, std::time::Duration::from_secs(1))
2595            .set_header("throttled", Value::Bool(true))
2596            .to("mock:throttled")
2597            .end_throttle()
2598            .build()
2599            .unwrap();
2600
2601        match &definition.steps()[0] {
2602            BuilderStep::Throttle { steps, .. } => {
2603                assert_eq!(steps.len(), 2); // SetHeader + To
2604            }
2605            other => panic!("Expected Throttle, got {:?}", other),
2606        }
2607    }
2608
2609    #[test]
2610    fn test_throttle_step_after_throttle() {
2611        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2612        let definition = RouteBuilder::from("timer:tick")
2613            .route_id("test-route")
2614            .throttle(10, std::time::Duration::from_secs(1))
2615            .to("mock:inner")
2616            .end_throttle()
2617            .to("mock:outer")
2618            .build()
2619            .unwrap();
2620
2621        assert_eq!(definition.steps().len(), 2);
2622        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2623    }
2624
2625    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2626
2627    #[test]
2628    fn test_load_balance_builder_typestate() {
2629        let definition = RouteBuilder::from("timer:tick")
2630            .route_id("test-route")
2631            .load_balance()
2632            .round_robin()
2633            .to("mock:a")
2634            .to("mock:b")
2635            .end_load_balance()
2636            .build()
2637            .unwrap();
2638
2639        assert_eq!(definition.steps().len(), 1);
2640        assert!(matches!(
2641            &definition.steps()[0],
2642            BuilderStep::LoadBalance { .. }
2643        ));
2644    }
2645
2646    #[test]
2647    fn test_load_balance_builder_with_strategy() {
2648        let definition = RouteBuilder::from("timer:tick")
2649            .route_id("test-route")
2650            .load_balance()
2651            .random()
2652            .to("mock:result")
2653            .end_load_balance()
2654            .build()
2655            .unwrap();
2656
2657        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2658            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2659        } else {
2660            panic!("Expected LoadBalance step");
2661        }
2662    }
2663
2664    #[test]
2665    fn test_load_balance_builder_steps_collected() {
2666        let definition = RouteBuilder::from("timer:tick")
2667            .route_id("test-route")
2668            .load_balance()
2669            .set_header("lb", Value::Bool(true))
2670            .to("mock:a")
2671            .end_load_balance()
2672            .build()
2673            .unwrap();
2674
2675        match &definition.steps()[0] {
2676            BuilderStep::LoadBalance { steps, .. } => {
2677                assert_eq!(steps.len(), 2); // SetHeader + To
2678            }
2679            other => panic!("Expected LoadBalance, got {:?}", other),
2680        }
2681    }
2682
2683    #[test]
2684    fn test_load_balance_step_after_load_balance() {
2685        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
2686        let definition = RouteBuilder::from("timer:tick")
2687            .route_id("test-route")
2688            .load_balance()
2689            .to("mock:inner")
2690            .end_load_balance()
2691            .to("mock:outer")
2692            .build()
2693            .unwrap();
2694
2695        assert_eq!(definition.steps().len(), 2);
2696        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2697    }
2698
2699    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2700
2701    #[test]
2702    fn test_dynamic_router_builder() {
2703        let definition = RouteBuilder::from("timer:tick")
2704            .route_id("test-route")
2705            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2706            .build()
2707            .unwrap();
2708
2709        assert_eq!(definition.steps().len(), 1);
2710        assert!(matches!(
2711            &definition.steps()[0],
2712            BuilderStep::DynamicRouter { .. }
2713        ));
2714    }
2715
2716    #[test]
2717    fn test_dynamic_router_builder_with_config() {
2718        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2719            .max_iterations(100)
2720            .cache_size(500);
2721
2722        let definition = RouteBuilder::from("timer:tick")
2723            .route_id("test-route")
2724            .dynamic_router_with_config(config)
2725            .build()
2726            .unwrap();
2727
2728        assert_eq!(definition.steps().len(), 1);
2729        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2730            assert_eq!(config.max_iterations, 100);
2731            assert_eq!(config.cache_size, 500);
2732        } else {
2733            panic!("Expected DynamicRouter step");
2734        }
2735    }
2736
2737    #[test]
2738    fn test_dynamic_router_step_after_router() {
2739        // Steps after dynamic_router() are added to the outer pipeline.
2740        let definition = RouteBuilder::from("timer:tick")
2741            .route_id("test-route")
2742            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2743            .to("mock:outer")
2744            .build()
2745            .unwrap();
2746
2747        assert_eq!(definition.steps().len(), 2);
2748        assert!(matches!(
2749            &definition.steps()[0],
2750            BuilderStep::DynamicRouter { .. }
2751        ));
2752        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2753    }
2754
2755    #[test]
2756    fn routing_slip_builder_creates_step() {
2757        use camel_api::RoutingSlipExpression;
2758
2759        let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2760
2761        let route = RouteBuilder::from("direct:start")
2762            .route_id("routing-slip-test")
2763            .routing_slip(expression)
2764            .build()
2765            .unwrap();
2766
2767        assert!(
2768            matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2769            "Expected RoutingSlip step"
2770        );
2771    }
2772
2773    #[test]
2774    fn routing_slip_with_config_builder_creates_step() {
2775        use camel_api::RoutingSlipConfig;
2776
2777        let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2778            .uri_delimiter("|")
2779            .cache_size(50)
2780            .ignore_invalid_endpoints(true);
2781
2782        let route = RouteBuilder::from("direct:start")
2783            .route_id("routing-slip-config-test")
2784            .routing_slip_with_config(config)
2785            .build()
2786            .unwrap();
2787
2788        if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2789            assert_eq!(config.uri_delimiter, "|");
2790            assert_eq!(config.cache_size, 50);
2791            assert!(config.ignore_invalid_endpoints);
2792        } else {
2793            panic!("Expected RoutingSlip step");
2794        }
2795    }
2796
2797    #[test]
2798    fn test_builder_marshal_adds_processor_step() {
2799        let definition = RouteBuilder::from("timer:tick")
2800            .route_id("test-route")
2801            .marshal("json")
2802            .unwrap()
2803            .build()
2804            .unwrap();
2805        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2806    }
2807
2808    #[test]
2809    fn test_builder_unmarshal_adds_processor_step() {
2810        let definition = RouteBuilder::from("timer:tick")
2811            .route_id("test-route")
2812            .unmarshal("json")
2813            .unwrap()
2814            .build()
2815            .unwrap();
2816        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2817    }
2818
2819    #[test]
2820    fn test_builder_stream_cache_adds_processor_step() {
2821        let definition = RouteBuilder::from("timer:tick")
2822            .route_id("test-route")
2823            .stream_cache(1024)
2824            .build()
2825            .unwrap();
2826        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2827    }
2828
2829    #[test]
2830    fn validate_adds_to_step_with_validator_uri() {
2831        let def = RouteBuilder::from("direct:in")
2832            .route_id("test")
2833            .validate("schemas/order.xsd")
2834            .build()
2835            .unwrap();
2836        let steps = def.steps();
2837        assert_eq!(steps.len(), 1);
2838        assert!(
2839            matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2840            "got: {:?}",
2841            steps[0]
2842        );
2843    }
2844
2845    #[test]
2846    fn test_builder_marshal_returns_err_for_unknown_format() {
2847        let result = RouteBuilder::from("timer:tick")
2848            .route_id("test-route")
2849            .marshal("csv");
2850        let err = match result {
2851            Err(e) => e,
2852            Ok(_) => panic!("marshal with unknown format should return Err"),
2853        };
2854        let msg = err.to_string();
2855        assert!(
2856            msg.contains("unknown data format"),
2857            "error should mention unknown format, got: {msg}"
2858        );
2859        assert!(
2860            msg.contains("csv"),
2861            "error should mention format name, got: {msg}"
2862        );
2863    }
2864
2865    #[test]
2866    fn test_builder_unmarshal_returns_err_for_unknown_format() {
2867        let result = RouteBuilder::from("timer:tick")
2868            .route_id("test-route")
2869            .unmarshal("csv");
2870        let err = match result {
2871            Err(e) => e,
2872            Ok(_) => panic!("unmarshal with unknown format should return Err"),
2873        };
2874        let msg = err.to_string();
2875        assert!(
2876            msg.contains("unknown data format"),
2877            "error should mention unknown format, got: {msg}"
2878        );
2879        assert!(
2880            msg.contains("csv"),
2881            "error should mention format name, got: {msg}"
2882        );
2883    }
2884
2885    #[test]
2886    fn test_builder_recipient_list_creates_step() {
2887        let route = RouteBuilder::from("direct:start")
2888            .route_id("recipient-list-test")
2889            .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
2890            .build()
2891            .unwrap();
2892
2893        assert!(matches!(
2894            &route.steps()[0],
2895            BuilderStep::RecipientList { .. }
2896        ));
2897    }
2898
2899    #[test]
2900    fn test_builder_recipient_list_with_config_creates_step() {
2901        let config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));
2902
2903        let route = RouteBuilder::from("direct:start")
2904            .route_id("recipient-list-config-test")
2905            .recipient_list_with_config(config)
2906            .build()
2907            .unwrap();
2908
2909        assert!(matches!(
2910            &route.steps()[0],
2911            BuilderStep::RecipientList { .. }
2912        ));
2913    }
2914
2915    #[test]
2916    fn test_builder_script_adds_script_step() {
2917        let route = RouteBuilder::from("direct:start")
2918            .route_id("script-test")
2919            .script("rhai", "headers[\"x\"] = \"y\"")
2920            .build()
2921            .unwrap();
2922
2923        assert!(matches!(
2924            &route.steps()[0],
2925            BuilderStep::Script { language, script }
2926            if language == "rhai" && script == "headers[\"x\"] = \"y\""
2927        ));
2928    }
2929
2930    #[test]
2931    fn test_builder_delay_and_delay_with_header_add_steps() {
2932        let route = RouteBuilder::from("direct:start")
2933            .route_id("delay-test")
2934            .delay(Duration::from_millis(250))
2935            .delay_with_header(Duration::from_millis(500), "x-delay")
2936            .build()
2937            .unwrap();
2938
2939        assert_eq!(route.steps().len(), 2);
2940        assert!(matches!(&route.steps()[0], BuilderStep::Delay { .. }));
2941        assert!(matches!(&route.steps()[1], BuilderStep::Delay { .. }));
2942    }
2943
2944    #[test]
2945    fn test_builder_log_and_stop_add_steps_in_order() {
2946        let route = RouteBuilder::from("direct:start")
2947            .route_id("log-stop-test")
2948            .log("hello", LogLevel::Info)
2949            .stop()
2950            .to("mock:after")
2951            .build()
2952            .unwrap();
2953
2954        assert_eq!(route.steps().len(), 3);
2955        assert!(matches!(
2956            &route.steps()[0],
2957            BuilderStep::Log { message, .. } if message == "hello"
2958        ));
2959        assert!(matches!(&route.steps()[1], BuilderStep::Stop));
2960        assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
2961    }
2962
2963    #[test]
2964    fn test_builder_stream_cache_default_adds_processor_step() {
2965        let route = RouteBuilder::from("direct:start")
2966            .route_id("stream-cache-default-test")
2967            .stream_cache_default()
2968            .build()
2969            .unwrap();
2970
2971        assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
2972    }
2973
2974    #[test]
2975    fn test_validate_preserves_existing_validator_prefix() {
2976        let route = RouteBuilder::from("direct:in")
2977            .route_id("validate-prefix-test")
2978            .validate("validator:schemas/order.xsd")
2979            .build()
2980            .unwrap();
2981
2982        assert!(matches!(
2983            &route.steps()[0],
2984            BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
2985        ));
2986    }
2987
2988    #[test]
2989    fn test_load_balance_builder_weighted_failover_parallel_config() {
2990        let route = RouteBuilder::from("direct:start")
2991            .route_id("lb-weighted-failover-parallel")
2992            .load_balance()
2993            .weighted(vec![
2994                ("direct:a".to_string(), 3),
2995                ("direct:b".to_string(), 1),
2996            ])
2997            .failover()
2998            .parallel(true)
2999            .to("mock:result")
3000            .end_load_balance()
3001            .build()
3002            .unwrap();
3003
3004        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3005            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3006            assert!(config.parallel);
3007        } else {
3008            panic!("Expected LoadBalance step");
3009        }
3010    }
3011
3012    #[test]
3013    fn test_multicast_builder_all_config_setters() {
3014        let route = RouteBuilder::from("direct:start")
3015            .route_id("multicast-config-test")
3016            .multicast()
3017            .parallel(true)
3018            .parallel_limit(4)
3019            .stop_on_exception(true)
3020            .timeout(Duration::from_millis(300))
3021            .aggregation(MulticastStrategy::Original)
3022            .to("mock:a")
3023            .end_multicast()
3024            .build()
3025            .unwrap();
3026
3027        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3028            assert!(config.parallel);
3029            assert_eq!(config.parallel_limit, Some(4));
3030            assert!(config.stop_on_exception);
3031            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
3032            assert!(matches!(config.aggregation, MulticastStrategy::Original));
3033        } else {
3034            panic!("Expected Multicast step");
3035        }
3036    }
3037
3038    #[test]
3039    fn test_build_canonical_rejects_unsupported_processor_step() {
3040        let err = RouteBuilder::from("direct:start")
3041            .route_id("canonical-reject")
3042            .set_header("k", Value::String("v".into()))
3043            .build_canonical()
3044            .unwrap_err();
3045
3046        assert!(format!("{err}").contains("does not support step `processor`"));
3047    }
3048
3049    // ── LoadBalance strategy-specific tests ─────────────────────────────────────
3050
3051    #[test]
3052    fn test_load_balance_builder_weighted_strategy() {
3053        let route = RouteBuilder::from("direct:start")
3054            .route_id("lb-weighted")
3055            .load_balance()
3056            .weighted(vec![
3057                ("direct:a".to_string(), 5),
3058                ("direct:b".to_string(), 2),
3059                ("direct:c".to_string(), 1),
3060            ])
3061            .to("mock:result")
3062            .end_load_balance()
3063            .build()
3064            .unwrap();
3065
3066        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3067            assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
3068        } else {
3069            panic!("Expected LoadBalance step");
3070        }
3071    }
3072
3073    #[test]
3074    fn test_load_balance_builder_failover_strategy() {
3075        let route = RouteBuilder::from("direct:start")
3076            .route_id("lb-failover")
3077            .load_balance()
3078            .failover()
3079            .to("mock:primary")
3080            .end_load_balance()
3081            .build()
3082            .unwrap();
3083
3084        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3085            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3086            assert!(!config.parallel);
3087        } else {
3088            panic!("Expected LoadBalance step");
3089        }
3090    }
3091
3092    #[test]
3093    fn test_load_balance_builder_parallel_false_explicit() {
3094        let route = RouteBuilder::from("direct:start")
3095            .route_id("lb-parallel-false")
3096            .load_balance()
3097            .round_robin()
3098            .parallel(false)
3099            .to("mock:result")
3100            .end_load_balance()
3101            .build()
3102            .unwrap();
3103
3104        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3105            assert!(!config.parallel);
3106        } else {
3107            panic!("Expected LoadBalance step");
3108        }
3109    }
3110
3111    // ── FilterInSplitBuilder tests ──────────────────────────────────────────────
3112
3113    #[test]
3114    fn test_filter_in_split_builder_typestate() {
3115        use camel_api::splitter::{SplitterConfig, split_body_lines};
3116
3117        let definition = RouteBuilder::from("timer:test")
3118            .route_id("filter-in-split")
3119            .split(SplitterConfig::new(split_body_lines()))
3120            .filter(|_ex| true)
3121            .to("mock:filtered")
3122            .end_filter()
3123            .end_split()
3124            .build()
3125            .unwrap();
3126
3127        assert_eq!(definition.steps().len(), 1);
3128        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3129            assert_eq!(steps.len(), 1);
3130            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3131        } else {
3132            panic!("Expected Split step");
3133        }
3134    }
3135
3136    #[test]
3137    fn test_filter_in_split_builder_multiple_steps() {
3138        use camel_api::splitter::{SplitterConfig, split_body_lines};
3139
3140        let definition = RouteBuilder::from("timer:test")
3141            .route_id("filter-in-split-multi")
3142            .split(SplitterConfig::new(split_body_lines()))
3143            .to("mock:before-filter")
3144            .filter(|_ex| true)
3145            .to("mock:inside-filter")
3146            .end_filter()
3147            .to("mock:after-filter")
3148            .end_split()
3149            .build()
3150            .unwrap();
3151
3152        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3153            // To("before-filter") + Filter{...} + To("after-filter") = 3
3154            assert_eq!(steps.len(), 3);
3155        } else {
3156            panic!("Expected Split step");
3157        }
3158    }
3159
3160    // ── build_canonical tests ───────────────────────────────────────────────────
3161
3162    #[test]
3163    fn test_build_canonical_with_circuit_breaker() {
3164        use camel_api::circuit_breaker::CircuitBreakerConfig;
3165
3166        let spec = RouteBuilder::from("direct:start")
3167            .route_id("canonical-cb")
3168            .circuit_breaker(CircuitBreakerConfig::new().failure_threshold(10))
3169            .to("mock:result")
3170            .build_canonical()
3171            .unwrap();
3172
3173        let cb = spec.circuit_breaker.expect("circuit breaker should be set");
3174        assert_eq!(cb.failure_threshold, 10);
3175    }
3176
3177    #[test]
3178    fn test_build_canonical_rejects_custom_split_aggregation() {
3179        use camel_api::splitter::{SplitterConfig, split_body_lines};
3180
3181        let err = RouteBuilder::from("direct:start")
3182            .route_id("canonical-custom-split")
3183            .split(SplitterConfig::new(split_body_lines()).aggregation(
3184                camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
3185            ))
3186            .to("mock:frag")
3187            .end_split()
3188            .build_canonical()
3189            .unwrap_err();
3190
3191        // Split with closure-based expression is rejected in canonical v1.
3192        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3193    }
3194
3195    #[test]
3196    fn test_build_canonical_rejects_custom_aggregate_strategy() {
3197        let err = RouteBuilder::from("direct:start")
3198            .route_id("canonical-custom-agg")
3199            .aggregate(
3200                AggregatorConfig::correlate_by("key")
3201                    .complete_when_size(2)
3202                    .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
3203                    .build()
3204                    .unwrap(),
3205            )
3206            .build_canonical()
3207            .unwrap_err();
3208
3209        assert!(format!("{err}").contains("custom aggregate strategy"));
3210    }
3211
3212    #[test]
3213    fn test_build_canonical_rejects_fn_correlation_strategy() {
3214        let err = RouteBuilder::from("direct:start")
3215            .route_id("canonical-fn-corr")
3216            .aggregate(AggregatorConfig {
3217                header_name: "key".to_string(),
3218                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3219                correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
3220                strategy: AggregationStrategy::CollectAll,
3221                max_buckets: None,
3222                bucket_ttl: None,
3223                force_completion_on_stop: false,
3224                discard_on_timeout: false,
3225            })
3226            .build_canonical()
3227            .unwrap_err();
3228
3229        assert!(format!("{err}").contains("Fn correlation strategy"));
3230    }
3231
3232    #[test]
3233    fn test_build_canonical_rejects_predicate_completion() {
3234        let err = RouteBuilder::from("direct:start")
3235            .route_id("canonical-pred-completion")
3236            .aggregate(AggregatorConfig {
3237                header_name: "key".to_string(),
3238                completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
3239                    |_| false,
3240                ))),
3241                correlation: CorrelationStrategy::HeaderName("key".to_string()),
3242                strategy: AggregationStrategy::CollectAll,
3243                max_buckets: None,
3244                bucket_ttl: None,
3245                force_completion_on_stop: false,
3246                discard_on_timeout: false,
3247            })
3248            .build_canonical()
3249            .unwrap_err();
3250
3251        assert!(format!("{err}").contains("predicate completion"));
3252    }
3253
3254    #[test]
3255    fn test_build_canonical_with_expression_correlation() {
3256        let spec = RouteBuilder::from("direct:start")
3257            .route_id("canonical-expr-corr")
3258            .aggregate(AggregatorConfig {
3259                header_name: "key".to_string(),
3260                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3261                correlation: CorrelationStrategy::Expression {
3262                    expr: "header.key".to_string(),
3263                    language: "simple".to_string(),
3264                },
3265                strategy: AggregationStrategy::CollectAll,
3266                max_buckets: None,
3267                bucket_ttl: None,
3268                force_completion_on_stop: false,
3269                discard_on_timeout: false,
3270            })
3271            .build_canonical()
3272            .unwrap();
3273
3274        assert!(spec.steps.iter().any(|s| matches!(s, CanonicalStepSpec::Aggregate(a) if a.correlation_key == Some("header.key".to_string()))));
3275    }
3276
3277    #[test]
3278    fn test_build_canonical_split_rejected_with_closure_expression() {
3279        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
3280
3281        // Builder-based split uses closure expressions, which are not serializable.
3282        let err = RouteBuilder::from("direct:start")
3283            .route_id("canonical-split-last")
3284            .split(
3285                SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins),
3286            )
3287            .to("mock:frag")
3288            .end_split()
3289            .build_canonical()
3290            .unwrap_err();
3291
3292        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3293    }
3294
3295    // ── OnExceptionBuilder full chain tests ─────────────────────────────────────
3296
3297    #[test]
3298    fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
3299        let definition = RouteBuilder::from("direct:start")
3300            .route_id("on-exception-full")
3301            .dead_letter_channel("log:dlc")
3302            .on_exception(|e| matches!(e, CamelError::Io(_)))
3303            .retry(5)
3304            .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
3305            .with_jitter(0.3)
3306            .handled_by("log:io-handler")
3307            .end_on_exception()
3308            .to("mock:out")
3309            .build()
3310            .unwrap();
3311
3312        let cfg = definition
3313            .error_handler_config()
3314            .expect("error handler should be set");
3315        assert_eq!(cfg.policies.len(), 1);
3316        let policy = &cfg.policies[0];
3317        let retry = policy.retry.as_ref().expect("retry should be set");
3318        assert_eq!(retry.max_attempts, 5);
3319        assert_eq!(retry.initial_delay, Duration::from_millis(10));
3320        assert_eq!(retry.multiplier, 2.0);
3321        assert_eq!(retry.max_delay, Duration::from_millis(500));
3322        assert!((retry.jitter_factor - 0.3).abs() < f64::EPSILON);
3323        assert_eq!(policy.handled_by.as_deref(), Some("log:io-handler"));
3324    }
3325
3326    #[test]
3327    fn test_on_exception_jitter_clamped_to_valid_range() {
3328        let definition = RouteBuilder::from("direct:start")
3329            .route_id("jitter-clamp")
3330            .on_exception(|_e| true)
3331            .retry(1)
3332            .with_jitter(5.0)
3333            .end_on_exception()
3334            .to("mock:out")
3335            .build()
3336            .unwrap();
3337
3338        let cfg = definition.error_handler_config().unwrap();
3339        let retry = cfg.policies[0].retry.as_ref().unwrap();
3340        assert!((retry.jitter_factor - 1.0).abs() < f64::EPSILON);
3341    }
3342
3343    // ── StepAccumulator: process_fn, convert_body_to, bean ──────────────────────
3344
3345    #[test]
3346    fn test_builder_process_fn_adds_processor_step() {
3347        use camel_api::BoxProcessorExt;
3348        let processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
3349        let definition = RouteBuilder::from("timer:tick")
3350            .route_id("process-fn-test")
3351            .process_fn(processor)
3352            .build()
3353            .unwrap();
3354
3355        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3356    }
3357
3358    #[test]
3359    fn test_builder_convert_body_to_adds_processor_step() {
3360        let definition = RouteBuilder::from("timer:tick")
3361            .route_id("convert-body-test")
3362            .convert_body_to(BodyType::Json)
3363            .build()
3364            .unwrap();
3365
3366        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3367    }
3368
3369    #[test]
3370    fn test_builder_bean_adds_bean_step() {
3371        let definition = RouteBuilder::from("timer:tick")
3372            .route_id("bean-test")
3373            .bean("myBean", "process")
3374            .build()
3375            .unwrap();
3376
3377        assert!(
3378            matches!(&definition.steps()[0], BuilderStep::Bean { name, method }
3379            if name == "myBean" && method == "process")
3380        );
3381    }
3382
3383    // ── Throttle strategy-specific tests ────────────────────────────────────────
3384
3385    #[test]
3386    fn test_throttle_builder_delay_strategy() {
3387        let definition = RouteBuilder::from("timer:tick")
3388            .route_id("throttle-delay")
3389            .throttle(10, Duration::from_secs(1))
3390            .strategy(ThrottleStrategy::Delay)
3391            .to("mock:result")
3392            .end_throttle()
3393            .build()
3394            .unwrap();
3395
3396        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3397            assert_eq!(config.strategy, ThrottleStrategy::Delay);
3398        } else {
3399            panic!("Expected Throttle step");
3400        }
3401    }
3402
3403    #[test]
3404    fn test_throttle_builder_drop_strategy() {
3405        let definition = RouteBuilder::from("timer:tick")
3406            .route_id("throttle-drop")
3407            .throttle(10, Duration::from_secs(1))
3408            .strategy(ThrottleStrategy::Drop)
3409            .to("mock:result")
3410            .end_throttle()
3411            .build()
3412            .unwrap();
3413
3414        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3415            assert_eq!(config.strategy, ThrottleStrategy::Drop);
3416        } else {
3417            panic!("Expected Throttle step");
3418        }
3419    }
3420
3421    // ── LoopInLoopBuilder with loop_while ───────────────────────────────────────
3422
3423    #[test]
3424    fn test_nested_loop_while_builder() {
3425        use camel_api::loop_eip::LoopMode;
3426
3427        let def = RouteBuilder::from("direct:start")
3428            .route_id("nested-loop-while")
3429            .loop_count(2)
3430            .to("mock:outer")
3431            .loop_while(|_ex| true)
3432            .to("mock:inner")
3433            .end_loop()
3434            .end_loop()
3435            .build()
3436            .unwrap();
3437
3438        assert_eq!(def.steps().len(), 1);
3439        if let BuilderStep::Loop { steps, .. } = &def.steps()[0] {
3440            assert_eq!(steps.len(), 2);
3441            if let BuilderStep::Loop { config, .. } = &steps[1] {
3442                assert!(matches!(config.mode, LoopMode::While(_)));
3443            } else {
3444                panic!("Expected inner Loop step");
3445            }
3446        } else {
3447            panic!("Expected outer Loop step");
3448        }
3449    }
3450
3451    // ── Choice with multiple whens + otherwise ──────────────────────────────────
3452
3453    #[test]
3454    fn test_choice_builder_multiple_whens_with_otherwise() {
3455        let definition = RouteBuilder::from("timer:tick")
3456            .route_id("choice-multi-otherwise")
3457            .choice()
3458            .when(|ex: &Exchange| ex.input.header("a").is_some())
3459            .to("mock:a")
3460            .end_when()
3461            .when(|ex: &Exchange| ex.input.header("b").is_some())
3462            .to("mock:b")
3463            .end_when()
3464            .when(|ex: &Exchange| ex.input.header("c").is_some())
3465            .to("mock:c")
3466            .end_when()
3467            .otherwise()
3468            .to("mock:fallback")
3469            .end_otherwise()
3470            .end_choice()
3471            .build()
3472            .unwrap();
3473
3474        if let BuilderStep::Choice { whens, otherwise } = &definition.steps()[0] {
3475            assert_eq!(whens.len(), 3);
3476            assert!(otherwise.is_some());
3477            assert_eq!(otherwise.as_ref().unwrap().len(), 1);
3478        } else {
3479            panic!("Expected Choice step");
3480        }
3481    }
3482
3483    // ── Multicast individual config tests ───────────────────────────────────────
3484
3485    #[test]
3486    fn test_multicast_builder_parallel_only() {
3487        let route = RouteBuilder::from("direct:start")
3488            .route_id("multicast-parallel")
3489            .multicast()
3490            .parallel(true)
3491            .to("mock:a")
3492            .end_multicast()
3493            .build()
3494            .unwrap();
3495
3496        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3497            assert!(config.parallel);
3498            assert_eq!(config.parallel_limit, None);
3499        } else {
3500            panic!("Expected Multicast step");
3501        }
3502    }
3503
3504    #[test]
3505    fn test_multicast_builder_timeout_only() {
3506        let route = RouteBuilder::from("direct:start")
3507            .route_id("multicast-timeout")
3508            .multicast()
3509            .timeout(Duration::from_secs(5))
3510            .to("mock:a")
3511            .end_multicast()
3512            .build()
3513            .unwrap();
3514
3515        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3516            assert_eq!(config.timeout, Some(Duration::from_secs(5)));
3517        } else {
3518            panic!("Expected Multicast step");
3519        }
3520    }
3521
3522    #[test]
3523    fn test_multicast_builder_aggregation_collect_all() {
3524        let route = RouteBuilder::from("direct:start")
3525            .route_id("multicast-collect")
3526            .multicast()
3527            .aggregation(MulticastStrategy::CollectAll)
3528            .to("mock:a")
3529            .end_multicast()
3530            .build()
3531            .unwrap();
3532
3533        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3534            assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
3535        } else {
3536            panic!("Expected Multicast step");
3537        }
3538    }
3539
3540    // ── extract_completion_fields: Any mode with multiple conditions ────────────
3541
3542    #[test]
3543    fn test_build_canonical_aggregate_any_completion_mode() {
3544        let spec = RouteBuilder::from("direct:start")
3545            .route_id("canonical-any-completion")
3546            .aggregate(
3547                AggregatorConfig::correlate_by("key")
3548                    .complete_on_size_or_timeout(10, Duration::from_secs(30))
3549                    .build()
3550                    .unwrap(),
3551            )
3552            .build_canonical()
3553            .unwrap();
3554
3555        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3556            assert_eq!(agg.completion_size, Some(10));
3557            assert_eq!(agg.completion_timeout_ms, Some(30_000));
3558        } else {
3559            panic!("Expected Aggregate step");
3560        }
3561    }
3562
3563    #[test]
3564    fn test_build_canonical_aggregate_timeout_completion() {
3565        let spec = RouteBuilder::from("direct:start")
3566            .route_id("canonical-timeout-completion")
3567            .aggregate(
3568                AggregatorConfig::correlate_by("key")
3569                    .complete_on_timeout(Duration::from_millis(500))
3570                    .build()
3571                    .unwrap(),
3572            )
3573            .build_canonical()
3574            .unwrap();
3575
3576        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3577            assert_eq!(agg.completion_size, None);
3578            assert_eq!(agg.completion_timeout_ms, Some(500));
3579        } else {
3580            panic!("Expected Aggregate step");
3581        }
3582    }
3583
3584    // ── canonicalize_aggregate: discard_on_timeout and force_completion_on_stop ─
3585
3586    #[test]
3587    fn test_build_canonical_aggregate_discard_on_timeout() {
3588        use camel_api::aggregator::AggregatorConfig;
3589
3590        let spec = RouteBuilder::from("direct:start")
3591            .route_id("canonical-discard-timeout")
3592            .aggregate(
3593                AggregatorConfig::correlate_by("key")
3594                    .complete_when_size(1)
3595                    .discard_on_timeout(true)
3596                    .build()
3597                    .unwrap(),
3598            )
3599            .build_canonical()
3600            .unwrap();
3601
3602        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3603            assert_eq!(agg.discard_on_timeout, Some(true));
3604        } else {
3605            panic!("Expected Aggregate step");
3606        }
3607    }
3608
3609    #[test]
3610    fn test_build_canonical_aggregate_force_completion_on_stop() {
3611        use camel_api::aggregator::AggregatorConfig;
3612
3613        let spec = RouteBuilder::from("direct:start")
3614            .route_id("canonical-force-stop")
3615            .aggregate(
3616                AggregatorConfig::correlate_by("key")
3617                    .complete_when_size(1)
3618                    .force_completion_on_stop(true)
3619                    .build()
3620                    .unwrap(),
3621            )
3622            .build_canonical()
3623            .unwrap();
3624
3625        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3626            assert_eq!(agg.force_completion_on_stop, Some(true));
3627        } else {
3628            panic!("Expected Aggregate step");
3629        }
3630    }
3631
3632    // ── build_canonical: max_buckets and bucket_ttl ─────────────────────────────
3633
3634    #[test]
3635    fn test_build_canonical_aggregate_max_buckets_and_ttl() {
3636        use camel_api::aggregator::AggregatorConfig;
3637
3638        let spec = RouteBuilder::from("direct:start")
3639            .route_id("canonical-buckets-ttl")
3640            .aggregate(
3641                AggregatorConfig::correlate_by("key")
3642                    .complete_when_size(1)
3643                    .max_buckets(100)
3644                    .bucket_ttl(Duration::from_secs(60))
3645                    .build()
3646                    .unwrap(),
3647            )
3648            .build_canonical()
3649            .unwrap();
3650
3651        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3652            assert_eq!(agg.max_buckets, Some(100));
3653            assert_eq!(agg.bucket_ttl_ms, Some(60_000));
3654        } else {
3655            panic!("Expected Aggregate step");
3656        }
3657    }
3658
3659    // ── SplitBuilder with filter inside ─────────────────────────────────────────
3660
3661    #[test]
3662    fn test_split_builder_with_filter_inside() {
3663        use camel_api::splitter::{SplitterConfig, split_body_lines};
3664
3665        let definition = RouteBuilder::from("timer:test")
3666            .route_id("split-with-filter")
3667            .split(SplitterConfig::new(split_body_lines()))
3668            .filter(|_ex| true)
3669            .to("mock:filtered-frag")
3670            .end_filter()
3671            .end_split()
3672            .build()
3673            .unwrap();
3674
3675        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3676            assert_eq!(steps.len(), 1);
3677            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3678        } else {
3679            panic!("Expected Split step");
3680        }
3681    }
3682
3683    // ── WireTap additional tests ────────────────────────────────────────────────
3684
3685    #[test]
3686    fn test_wire_tap_multiple_taps() {
3687        let definition = RouteBuilder::from("timer:tick")
3688            .route_id("multi-wire-tap")
3689            .wire_tap("mock:tap1")
3690            .wire_tap("mock:tap2")
3691            .to("mock:result")
3692            .build()
3693            .unwrap();
3694
3695        assert_eq!(definition.steps().len(), 3);
3696        assert!(
3697            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap1")
3698        );
3699        assert!(
3700            matches!(&definition.steps()[1], BuilderStep::WireTap { uri } if uri == "mock:tap2")
3701        );
3702    }
3703
3704    // ── Error handler: explicit config after shorthand → Mixed mode ─────────────
3705
3706    #[test]
3707    fn test_builder_shorthand_then_explicit_mixed_mode() {
3708        let result = RouteBuilder::from("direct:start")
3709            .route_id("mixed-mode-2")
3710            .dead_letter_channel("log:dlc")
3711            .error_handler(ErrorHandlerConfig::log_only())
3712            .to("mock:out")
3713            .build();
3714
3715        let err = result.err().expect("mixed mode should fail");
3716        assert!(format!("{err}").contains("mixed error handler modes"));
3717    }
3718
3719    // ── build_canonical: empty from_uri error ───────────────────────────────────
3720
3721    #[test]
3722    fn test_build_canonical_empty_from_uri_errors() {
3723        let result = RouteBuilder::from("").route_id("test").build_canonical();
3724        assert!(result.is_err());
3725    }
3726
3727    #[test]
3728    fn test_build_canonical_missing_route_id_errors() {
3729        let result = RouteBuilder::from("direct:start").build_canonical();
3730        assert!(result.is_err());
3731        let err = result.unwrap_err().to_string();
3732        assert!(err.contains("route_id"));
3733    }
3734
3735    // ── SplitBuilder: aggregate inside split ────────────────────────────────────
3736
3737    #[test]
3738    fn test_split_builder_with_aggregate_inside() {
3739        use camel_api::aggregator::AggregatorConfig;
3740        use camel_api::splitter::{SplitterConfig, split_body_lines};
3741
3742        let definition = RouteBuilder::from("timer:test")
3743            .route_id("split-agg")
3744            .split(SplitterConfig::new(split_body_lines()))
3745            .aggregate(
3746                AggregatorConfig::correlate_by("frag-key")
3747                    .complete_when_size(3)
3748                    .build()
3749                    .unwrap(),
3750            )
3751            .end_split()
3752            .build()
3753            .unwrap();
3754
3755        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3756            assert_eq!(steps.len(), 1);
3757            assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
3758        } else {
3759            panic!("Expected Split step");
3760        }
3761    }
3762
3763    // ── Throttle: steps collected inside throttle scope ─────────────────────────
3764
3765    #[test]
3766    fn test_throttle_builder_with_steps_inside() {
3767        let definition = RouteBuilder::from("timer:tick")
3768            .route_id("throttle-steps")
3769            .throttle(10, Duration::from_secs(1))
3770            .set_header("throttled", Value::Bool(true))
3771            .to("mock:throttled")
3772            .end_throttle()
3773            .build()
3774            .unwrap();
3775
3776        if let BuilderStep::Throttle { steps, .. } = &definition.steps()[0] {
3777            assert_eq!(steps.len(), 2);
3778        } else {
3779            panic!("Expected Throttle step");
3780        }
3781    }
3782
3783    // ── LoadBalance: steps collected inside scope ───────────────────────────────
3784
3785    #[test]
3786    fn test_load_balance_builder_with_steps_inside() {
3787        let definition = RouteBuilder::from("timer:tick")
3788            .route_id("lb-steps")
3789            .load_balance()
3790            .round_robin()
3791            .set_header("lb", Value::Bool(true))
3792            .to("mock:lb")
3793            .end_load_balance()
3794            .build()
3795            .unwrap();
3796
3797        if let BuilderStep::LoadBalance { steps, .. } = &definition.steps()[0] {
3798            assert_eq!(steps.len(), 2);
3799        } else {
3800            panic!("Expected LoadBalance step");
3801        }
3802    }
3803
3804    // ── Multicast: steps collected inside scope ─────────────────────────────────
3805
3806    #[test]
3807    fn test_multicast_builder_with_steps_inside() {
3808        let definition = RouteBuilder::from("timer:tick")
3809            .route_id("multicast-steps")
3810            .multicast()
3811            .set_header("mc", Value::Bool(true))
3812            .to("mock:multicast")
3813            .end_multicast()
3814            .build()
3815            .unwrap();
3816
3817        if let BuilderStep::Multicast { steps, .. } = &definition.steps()[0] {
3818            assert_eq!(steps.len(), 2);
3819        } else {
3820            panic!("Expected Multicast step");
3821        }
3822    }
3823
3824    // ── LoopBuilder: steps collected inside loop scope ──────────────────────────
3825
3826    #[test]
3827    fn test_loop_builder_with_steps_inside() {
3828        let definition = RouteBuilder::from("timer:tick")
3829            .route_id("loop-steps")
3830            .loop_count(3)
3831            .set_header("loop", Value::Bool(true))
3832            .to("mock:loop")
3833            .end_loop()
3834            .build()
3835            .unwrap();
3836
3837        if let BuilderStep::Loop { steps, .. } = &definition.steps()[0] {
3838            assert_eq!(steps.len(), 2);
3839        } else {
3840            panic!("Expected Loop step");
3841        }
3842    }
3843
3844    // ── canonical_step_name coverage for remaining variants ─────────────────────
3845
3846    #[test]
3847    fn test_build_canonical_rejects_loop_step() {
3848        let err = RouteBuilder::from("direct:start")
3849            .route_id("canonical-loop")
3850            .loop_count(3)
3851            .to("mock:loop")
3852            .end_loop()
3853            .build_canonical()
3854            .unwrap_err();
3855
3856        assert!(format!("{err}").contains("does not support step `loop`"));
3857    }
3858
3859    #[test]
3860    fn test_build_canonical_rejects_multicast_step() {
3861        let err = RouteBuilder::from("direct:start")
3862            .route_id("canonical-multicast")
3863            .multicast()
3864            .to("mock:a")
3865            .end_multicast()
3866            .build_canonical()
3867            .unwrap_err();
3868
3869        assert!(format!("{err}").contains("does not support step `multicast`"));
3870    }
3871
3872    #[test]
3873    fn test_build_canonical_rejects_throttle_step() {
3874        let err = RouteBuilder::from("direct:start")
3875            .route_id("canonical-throttle")
3876            .throttle(10, Duration::from_secs(1))
3877            .to("mock:result")
3878            .end_throttle()
3879            .build_canonical()
3880            .unwrap_err();
3881
3882        assert!(format!("{err}").contains("does not support step `throttle`"));
3883    }
3884
3885    #[test]
3886    fn test_build_canonical_rejects_load_balancer_step() {
3887        let err = RouteBuilder::from("direct:start")
3888            .route_id("canonical-lb")
3889            .load_balance()
3890            .round_robin()
3891            .to("mock:result")
3892            .end_load_balance()
3893            .build_canonical()
3894            .unwrap_err();
3895
3896        assert!(format!("{err}").contains("does not support step `load_balancer`"));
3897    }
3898
3899    #[test]
3900    fn test_build_canonical_rejects_bean_step() {
3901        let err = RouteBuilder::from("direct:start")
3902            .route_id("canonical-bean")
3903            .bean("myBean", "process")
3904            .build_canonical()
3905            .unwrap_err();
3906
3907        assert!(format!("{err}").contains("does not support step `bean`"));
3908    }
3909
3910    #[test]
3911    fn test_build_canonical_rejects_script_step() {
3912        let err = RouteBuilder::from("direct:start")
3913            .route_id("canonical-script")
3914            .script("rhai", "x = 1")
3915            .build_canonical()
3916            .unwrap_err();
3917
3918        assert!(format!("{err}").contains("does not support step `script`"));
3919    }
3920
3921    #[test]
3922    fn test_build_canonical_accepts_delay_step() {
3923        let spec = RouteBuilder::from("direct:start")
3924            .route_id("canonical-delay")
3925            .delay(Duration::from_millis(100))
3926            .build_canonical()
3927            .unwrap();
3928
3929        assert!(
3930            spec.steps.iter().any(
3931                |s| matches!(s, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100)
3932            )
3933        );
3934    }
3935
3936    #[test]
3937    fn test_build_canonical_accepts_wire_tap_step() {
3938        let spec = RouteBuilder::from("direct:start")
3939            .route_id("canonical-wiretap")
3940            .wire_tap("mock:tap")
3941            .build_canonical()
3942            .unwrap();
3943
3944        assert!(
3945            spec.steps
3946                .iter()
3947                .any(|s| matches!(s, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"))
3948        );
3949    }
3950
3951    #[test]
3952    fn test_build_canonical_rejects_dynamic_router_step() {
3953        let err = RouteBuilder::from("direct:start")
3954            .route_id("canonical-dyn-router")
3955            .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
3956            .build_canonical()
3957            .unwrap_err();
3958
3959        assert!(format!("{err}").contains("does not support step `dynamic_router`"));
3960    }
3961
3962    #[test]
3963    fn test_build_canonical_rejects_routing_slip_step() {
3964        let err = RouteBuilder::from("direct:start")
3965            .route_id("canonical-routing-slip")
3966            .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
3967            .build_canonical()
3968            .unwrap_err();
3969
3970        assert!(format!("{err}").contains("does not support step `routing_slip`"));
3971    }
3972
3973    #[test]
3974    fn test_build_canonical_rejects_recipient_list_step() {
3975        let err = RouteBuilder::from("direct:start")
3976            .route_id("canonical-recipient")
3977            .recipient_list(Arc::new(|_| "mock:a".to_string()))
3978            .build_canonical()
3979            .unwrap_err();
3980
3981        assert!(format!("{err}").contains("does not support step `recipient_list`"));
3982    }
3983
3984    // ── extract_completion_fields: Any mode with predicate → error ──────────────
3985
3986    #[test]
3987    fn test_build_canonical_rejects_any_mode_with_predicate() {
3988        let err = RouteBuilder::from("direct:start")
3989            .route_id("canonical-any-pred")
3990            .aggregate(AggregatorConfig {
3991                header_name: "key".to_string(),
3992                completion: CompletionMode::Any(vec![
3993                    CompletionCondition::Size(5),
3994                    CompletionCondition::Predicate(Arc::new(|_| false)),
3995                ]),
3996                correlation: CorrelationStrategy::HeaderName("key".to_string()),
3997                strategy: AggregationStrategy::CollectAll,
3998                max_buckets: None,
3999                bucket_ttl: None,
4000                force_completion_on_stop: false,
4001                discard_on_timeout: false,
4002            })
4003            .build_canonical()
4004            .unwrap_err();
4005
4006        assert!(format!("{err}").contains("predicate completion"));
4007    }
4008
4009    // ── BUILDER-004: Validation errors for missing required fields ────────────
4010
4011    #[test]
4012    fn test_builder_validation_missing_from_uri() {
4013        let result = RouteBuilder::from("")
4014            .route_id("missing-uri-route")
4015            .to("log:info")
4016            .build();
4017        assert!(result.is_err(), "empty from URI should fail validation");
4018        let err = result.err().unwrap().to_string();
4019        assert!(
4020            err.contains("'from'") || err.contains("URI"),
4021            "error should mention from/URI, got: {err}"
4022        );
4023    }
4024
4025    #[test]
4026    fn test_builder_validation_invalid_step_uri_scheme() {
4027        let result = RouteBuilder::from("timer:tick")
4028            .route_id("bad-step-route")
4029            .to("not-a-valid-uri") // no scheme
4030            .build();
4031        // The builder itself accepts any URI string; validation happens at
4032        // resolution time. Verify the build succeeds (step URI is deferred).
4033        assert!(
4034            result.is_ok(),
4035            "builder should accept opaque step URIs; resolution happens later"
4036        );
4037    }
4038
4039    // ── BUILDER-006: Duplicate route IDs ──────────────────────────────────────
4040
4041    #[test]
4042    fn test_builder_duplicate_route_ids_produce_identical_definitions() {
4043        // The builder itself doesn't check for duplicates (that's context-level).
4044        // Verify both builds succeed with the same ID — detection is TODO(BUILDER-006).
4045        let route1 = RouteBuilder::from("direct:a")
4046            .route_id("dup-route")
4047            .to("mock:out")
4048            .build();
4049        let route2 = RouteBuilder::from("direct:b")
4050            .route_id("dup-route")
4051            .to("mock:out")
4052            .build();
4053
4054        assert!(route1.is_ok());
4055        assert!(route2.is_ok());
4056        assert_eq!(route1.unwrap().route_id(), route2.unwrap().route_id());
4057    }
4058}