// camel_builder/lib.rs

//! Fluent builder API for constructing Camel routes programmatically with EIP patterns.
//!
//! Main types: `RouteBuilder`, `StepAccumulator`, `FilterBuilder`, `SplitBuilder`, `ChoiceBuilder`,
//! `MulticastBuilder`, `ThrottleBuilder`, `LoopBuilder`, `LoadBalancerBuilder`, `OnExceptionBuilder`.
5
6use camel_api::DelayConfig;
7use camel_api::aggregator::{
8    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24    ProcessorFn, Value,
25    runtime::{
26        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28        CanonicalWhenSpec,
29    },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35    StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38/// Shared step-accumulation methods for all builder types.
39///
40/// Implementors provide `steps_mut()` and get step-adding methods for free.
41/// `filter()` and other branching methods are NOT included — they return
42/// different types per builder and stay as per-builder methods.
43pub trait StepAccumulator: Sized {
44    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
45
46    fn to(mut self, endpoint: impl Into<String>) -> Self {
47        self.steps_mut().push(BuilderStep::To(endpoint.into()));
48        self
49    }
50
51    fn process<F, Fut>(mut self, f: F) -> Self
52    where
53        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
54        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
55    {
56        let svc = ProcessorFn::new(f);
57        self.steps_mut()
58            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
59        self
60    }
61
62    fn process_fn(mut self, processor: BoxProcessor) -> Self {
63        self.steps_mut().push(BuilderStep::Processor(processor));
64        self
65    }
66
67    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
68        let svc = SetHeader::new(IdentityProcessor, key, value);
69        self.steps_mut()
70            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71        self
72    }
73
74    fn map_body<F>(mut self, mapper: F) -> Self
75    where
76        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
77    {
78        let svc = MapBody::new(IdentityProcessor, mapper);
79        self.steps_mut()
80            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
81        self
82    }
83
84    fn set_body<B>(mut self, body: B) -> Self
85    where
86        B: Into<Body> + Clone + Send + Sync + 'static,
87    {
88        let body: Body = body.into();
89        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
90        self.steps_mut()
91            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
92        self
93    }
94
95    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
96    ///
97    /// Transforms the message body using the given value. Semantically identical
98    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
99    fn transform<B>(self, body: B) -> Self
100    where
101        B: Into<Body> + Clone + Send + Sync + 'static,
102    {
103        self.set_body(body)
104    }
105
106    fn set_body_fn<F>(mut self, expr: F) -> Self
107    where
108        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
109    {
110        let svc = SetBody::new(IdentityProcessor, expr);
111        self.steps_mut()
112            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
113        self
114    }
115
116    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
117    where
118        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
119    {
120        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
121        self.steps_mut()
122            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
123        self
124    }
125
126    fn aggregate(mut self, config: AggregatorConfig) -> Self {
127        self.steps_mut().push(BuilderStep::Aggregate { config });
128        self
129    }
130
131    /// Stop processing this exchange immediately. No further steps in the
132    /// current pipeline will run.
133    ///
134    /// Can be used at any point in the route: directly on RouteBuilder,
135    /// inside `.filter()`, inside `.split()`, etc.
136    fn stop(mut self) -> Self {
137        self.steps_mut().push(BuilderStep::Stop);
138        self
139    }
140
141    fn delay(mut self, duration: std::time::Duration) -> Self {
142        self.steps_mut().push(BuilderStep::Delay {
143            config: DelayConfig::from_duration(duration),
144        });
145        self
146    }
147
148    fn delay_with_header(
149        mut self,
150        duration: std::time::Duration,
151        header: impl Into<String>,
152    ) -> Self {
153        self.steps_mut().push(BuilderStep::Delay {
154            config: DelayConfig::from_duration_with_header(duration, header),
155        });
156        self
157    }
158
159    /// Log a message at the specified level.
160    ///
161    /// The message will be logged when an exchange passes through this step.
162    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
163        self.steps_mut().push(BuilderStep::Log {
164            level,
165            message: message.into(),
166        });
167        self
168    }
169
170    /// Convert the message body to the target type.
171    ///
172    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
173    /// Returns `TypeConversionFailed` if conversion is not possible.
174    ///
175    /// # Example
176    /// ```ignore
177    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
178    ///      .convert_body_to(BodyType::Json)
179    ///      .to("direct:next")
180    /// ```
181    fn convert_body_to(mut self, target: BodyType) -> Self {
182        let svc = ConvertBodyTo::new(IdentityProcessor, target);
183        self.steps_mut()
184            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
185        self
186    }
187
188    fn stream_cache(mut self, threshold: usize) -> Self {
189        let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
190        let svc = StreamCacheService::new(IdentityProcessor, config);
191        self.steps_mut()
192            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
193        self
194    }
195
196    /// Materialize `Body::Stream` into `Body::Bytes` using the default threshold (128 KB).
197    ///
198    /// Equivalent to `.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)`.
199    fn stream_cache_default(self) -> Self {
200        self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
201    }
202
203    /// Marshal the message body using the specified data format.
204    ///
205    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
206    /// the format name is unknown.
207    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
208    ///
209    /// # Example
210    /// ```ignore
211    /// route.marshal("json")?.to("direct:next")
212    /// ```
213    fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
214        let name = format.into();
215        let df = builtin_data_format(&name)
216            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
217        let svc = MarshalService::new(IdentityProcessor, df);
218        self.steps_mut()
219            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
220        Ok(self)
221    }
222
223    /// Unmarshal the message body using the specified data format.
224    ///
225    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
226    /// the format name is unknown.
227    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
228    ///
229    /// # Example
230    /// ```ignore
231    /// route.unmarshal("json")?.to("direct:next")
232    /// ```
233    fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
234        let name = format.into();
235        let df = builtin_data_format(&name)
236            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
237        let svc = UnmarshalService::new(IdentityProcessor, df);
238        self.steps_mut()
239            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
240        Ok(self)
241    }
242
243    /// Validate the exchange body against a schema file.
244    ///
245    /// Shorthand for `to("validator:path")`. Supports XSD, JSON Schema, and YAML.
246    ///
247    /// # Example
248    /// ```ignore
249    /// route.validate("schemas/order.xsd").to("direct:out")
250    /// ```
251    fn validate(mut self, schema_path: impl Into<String>) -> Self {
252        let path = schema_path.into();
253        let uri = if path.starts_with("validator:") {
254            path
255        } else {
256            format!("validator:{path}")
257        };
258        self.steps_mut().push(BuilderStep::To(uri));
259        self
260    }
261
262    /// Execute a script that can modify the exchange (headers, properties, body).
263    ///
264    /// The script has access to `headers`, `properties`, and `body` variables
265    /// and can modify them with assignment syntax: `headers["k"] = v`.
266    ///
267    /// # Example
268    /// ```ignore
269    /// // ignore: requires full CamelContext setup with registered language
270    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
271    /// ```
272    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
273        self.steps_mut().push(BuilderStep::Script {
274            language: language.into(),
275            script: script.into(),
276        });
277        self
278    }
279
280    /// EIP-7 enrich: synchronous content enrichment via a resolved producer.
281    ///
282    /// Calls the given endpoint URI as a producer, then merges the response
283    /// back into the original exchange body using the default `UseEnrichedBody`
284    /// strategy (original headers/properties are preserved).
285    fn enrich(mut self, uri: impl Into<String>) -> Self {
286        self.steps_mut().push(BuilderStep::Enrich {
287            uri: uri.into(),
288            strategy: None,
289            timeout_ms: None,
290        });
291        self
292    }
293
294    /// EIP-7 pollEnrich: blocking poll of a PollingConsumer with timeout.
295    ///
296    /// Reads from a polling endpoint (e.g., file) and merges the result
297    /// into the exchange body using the default `UseEnrichedBody` strategy.
298    /// `timeout_ms` controls how long to wait for data.
299    fn poll_enrich(mut self, uri: impl Into<String>, timeout_ms: u64) -> Self {
300        self.steps_mut().push(BuilderStep::PollEnrich {
301            uri: uri.into(),
302            strategy: None,
303            timeout_ms: Some(timeout_ms),
304        });
305        self
306    }
307
308    fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
309        self.steps_mut().push(BuilderStep::Bean {
310            name: name.into(),
311            method: method.into(),
312        });
313        self
314    }
315}
316
317/// A fluent builder for constructing routes.
318///
319/// # Example
320///
321/// ```ignore
322/// let definition = RouteBuilder::from("timer:tick?period=1000")
323///     .set_header("source", Value::String("timer".into()))
324///     .filter(|ex| ex.input.body.as_text().is_some())
325///     .to("log:info?showHeaders=true")
326///     .build()?;
327/// ```
328// TODO(BUILDER-003): RouteBuilder does not support clone-and-reuse semantics.
329// Once built, the builder is consumed. Add `Clone` or a `fork()` method to allow
330// reusing a partially-built route as a template for multiple routes.
331pub struct RouteBuilder {
332    from_uri: String,
333    steps: Vec<BuilderStep>,
334    error_handler: Option<ErrorHandlerConfig>,
335    error_handler_mode: ErrorHandlerMode,
336    circuit_breaker_config: Option<CircuitBreakerConfig>,
337    security_policy_config: Option<camel_api::security_policy::SecurityPolicyConfig>,
338    security_authenticator: Option<std::sync::Arc<dyn camel_auth::TokenAuthenticator>>,
339    concurrency: Option<ConcurrencyModel>,
340    route_id: Option<String>,
341    auto_startup: Option<bool>,
342    startup_order: Option<i32>,
343}
344
/// Records which error-handling API has been used on a `RouteBuilder`, so
/// that `build()` can resolve the final handler or reject a mixed setup.
#[derive(Default)]
enum ErrorHandlerMode {
    /// Neither `error_handler()` nor a shorthand method has been called.
    #[default]
    None,
    /// Only `error_handler(config)` has been called.
    ExplicitConfig,
    /// Only shorthand methods (`dead_letter_channel()`, `on_exception()`) have
    /// been called.
    Shorthand {
        /// Dead letter channel URI, if `dead_letter_channel()` was called.
        dlc_uri: Option<String>,
        /// Exception policies pushed by `OnExceptionBuilder::end_on_exception()`.
        specs: Vec<OnExceptionSpec>,
    },
    /// Explicit config and shorthand methods were combined; `build()` returns
    /// an error for this state.
    Mixed,
}
356
/// One shorthand exception policy collected by `OnExceptionBuilder` and
/// translated into the `ErrorHandlerConfig` during `RouteBuilder::build()`.
#[derive(Clone)]
struct OnExceptionSpec {
    /// Predicate passed to `ErrorHandlerConfig::on_exception` to select the
    /// errors this policy applies to.
    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
    /// Redelivery settings; `None` until `OnExceptionBuilder::retry()` is called.
    retry: Option<RedeliveryPolicy>,
    /// URI set by `OnExceptionBuilder::handled_by()`, if any.
    handled_by: Option<String>,
}
363
364impl RouteBuilder {
365    /// Start building a route from the given source endpoint URI.
366    pub fn from(endpoint: &str) -> Self {
367        Self {
368            from_uri: endpoint.to_string(),
369            steps: Vec::new(),
370            error_handler: None,
371            error_handler_mode: ErrorHandlerMode::None,
372            circuit_breaker_config: None,
373            security_policy_config: None,
374            security_authenticator: None,
375            concurrency: None,
376            route_id: None,
377            auto_startup: None,
378            startup_order: None,
379        }
380    }
381
382    /// Open a filter scope. Only exchanges matching `predicate` will be processed
383    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
384    /// and continue to steps after `.end_filter()`.
385    pub fn filter<F>(self, predicate: F) -> FilterBuilder
386    where
387        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
388    {
389        FilterBuilder {
390            parent: self,
391            predicate: std::sync::Arc::new(predicate),
392            steps: vec![],
393        }
394    }
395
396    /// Open a choice scope for content-based routing.
397    ///
398    /// Within the choice, you can define multiple `.when()` clauses and an
399    /// optional `.otherwise()` clause. The first matching `when` predicate
400    /// determines which sub-pipeline executes.
401    pub fn choice(self) -> ChoiceBuilder {
402        ChoiceBuilder {
403            parent: self,
404            whens: vec![],
405            _otherwise: None,
406        }
407    }
408
409    /// Add a WireTap step that sends a clone of the exchange to the given
410    /// endpoint URI (fire-and-forget). The original exchange continues
411    /// downstream unchanged.
412    pub fn wire_tap(mut self, endpoint: &str) -> Self {
413        self.steps.push(BuilderStep::WireTap {
414            uri: endpoint.to_string(),
415        });
416        self
417    }
418
419    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
420    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
421        self.error_handler_mode = match self.error_handler_mode {
422            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
423                ErrorHandlerMode::ExplicitConfig
424            }
425            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
426        };
427        self.error_handler = Some(config);
428        self
429    }
430
431    /// Set a dead letter channel URI for shorthand error handler mode.
432    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
433        let uri = uri.into();
434        self.error_handler_mode = match self.error_handler_mode {
435            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
436                dlc_uri: Some(uri),
437                specs: Vec::new(),
438            },
439            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
440                dlc_uri: Some(uri),
441                specs,
442            },
443            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
444        };
445        self
446    }
447
448    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
449    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
450    where
451        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
452    {
453        self.error_handler_mode = match self.error_handler_mode {
454            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
455                dlc_uri: None,
456                specs: Vec::new(),
457            },
458            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
459            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
460        };
461
462        OnExceptionBuilder {
463            parent: self,
464            policy: OnExceptionSpec {
465                matches: std::sync::Arc::new(matches),
466                retry: None,
467                handled_by: None,
468            },
469        }
470    }
471
472    /// Set a circuit breaker for this route.
473    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
474        self.circuit_breaker_config = Some(config);
475        self
476    }
477
478    pub fn security_policy(
479        mut self,
480        config: camel_api::security_policy::SecurityPolicyConfig,
481    ) -> Self {
482        self.security_policy_config = Some(config);
483        self
484    }
485
486    pub fn security_authenticator(
487        mut self,
488        auth: std::sync::Arc<dyn camel_auth::TokenAuthenticator>,
489    ) -> Self {
490        self.security_authenticator = Some(auth);
491        self
492    }
493
494    /// Override the consumer's default concurrency model.
495    ///
496    /// When set, the pipeline spawns a task per exchange, processing them
497    /// concurrently. `max` limits the number of simultaneously active
498    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
499    ///
500    /// # Example
501    /// ```ignore
502    /// RouteBuilder::from("http://0.0.0.0:8080/api")
503    ///     .concurrent(16)  // max 16 in-flight pipeline executions
504    ///     .process(handle_request)
505    ///     .build()
506    /// ```
507    pub fn concurrent(mut self, max: usize) -> Self {
508        let max = if max == 0 { None } else { Some(max) };
509        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
510        self
511    }
512
513    /// Force sequential processing, overriding a concurrent-capable consumer.
514    ///
515    /// Useful for HTTP routes that mutate shared state and need ordering
516    /// guarantees.
517    pub fn sequential(mut self) -> Self {
518        self.concurrency = Some(ConcurrencyModel::Sequential);
519        self
520    }
521
522    /// Set the route ID for this route.
523    ///
524    /// If not set, the route will be assigned an auto-generated ID.
525    pub fn route_id(mut self, id: impl Into<String>) -> Self {
526        self.route_id = Some(id.into());
527        self
528    }
529
530    /// Set whether this route should automatically start when the context starts.
531    ///
532    /// Default is `true`.
533    pub fn auto_startup(mut self, auto: bool) -> Self {
534        self.auto_startup = Some(auto);
535        self
536    }
537
538    /// Set the startup order for this route.
539    ///
540    /// Routes with lower values start first. Default is 1000.
541    pub fn startup_order(mut self, order: i32) -> Self {
542        self.startup_order = Some(order);
543        self
544    }
545
546    /// Begin a Splitter sub-pipeline. Steps added after this call (until
547    /// `.end_split()`) will be executed per-fragment.
548    ///
549    /// Returns a `SplitBuilder` — you cannot call `.build()` until
550    /// `.end_split()` closes the split scope (enforced by the type system).
551    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
552        SplitBuilder {
553            parent: self,
554            config,
555            steps: Vec::new(),
556        }
557    }
558
559    /// Begin a Multicast sub-pipeline. Steps added after this call (until
560    /// `.end_multicast()`) will each receive a copy of the exchange.
561    ///
562    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
563    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
564    pub fn multicast(self) -> MulticastBuilder {
565        MulticastBuilder {
566            parent: self,
567            steps: Vec::new(),
568            config: MulticastConfig::new(),
569        }
570    }
571
572    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
573    /// `max_requests` per `period`. Steps inside the throttle scope are only
574    /// executed when the rate limit allows.
575    ///
576    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
577    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
578    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
579        ThrottleBuilder {
580            parent: self,
581            config: ThrottlerConfig::new(max_requests, period),
582            steps: Vec::new(),
583        }
584    }
585
586    /// Begin a Loop sub-pipeline that iterates a fixed number of times.
587    pub fn loop_count(self, count: usize) -> LoopBuilder {
588        LoopBuilder {
589            parent: self,
590            config: LoopConfig {
591                mode: LoopMode::Count(count),
592            },
593            steps: vec![],
594        }
595    }
596
597    /// Begin a Loop sub-pipeline that iterates while a predicate is true.
598    pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
599    where
600        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
601    {
602        LoopBuilder {
603            parent: self,
604            config: LoopConfig {
605                mode: LoopMode::While(std::sync::Arc::new(predicate)),
606            },
607            steps: vec![],
608        }
609    }
610
611    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
612    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
613    ///
614    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
615    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
616    pub fn load_balance(self) -> LoadBalancerBuilder {
617        LoadBalancerBuilder {
618            parent: self,
619            config: LoadBalancerConfig::round_robin(),
620            steps: Vec::new(),
621        }
622    }
623
624    /// Add a dynamic router step that routes exchanges dynamically based on
625    /// expression evaluation at runtime.
626    ///
627    /// The expression receives the exchange and returns `Some(uri)` to route to
628    /// the next endpoint, or `None` to stop routing.
629    ///
630    /// # Example
631    /// ```ignore
632    /// RouteBuilder::from("timer:tick")
633    ///     .route_id("test-route")
634    ///     .dynamic_router(|ex| {
635    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
636    ///     })
637    ///     .build()
638    /// ```
639    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
640        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
641    }
642
643    /// Add a dynamic router step with full configuration.
644    ///
645    /// Allows customization of URI delimiter, cache size, timeout, and other options.
646    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
647        self.steps.push(BuilderStep::DynamicRouter { config });
648        self
649    }
650
651    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
652        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
653    }
654
655    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
656        self.steps.push(BuilderStep::RoutingSlip { config });
657        self
658    }
659
660    pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
661        self.recipient_list_with_config(RecipientListConfig::new(expression))
662    }
663
664    pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
665        self.steps.push(BuilderStep::RecipientList { config });
666        self
667    }
668
669    /// Consume the builder and produce a [`RouteDefinition`].
670    // TODO(BUILDER-006): Validate duplicate route IDs. When a route with the same
671    // ID is already registered in the context, return `Err(CamelError::RouteError)`.
672    // Currently, duplicate IDs are silently accepted; detection should happen at
673    // `CamelContext::add_route_definition` time.
674    pub fn build(self) -> Result<RouteDefinition, CamelError> {
675        validate_uri(&self.from_uri)?;
676        let route_id = self
677            .route_id
678            .filter(|s| !s.trim().is_empty())
679            .ok_or_else(|| {
680                CamelError::RouteError(
681                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
682                        .to_string(),
683                )
684            })?;
685        let resolved_error_handler = match self.error_handler_mode {
686            ErrorHandlerMode::None => self.error_handler,
687            ErrorHandlerMode::ExplicitConfig => self.error_handler,
688            ErrorHandlerMode::Mixed => {
689                return Err(CamelError::RouteError(
690                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
691                ));
692            }
693            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
694                let mut config = if let Some(uri) = dlc_uri {
695                    ErrorHandlerConfig::dead_letter_channel(uri)
696                } else {
697                    ErrorHandlerConfig::log_only()
698                };
699
700                for spec in specs {
701                    let matcher = spec.matches.clone();
702                    let mut builder = config.on_exception(move |e| matcher(e));
703
704                    if let Some(retry) = spec.retry {
705                        builder = builder.retry(retry.max_attempts).with_backoff(
706                            retry.initial_delay,
707                            retry.multiplier,
708                            retry.max_delay,
709                        );
710                        if retry.jitter_factor > 0.0 {
711                            builder = builder.with_jitter(retry.jitter_factor);
712                        }
713                    }
714
715                    if let Some(uri) = spec.handled_by {
716                        builder = builder.handled_by(uri);
717                    }
718
719                    config = builder.build();
720                }
721
722                Some(config)
723            }
724        };
725
726        let definition = RouteDefinition::new(self.from_uri, self.steps);
727        let definition = if let Some(eh) = resolved_error_handler {
728            definition.with_error_handler(eh)
729        } else {
730            definition
731        };
732        let definition = if let Some(cb) = self.circuit_breaker_config {
733            definition.with_circuit_breaker(cb)
734        } else {
735            definition
736        };
737        let definition = if let Some(sp) = self.security_policy_config {
738            definition.with_security_policy(sp)
739        } else {
740            definition
741        };
742        let definition = if let Some(auth) = self.security_authenticator {
743            definition.with_security_authenticator(auth)
744        } else {
745            definition
746        };
747        let definition = if let Some(concurrency) = self.concurrency {
748            definition.with_concurrency(concurrency)
749        } else {
750            definition
751        };
752        let definition = definition.with_route_id(route_id);
753        let definition = if let Some(auto) = self.auto_startup {
754            definition.with_auto_startup(auto)
755        } else {
756            definition
757        };
758        let definition = if let Some(order) = self.startup_order {
759            definition.with_startup_order(order)
760        } else {
761            definition
762        };
763        Ok(definition)
764    }
765
766    /// Compile this builder route into canonical spec.
767    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
768        validate_uri(&self.from_uri)?;
769        let route_id = self
770            .route_id
771            .filter(|s| !s.trim().is_empty())
772            .ok_or_else(|| {
773                CamelError::RouteError(
774                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
775                        .to_string(),
776                )
777            })?;
778
779        let steps = canonicalize_steps(self.steps)?;
780        let circuit_breaker = self
781            .circuit_breaker_config
782            .map(canonicalize_circuit_breaker);
783
784        if self.security_policy_config.is_some() {
785            return Err(CamelError::RouteError(
786                "routes with security_policy cannot use the canonical/hot-reload path (not yet supported)"
787                    .into(),
788            ));
789        }
790
791        let spec = CanonicalRouteSpec {
792            route_id,
793            from: self.from_uri,
794            steps,
795            circuit_breaker,
796            auto_startup: None,
797            startup_order: None,
798            concurrency: None,
799            version: camel_api::CANONICAL_CONTRACT_VERSION,
800        };
801        spec.validate_contract()?;
802        Ok(spec)
803    }
804}
805
806pub struct OnExceptionBuilder {
807    parent: RouteBuilder,
808    policy: OnExceptionSpec,
809}
810
811impl OnExceptionBuilder {
812    pub fn retry(mut self, max_attempts: u32) -> Self {
813        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
814        self
815    }
816
817    pub fn with_backoff(
818        mut self,
819        initial: std::time::Duration,
820        multiplier: f64,
821        max: std::time::Duration,
822    ) -> Self {
823        if let Some(ref mut retry) = self.policy.retry {
824            retry.initial_delay = initial;
825            retry.multiplier = multiplier;
826            retry.max_delay = max;
827        } else {
828            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
829        }
830        self
831    }
832
833    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
834        if let Some(ref mut retry) = self.policy.retry {
835            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
836        } else {
837            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
838        }
839        self
840    }
841
842    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
843        self.policy.handled_by = Some(uri.into());
844        self
845    }
846
847    pub fn end_on_exception(mut self) -> RouteBuilder {
848        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
849            specs.push(self.policy);
850        }
851        self.parent
852    }
853}
854
855/// Validate that a URI is non-empty and contains a scheme component.
856fn validate_uri(uri: &str) -> Result<(), CamelError> {
857    let trimmed = uri.trim();
858    if trimmed.is_empty() {
859        return Err(CamelError::RouteError(
860            "route must have a 'from' URI".to_string(),
861        ));
862    }
863    if !trimmed.contains(':') {
864        return Err(CamelError::RouteError(
865            "URI must have a scheme (e.g. 'timer:tick')".to_string(),
866        ));
867    }
868    let scheme = trimmed.split(':').next().unwrap_or("");
869    if scheme.trim().is_empty() {
870        return Err(CamelError::RouteError(
871            "URI scheme must not be empty".to_string(),
872        ));
873    }
874    Ok(())
875}
876
877fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
878    let mut canonical = Vec::with_capacity(steps.len());
879    for step in steps {
880        canonical.push(canonicalize_step(step)?);
881    }
882    Ok(canonical)
883}
884
885fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
886    match step {
887        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
888        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
889        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
890        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
891        BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
892            delay_ms: config.delay_ms,
893            dynamic_header: config.dynamic_header,
894        }),
895        BuilderStep::DeclarativeScript { expression } => {
896            Ok(CanonicalStepSpec::Script { expression })
897        }
898        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
899            predicate,
900            steps: canonicalize_steps(steps)?,
901        }),
902        BuilderStep::DeclarativeChoice { whens, otherwise } => {
903            let mut canonical_whens = Vec::with_capacity(whens.len());
904            for DeclarativeWhenStep { predicate, steps } in whens {
905                canonical_whens.push(CanonicalWhenSpec {
906                    predicate,
907                    steps: canonicalize_steps(steps)?,
908                });
909            }
910            let otherwise = match otherwise {
911                Some(steps) => Some(canonicalize_steps(steps)?),
912                None => None,
913            };
914            Ok(CanonicalStepSpec::Choice {
915                whens: canonical_whens,
916                otherwise,
917            })
918        }
919        BuilderStep::DeclarativeSplit {
920            expression,
921            aggregation,
922            parallel,
923            parallel_limit,
924            stop_on_exception,
925            steps,
926        } => Ok(CanonicalStepSpec::Split {
927            expression: CanonicalSplitExpressionSpec::Language(expression),
928            aggregation: canonicalize_split_aggregation(aggregation)?,
929            parallel,
930            parallel_limit,
931            stop_on_exception,
932            steps: canonicalize_steps(steps)?,
933        }),
934        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
935            canonicalize_aggregate(config)?,
936        )),
937        other => {
938            let step_name = canonical_step_name(&other);
939            let detail = camel_api::canonical_contract_rejection_reason(step_name)
940                .unwrap_or("not included in canonical v1");
941            Err(CamelError::RouteError(format!(
942                "canonical v1 does not support step `{step_name}`: {detail}"
943            )))
944        }
945    }
946}
947
948fn canonicalize_split_aggregation(
949    strategy: camel_api::splitter::AggregationStrategy,
950) -> Result<CanonicalSplitAggregationSpec, CamelError> {
951    match strategy {
952        camel_api::splitter::AggregationStrategy::LastWins => {
953            Ok(CanonicalSplitAggregationSpec::LastWins)
954        }
955        camel_api::splitter::AggregationStrategy::CollectAll => {
956            Ok(CanonicalSplitAggregationSpec::CollectAll)
957        }
958        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
959            "canonical v1 does not support custom split aggregation".to_string(),
960        )),
961        camel_api::splitter::AggregationStrategy::Original => {
962            Ok(CanonicalSplitAggregationSpec::Original)
963        }
964    }
965}
966
967fn extract_completion_fields(
968    mode: &CompletionMode,
969) -> Result<(Option<usize>, Option<u64>), CamelError> {
970    match mode {
971        CompletionMode::Single(cond) => match cond {
972            CompletionCondition::Size(n) => Ok((Some(*n), None)),
973            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
974            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
975                "canonical v1 does not support aggregate predicate completion".to_string(),
976            )),
977        },
978        CompletionMode::Any(conds) => {
979            let mut size = None;
980            let mut timeout_ms = None;
981            for cond in conds {
982                match cond {
983                    CompletionCondition::Size(n) => size = Some(*n),
984                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
985                    CompletionCondition::Predicate(_) => {
986                        return Err(CamelError::RouteError(
987                            "canonical v1 does not support aggregate predicate completion"
988                                .to_string(),
989                        ));
990                    }
991                }
992            }
993            Ok((size, timeout_ms))
994        }
995    }
996}
997
998fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
999    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
1000
1001    let header = match &config.correlation {
1002        CorrelationStrategy::HeaderName(h) => h.clone(),
1003        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
1004        CorrelationStrategy::Fn(_) => {
1005            return Err(CamelError::RouteError(
1006                "canonical v1 does not support Fn correlation strategy".to_string(),
1007            ));
1008        }
1009    };
1010
1011    let correlation_key = match &config.correlation {
1012        CorrelationStrategy::HeaderName(_) => None,
1013        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
1014        CorrelationStrategy::Fn(_) => unreachable!(),
1015    };
1016
1017    let strategy = match config.strategy {
1018        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
1019        AggregationStrategy::Custom(_) => {
1020            return Err(CamelError::RouteError(
1021                "canonical v1 does not support custom aggregate strategy".to_string(),
1022            ));
1023        }
1024    };
1025    let bucket_ttl_ms = config
1026        .bucket_ttl
1027        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
1028
1029    Ok(CanonicalAggregateSpec {
1030        header,
1031        completion_size,
1032        completion_timeout_ms,
1033        correlation_key,
1034        force_completion_on_stop: if config.force_completion_on_stop {
1035            Some(true)
1036        } else {
1037            None
1038        },
1039        discard_on_timeout: if config.discard_on_timeout {
1040            Some(true)
1041        } else {
1042            None
1043        },
1044        strategy,
1045        max_buckets: config.max_buckets,
1046        bucket_ttl_ms,
1047    })
1048}
1049
1050fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
1051    CanonicalCircuitBreakerSpec {
1052        failure_threshold: config.failure_threshold,
1053        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
1054    }
1055}
1056
1057fn canonical_step_name(step: &BuilderStep) -> &'static str {
1058    match step {
1059        BuilderStep::Processor(_) => "processor",
1060        BuilderStep::To(_) => "to",
1061        BuilderStep::Stop => "stop",
1062        BuilderStep::Log { .. } => "log",
1063        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
1064        BuilderStep::DeclarativeSetBody { .. } => "set_body",
1065        BuilderStep::DeclarativeFilter { .. } => "filter",
1066        BuilderStep::DeclarativeChoice { .. } => "choice",
1067        BuilderStep::DeclarativeScript { .. } => "script",
1068        BuilderStep::DeclarativeFunction { .. } => "function",
1069        BuilderStep::DeclarativeSplit { .. } => "split",
1070        BuilderStep::Split { .. } => "split",
1071        BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1072        BuilderStep::Aggregate { .. } => "aggregate",
1073        BuilderStep::Filter { .. } => "filter",
1074        BuilderStep::Choice { .. } => "choice",
1075        BuilderStep::WireTap { .. } => "wire_tap",
1076        BuilderStep::Delay { .. } => "delay",
1077        BuilderStep::Multicast { .. } => "multicast",
1078        BuilderStep::DeclarativeLog { .. } => "log",
1079        BuilderStep::Bean { .. } => "bean",
1080        BuilderStep::Script { .. } => "script",
1081        BuilderStep::Throttle { .. } => "throttle",
1082        BuilderStep::LoadBalance { .. } => "load_balancer",
1083        BuilderStep::DynamicRouter { .. } => "dynamic_router",
1084        BuilderStep::RoutingSlip { .. } => "routing_slip",
1085        BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1086        BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1087        BuilderStep::RecipientList { .. } => "recipient_list",
1088        BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1089        BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1090        BuilderStep::DeclarativeStreamSplit { .. } => "stream_split",
1091        BuilderStep::Enrich { .. } => "enrich",
1092        BuilderStep::PollEnrich { .. } => "poll_enrich",
1093    }
1094}
1095
1096impl StepAccumulator for RouteBuilder {
1097    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1098        &mut self.steps
1099    }
1100}
1101
1102/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
1103///
1104/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1105/// but NOT `.build()` and NOT `.split()` (no nested splits).
1106///
1107/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
1108/// and returns the parent `RouteBuilder`.
1109pub struct SplitBuilder {
1110    parent: RouteBuilder,
1111    config: SplitterConfig,
1112    steps: Vec<BuilderStep>,
1113}
1114
1115impl SplitBuilder {
1116    /// Open a filter scope within the split sub-pipeline.
1117    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1118    where
1119        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1120    {
1121        FilterInSplitBuilder {
1122            parent: self,
1123            predicate: std::sync::Arc::new(predicate),
1124            steps: vec![],
1125        }
1126    }
1127
1128    /// Close the split scope. Packages the accumulated sub-steps into a
1129    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
1130    pub fn end_split(mut self) -> RouteBuilder {
1131        let split_step = BuilderStep::Split {
1132            config: self.config,
1133            steps: self.steps,
1134        };
1135        self.parent.steps.push(split_step);
1136        self.parent
1137    }
1138}
1139
1140impl StepAccumulator for SplitBuilder {
1141    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1142        &mut self.steps
1143    }
1144}
1145
1146/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
1147pub struct FilterBuilder {
1148    parent: RouteBuilder,
1149    predicate: FilterPredicate,
1150    steps: Vec<BuilderStep>,
1151}
1152
1153impl FilterBuilder {
1154    /// Close the filter scope. Packages the accumulated sub-steps into a
1155    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
1156    pub fn end_filter(mut self) -> RouteBuilder {
1157        let step = BuilderStep::Filter {
1158            predicate: self.predicate,
1159            steps: self.steps,
1160        };
1161        self.parent.steps.push(step);
1162        self.parent
1163    }
1164}
1165
1166impl StepAccumulator for FilterBuilder {
1167    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1168        &mut self.steps
1169    }
1170}
1171
1172/// Builder for a filter scope nested inside a `.split()` block.
1173pub struct FilterInSplitBuilder {
1174    parent: SplitBuilder,
1175    predicate: FilterPredicate,
1176    steps: Vec<BuilderStep>,
1177}
1178
1179impl FilterInSplitBuilder {
1180    /// Close the filter scope and return the parent `SplitBuilder`.
1181    pub fn end_filter(mut self) -> SplitBuilder {
1182        let step = BuilderStep::Filter {
1183            predicate: self.predicate,
1184            steps: self.steps,
1185        };
1186        self.parent.steps.push(step);
1187        self.parent
1188    }
1189}
1190
1191impl StepAccumulator for FilterInSplitBuilder {
1192    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1193        &mut self.steps
1194    }
1195}
1196
1197// ── Choice/When/Otherwise builders ─────────────────────────────────────────
1198
1199/// Builder for a `.choice()` ... `.end_choice()` block.
1200///
1201/// Accumulates `when` clauses and an optional `otherwise` clause.
1202/// Cannot call `.build()` until `.end_choice()` is called.
1203pub struct ChoiceBuilder {
1204    parent: RouteBuilder,
1205    whens: Vec<WhenStep>,
1206    _otherwise: Option<Vec<BuilderStep>>,
1207}
1208
1209impl ChoiceBuilder {
1210    /// Open a `when` clause. Only exchanges matching `predicate` will be
1211    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
1212    pub fn when<F>(self, predicate: F) -> WhenBuilder
1213    where
1214        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1215    {
1216        WhenBuilder {
1217            parent: self,
1218            predicate: std::sync::Arc::new(predicate),
1219            steps: vec![],
1220        }
1221    }
1222
1223    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1224    ///
1225    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1226    pub fn otherwise(self) -> OtherwiseBuilder {
1227        OtherwiseBuilder {
1228            parent: self,
1229            steps: vec![],
1230        }
1231    }
1232
1233    /// Close the choice scope. Packages all accumulated `when` clauses and
1234    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1235    /// parent `RouteBuilder`.
1236    pub fn end_choice(mut self) -> RouteBuilder {
1237        let step = BuilderStep::Choice {
1238            whens: self.whens,
1239            otherwise: self._otherwise,
1240        };
1241        self.parent.steps.push(step);
1242        self.parent
1243    }
1244}
1245
1246/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1247pub struct WhenBuilder {
1248    parent: ChoiceBuilder,
1249    predicate: camel_api::FilterPredicate,
1250    steps: Vec<BuilderStep>,
1251}
1252
1253impl WhenBuilder {
1254    /// Close the when scope. Packages the accumulated sub-steps into a
1255    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1256    pub fn end_when(mut self) -> ChoiceBuilder {
1257        self.parent.whens.push(WhenStep {
1258            predicate: self.predicate,
1259            steps: self.steps,
1260        });
1261        self.parent
1262    }
1263}
1264
1265impl StepAccumulator for WhenBuilder {
1266    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1267        &mut self.steps
1268    }
1269}
1270
1271/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1272pub struct OtherwiseBuilder {
1273    parent: ChoiceBuilder,
1274    steps: Vec<BuilderStep>,
1275}
1276
1277impl OtherwiseBuilder {
1278    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1279    pub fn end_otherwise(self) -> ChoiceBuilder {
1280        let OtherwiseBuilder { mut parent, steps } = self;
1281        parent._otherwise = Some(steps);
1282        parent
1283    }
1284}
1285
1286impl StepAccumulator for OtherwiseBuilder {
1287    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1288        &mut self.steps
1289    }
1290}
1291
1292/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1293///
1294/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1295/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1296///
1297/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1298/// and returns the parent `RouteBuilder`.
1299pub struct MulticastBuilder {
1300    parent: RouteBuilder,
1301    steps: Vec<BuilderStep>,
1302    config: MulticastConfig,
1303}
1304
1305impl MulticastBuilder {
1306    pub fn parallel(mut self, parallel: bool) -> Self {
1307        self.config = self.config.parallel(parallel);
1308        self
1309    }
1310
1311    pub fn parallel_limit(mut self, limit: usize) -> Self {
1312        self.config = self.config.parallel_limit(limit);
1313        self
1314    }
1315
1316    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1317        self.config = self.config.stop_on_exception(stop);
1318        self
1319    }
1320
1321    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1322        self.config = self.config.timeout(duration);
1323        self
1324    }
1325
1326    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1327        self.config = self.config.aggregation(strategy);
1328        self
1329    }
1330
1331    pub fn end_multicast(mut self) -> RouteBuilder {
1332        let step = BuilderStep::Multicast {
1333            steps: self.steps,
1334            config: self.config,
1335        };
1336        self.parent.steps.push(step);
1337        self.parent
1338    }
1339}
1340
1341impl StepAccumulator for MulticastBuilder {
1342    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1343        &mut self.steps
1344    }
1345}
1346
1347/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1348///
1349/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1350/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1351///
1352/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1353/// and returns the parent `RouteBuilder`.
1354pub struct ThrottleBuilder {
1355    parent: RouteBuilder,
1356    config: ThrottlerConfig,
1357    steps: Vec<BuilderStep>,
1358}
1359
1360impl ThrottleBuilder {
1361    /// Set the throttle strategy. Default is `Delay`.
1362    ///
1363    /// - `Delay`: Queue messages until capacity available
1364    /// - `Reject`: Return error immediately when throttled
1365    /// - `Drop`: Silently discard excess messages
1366    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1367        self.config = self.config.strategy(strategy);
1368        self
1369    }
1370
1371    /// Close the throttle scope. Packages the accumulated sub-steps into a
1372    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1373    pub fn end_throttle(mut self) -> RouteBuilder {
1374        let step = BuilderStep::Throttle {
1375            config: self.config,
1376            steps: self.steps,
1377        };
1378        self.parent.steps.push(step);
1379        self.parent
1380    }
1381}
1382
1383impl StepAccumulator for ThrottleBuilder {
1384    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1385        &mut self.steps
1386    }
1387}
1388
1389/// Builder for the sub-pipeline within a `.loop_count()` / `.loop_while()` ... `.end_loop()` block.
1390pub struct LoopBuilder {
1391    parent: RouteBuilder,
1392    config: LoopConfig,
1393    steps: Vec<BuilderStep>,
1394}
1395
1396impl LoopBuilder {
1397    pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1398        LoopInLoopBuilder {
1399            parent: self,
1400            config: LoopConfig {
1401                mode: LoopMode::Count(count),
1402            },
1403            steps: vec![],
1404        }
1405    }
1406
1407    pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1408    where
1409        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1410    {
1411        LoopInLoopBuilder {
1412            parent: self,
1413            config: LoopConfig {
1414                mode: LoopMode::While(std::sync::Arc::new(predicate)),
1415            },
1416            steps: vec![],
1417        }
1418    }
1419
1420    pub fn end_loop(mut self) -> RouteBuilder {
1421        let step = BuilderStep::Loop {
1422            config: self.config,
1423            steps: self.steps,
1424        };
1425        self.parent.steps.push(step);
1426        self.parent
1427    }
1428}
1429
1430impl StepAccumulator for LoopBuilder {
1431    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1432        &mut self.steps
1433    }
1434}
1435
1436pub struct LoopInLoopBuilder {
1437    parent: LoopBuilder,
1438    config: LoopConfig,
1439    steps: Vec<BuilderStep>,
1440}
1441
1442impl LoopInLoopBuilder {
1443    pub fn end_loop(mut self) -> LoopBuilder {
1444        let step = BuilderStep::Loop {
1445            config: self.config,
1446            steps: self.steps,
1447        };
1448        self.parent.steps.push(step);
1449        self.parent
1450    }
1451}
1452
1453impl StepAccumulator for LoopInLoopBuilder {
1454    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1455        &mut self.steps
1456    }
1457}
1458
1459/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1460///
1461/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1462/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1463///
1464/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1465/// and returns the parent `RouteBuilder`.
1466pub struct LoadBalancerBuilder {
1467    parent: RouteBuilder,
1468    config: LoadBalancerConfig,
1469    steps: Vec<BuilderStep>,
1470}
1471
1472impl LoadBalancerBuilder {
1473    /// Set the load balance strategy to round-robin (default).
1474    pub fn round_robin(mut self) -> Self {
1475        self.config = LoadBalancerConfig::round_robin();
1476        self
1477    }
1478
1479    /// Set the load balance strategy to random selection.
1480    pub fn random(mut self) -> Self {
1481        self.config = LoadBalancerConfig::random();
1482        self
1483    }
1484
1485    /// Set the load balance strategy to weighted selection.
1486    ///
1487    /// Each endpoint is assigned a weight that determines its probability
1488    /// of being selected.
1489    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1490        self.config = LoadBalancerConfig::weighted(weights);
1491        self
1492    }
1493
1494    /// Set the load balance strategy to failover.
1495    ///
1496    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1497    /// is tried.
1498    pub fn failover(mut self) -> Self {
1499        self.config = LoadBalancerConfig::failover();
1500        self
1501    }
1502
1503    /// Enable or disable parallel execution of endpoints.
1504    ///
1505    /// When enabled, all endpoints receive the exchange simultaneously.
1506    /// When disabled (default), only one endpoint is selected per exchange.
1507    pub fn parallel(mut self, parallel: bool) -> Self {
1508        self.config = self.config.parallel(parallel);
1509        self
1510    }
1511
1512    /// Close the load balance scope. Packages the accumulated sub-steps into a
1513    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1514    pub fn end_load_balance(mut self) -> RouteBuilder {
1515        let step = BuilderStep::LoadBalance {
1516            config: self.config,
1517            steps: self.steps,
1518        };
1519        self.parent.steps.push(step);
1520        self.parent
1521    }
1522}
1523
1524impl StepAccumulator for LoadBalancerBuilder {
1525    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1526        &mut self.steps
1527    }
1528}
1529
1530// ---------------------------------------------------------------------------
1531// Tests
1532// ---------------------------------------------------------------------------
1533
1534#[cfg(test)]
1535mod tests {
1536    use super::*;
1537    use camel_api::error_handler::ErrorHandlerConfig;
1538    use camel_api::load_balancer::LoadBalanceStrategy;
1539    use camel_api::{Exchange, Message};
1540    use camel_core::route::BuilderStep;
1541    use std::sync::Arc;
1542    use std::time::Duration;
1543    use tower::{Service, ServiceExt};
1544
1545    #[test]
1546    fn test_builder_from_creates_definition() {
1547        let definition = RouteBuilder::from("timer:tick")
1548            .route_id("test-route")
1549            .build()
1550            .unwrap();
1551        assert_eq!(definition.from_uri(), "timer:tick");
1552    }
1553
1554    #[test]
1555    fn test_builder_empty_from_uri_errors() {
1556        let result = RouteBuilder::from("").route_id("test-route").build();
1557        assert!(result.is_err());
1558    }
1559
1560    #[test]
1561    fn test_build_rejects_schemeless_uri() {
1562        let result = RouteBuilder::from("no-scheme-here")
1563            .route_id("test-route")
1564            .build();
1565        match result {
1566            Err(err) => {
1567                let err_msg = format!("{err}");
1568                assert!(
1569                    err_msg.contains("scheme"),
1570                    "expected scheme-related error, got: {err_msg}"
1571                );
1572            }
1573            Ok(_) => panic!("schemeless URI should fail"),
1574        }
1575    }
1576
1577    #[test]
1578    fn test_build_rejects_empty_scheme_uri() {
1579        let result = RouteBuilder::from(":missing-scheme")
1580            .route_id("test-route")
1581            .build();
1582        match result {
1583            Err(err) => {
1584                let err_msg = format!("{err}");
1585                assert!(
1586                    err_msg.contains("scheme"),
1587                    "expected scheme-related error, got: {err_msg}"
1588                );
1589            }
1590            Ok(_) => panic!("empty-scheme URI should fail"),
1591        }
1592    }
1593
1594    #[test]
1595    fn test_build_accepts_valid_uri() {
1596        let result = RouteBuilder::from("timer:tick")
1597            .route_id("test-route")
1598            .build();
1599        assert!(result.is_ok());
1600    }
1601
1602    #[test]
1603    fn test_build_canonical_rejects_schemeless_uri() {
1604        let result = RouteBuilder::from("no-scheme-here")
1605            .route_id("test-route")
1606            .build_canonical();
1607        assert!(result.is_err());
1608    }
1609
1610    #[test]
1611    fn test_builder_to_adds_step() {
1612        let definition = RouteBuilder::from("timer:tick")
1613            .route_id("test-route")
1614            .to("log:info")
1615            .build()
1616            .unwrap();
1617
1618        assert_eq!(definition.from_uri(), "timer:tick");
1619        // We can verify steps were added by checking the structure
1620        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1621    }
1622
1623    #[test]
1624    fn test_builder_filter_adds_filter_step() {
1625        let definition = RouteBuilder::from("timer:tick")
1626            .route_id("test-route")
1627            .filter(|_ex| true)
1628            .to("mock:result")
1629            .end_filter()
1630            .build()
1631            .unwrap();
1632
1633        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1634    }
1635
1636    #[test]
1637    fn test_builder_set_header_adds_processor_step() {
1638        let definition = RouteBuilder::from("timer:tick")
1639            .route_id("test-route")
1640            .set_header("key", Value::String("value".into()))
1641            .build()
1642            .unwrap();
1643
1644        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1645    }
1646
1647    #[test]
1648    fn test_builder_map_body_adds_processor_step() {
1649        let definition = RouteBuilder::from("timer:tick")
1650            .route_id("test-route")
1651            .map_body(|body| body)
1652            .build()
1653            .unwrap();
1654
1655        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1656    }
1657
1658    #[test]
1659    fn test_builder_process_adds_processor_step() {
1660        let definition = RouteBuilder::from("timer:tick")
1661            .route_id("test-route")
1662            .process(|ex| async move { Ok(ex) })
1663            .build()
1664            .unwrap();
1665
1666        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1667    }
1668
1669    #[test]
1670    fn test_builder_chain_multiple_steps() {
1671        let definition = RouteBuilder::from("timer:tick")
1672            .route_id("test-route")
1673            .set_header("source", Value::String("timer".into()))
1674            .filter(|ex| ex.input.header("source").is_some())
1675            .to("log:info")
1676            .end_filter()
1677            .to("mock:result")
1678            .build()
1679            .unwrap();
1680
1681        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1682        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1683        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1684        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1685    }
1686
1687    #[test]
1688    fn test_loop_count_builder() {
1689        use camel_api::loop_eip::LoopMode;
1690
1691        let def = RouteBuilder::from("direct:start")
1692            .route_id("loop-test")
1693            .loop_count(3)
1694            .to("mock:inside")
1695            .end_loop()
1696            .to("mock:after")
1697            .build()
1698            .unwrap();
1699
1700        assert_eq!(def.steps().len(), 2);
1701        match &def.steps()[0] {
1702            BuilderStep::Loop { config, steps } => {
1703                assert!(matches!(config.mode, LoopMode::Count(3)));
1704                assert_eq!(steps.len(), 1);
1705            }
1706            other => panic!("Expected Loop, got {:?}", other),
1707        }
1708        assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1709    }
1710
1711    #[test]
1712    fn test_loop_while_builder() {
1713        use camel_api::loop_eip::LoopMode;
1714
1715        let def = RouteBuilder::from("direct:start")
1716            .route_id("loop-while-test")
1717            .loop_while(|_ex| true)
1718            .to("mock:retry")
1719            .end_loop()
1720            .build()
1721            .unwrap();
1722
1723        assert_eq!(def.steps().len(), 1);
1724        match &def.steps()[0] {
1725            BuilderStep::Loop { config, steps } => {
1726                assert!(matches!(config.mode, LoopMode::While(_)));
1727                assert_eq!(steps.len(), 1);
1728            }
1729            other => panic!("Expected Loop, got {:?}", other),
1730        }
1731    }
1732
1733    #[test]
1734    fn test_nested_loop_builder() {
1735        use camel_api::loop_eip::LoopMode;
1736
1737        let def = RouteBuilder::from("direct:start")
1738            .route_id("nested-loop-test")
1739            .loop_count(2)
1740            .to("mock:outer")
1741            .loop_count(3)
1742            .to("mock:inner")
1743            .end_loop()
1744            .end_loop()
1745            .to("mock:after")
1746            .build()
1747            .unwrap();
1748
1749        assert_eq!(def.steps().len(), 2);
1750        match &def.steps()[0] {
1751            BuilderStep::Loop { steps, .. } => {
1752                assert_eq!(steps.len(), 2);
1753                match &steps[1] {
1754                    BuilderStep::Loop {
1755                        config,
1756                        steps: inner_steps,
1757                    } => {
1758                        assert!(matches!(config.mode, LoopMode::Count(3)));
1759                        assert_eq!(inner_steps.len(), 1);
1760                    }
1761                    other => panic!("Expected nested Loop, got {:?}", other),
1762                }
1763            }
1764            other => panic!("Expected outer Loop, got {:?}", other),
1765        }
1766    }
1767
1768    // -----------------------------------------------------------------------
1769    // Processor behavior tests — exercise the real Tower services directly
1770    // -----------------------------------------------------------------------
1771
1772    #[tokio::test]
1773    async fn test_set_header_processor_works() {
1774        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1775        let exchange = Exchange::new(Message::new("test"));
1776        let result = svc.call(exchange).await.unwrap();
1777        assert_eq!(
1778            result.input.header("greeting"),
1779            Some(&Value::String("hello".into()))
1780        );
1781    }
1782
1783    #[tokio::test]
1784    async fn test_filter_processor_passes() {
1785        use camel_api::BoxProcessorExt;
1786        use camel_processor::FilterService;
1787
1788        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1789        let mut svc =
1790            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1791        let exchange = Exchange::new(Message::new("pass"));
1792        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1793        assert_eq!(result.input.body.as_text(), Some("pass"));
1794    }
1795
1796    #[tokio::test]
1797    async fn test_filter_processor_blocks() {
1798        use camel_api::BoxProcessorExt;
1799        use camel_processor::FilterService;
1800
1801        let sub = BoxProcessor::from_fn(|_ex| {
1802            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1803        });
1804        let mut svc =
1805            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1806        let exchange = Exchange::new(Message::new("reject"));
1807        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1808        assert_eq!(result.input.body.as_text(), Some("reject"));
1809    }
1810
1811    #[tokio::test]
1812    async fn test_map_body_processor_works() {
1813        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1814            if let Some(text) = body.as_text() {
1815                Body::Text(text.to_uppercase())
1816            } else {
1817                body
1818            }
1819        });
1820        let exchange = Exchange::new(Message::new("hello"));
1821        let result = mapper.oneshot(exchange).await.unwrap();
1822        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1823    }
1824
1825    #[tokio::test]
1826    async fn test_process_custom_processor_works() {
1827        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1828            ex.set_property("custom", Value::Bool(true));
1829            Ok(ex)
1830        });
1831        let exchange = Exchange::new(Message::default());
1832        let result = processor.oneshot(exchange).await.unwrap();
1833        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1834    }
1835
1836    // -----------------------------------------------------------------------
1837    // Sequential pipeline test
1838    // -----------------------------------------------------------------------
1839
1840    #[tokio::test]
1841    async fn test_compose_pipeline_runs_steps_in_order() {
1842        use camel_core::route::compose_pipeline;
1843
1844        let processors = vec![
1845            BoxProcessor::new(SetHeader::new(
1846                IdentityProcessor,
1847                "step",
1848                Value::String("one".into()),
1849            )),
1850            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1851                if let Some(text) = body.as_text() {
1852                    Body::Text(format!("{}-processed", text))
1853                } else {
1854                    body
1855                }
1856            })),
1857        ];
1858
1859        let pipeline = compose_pipeline(processors);
1860        let exchange = Exchange::new(Message::new("hello"));
1861        let result = pipeline.oneshot(exchange).await.unwrap();
1862
1863        assert_eq!(
1864            result.input.header("step"),
1865            Some(&Value::String("one".into()))
1866        );
1867        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1868    }
1869
1870    #[tokio::test]
1871    async fn test_compose_pipeline_empty_is_identity() {
1872        use camel_core::route::compose_pipeline;
1873
1874        let pipeline = compose_pipeline(vec![]);
1875        let exchange = Exchange::new(Message::new("unchanged"));
1876        let result = pipeline.oneshot(exchange).await.unwrap();
1877        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1878    }
1879
1880    // -----------------------------------------------------------------------
1881    // Circuit breaker builder tests
1882    // -----------------------------------------------------------------------
1883
1884    #[test]
1885    fn test_builder_circuit_breaker_sets_config() {
1886        use camel_api::circuit_breaker::CircuitBreakerConfig;
1887
1888        let config = CircuitBreakerConfig::new().failure_threshold(5);
1889        let definition = RouteBuilder::from("timer:tick")
1890            .route_id("test-route")
1891            .circuit_breaker(config)
1892            .build()
1893            .unwrap();
1894
1895        let cb = definition
1896            .circuit_breaker_config()
1897            .expect("circuit breaker should be set");
1898        assert_eq!(cb.failure_threshold, 5);
1899    }
1900
1901    #[test]
1902    fn test_builder_circuit_breaker_with_error_handler() {
1903        use camel_api::circuit_breaker::CircuitBreakerConfig;
1904        use camel_api::error_handler::ErrorHandlerConfig;
1905
1906        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1907        let eh_config = ErrorHandlerConfig::log_only();
1908
1909        let definition = RouteBuilder::from("timer:tick")
1910            .route_id("test-route")
1911            .to("log:info")
1912            .circuit_breaker(cb_config)
1913            .error_handler(eh_config)
1914            .build()
1915            .unwrap();
1916
1917        assert!(
1918            definition.circuit_breaker_config().is_some(),
1919            "circuit breaker config should be set"
1920        );
1921        // Route definition was built successfully with both configs.
1922    }
1923
1924    #[test]
1925    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1926        let definition = RouteBuilder::from("direct:start")
1927            .route_id("test-route")
1928            .dead_letter_channel("log:dlc")
1929            .on_exception(|e| matches!(e, CamelError::Io(_)))
1930            .retry(3)
1931            .handled_by("log:io")
1932            .end_on_exception()
1933            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1934            .retry(1)
1935            .end_on_exception()
1936            .to("mock:out")
1937            .build()
1938            .expect("route should build");
1939
1940        let cfg = definition
1941            .error_handler_config()
1942            .expect("error handler should be set");
1943        assert_eq!(cfg.policies.len(), 2);
1944        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1945        assert_eq!(
1946            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1947            Some(3)
1948        );
1949        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1950        assert_eq!(
1951            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1952            Some(1)
1953        );
1954    }
1955
1956    #[test]
1957    fn test_builder_on_exception_mixed_mode_rejected() {
1958        let result = RouteBuilder::from("direct:start")
1959            .route_id("test-route")
1960            .error_handler(ErrorHandlerConfig::log_only())
1961            .on_exception(|_e| true)
1962            .end_on_exception()
1963            .to("mock:out")
1964            .build();
1965
1966        let err = result.err().expect("mixed mode should fail with an error");
1967
1968        assert!(
1969            format!("{err}").contains("mixed error handler modes"),
1970            "unexpected error: {err}"
1971        );
1972    }
1973
1974    #[test]
1975    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1976        let definition = RouteBuilder::from("direct:start")
1977            .route_id("test-route")
1978            .on_exception(|_e| true)
1979            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1980            .with_jitter(0.5)
1981            .end_on_exception()
1982            .to("mock:out")
1983            .build()
1984            .expect("route should build");
1985
1986        let cfg = definition
1987            .error_handler_config()
1988            .expect("error handler should be set");
1989        assert_eq!(cfg.policies.len(), 1);
1990        assert!(cfg.policies[0].retry.is_none());
1991    }
1992
1993    #[test]
1994    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1995        let definition = RouteBuilder::from("direct:start")
1996            .route_id("test-route")
1997            .dead_letter_channel("log:dlc")
1998            .to("mock:out")
1999            .build()
2000            .expect("route should build");
2001
2002        let cfg = definition
2003            .error_handler_config()
2004            .expect("error handler should be set");
2005        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
2006        assert!(cfg.policies.is_empty());
2007    }
2008
2009    #[test]
2010    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
2011        let definition = RouteBuilder::from("direct:start")
2012            .route_id("test-route")
2013            .dead_letter_channel("log:first")
2014            .on_exception(|e| matches!(e, CamelError::Io(_)))
2015            .retry(2)
2016            .end_on_exception()
2017            .dead_letter_channel("log:second")
2018            .to("mock:out")
2019            .build()
2020            .expect("route should build");
2021
2022        let cfg = definition
2023            .error_handler_config()
2024            .expect("error handler should be set");
2025        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2026        assert_eq!(cfg.policies.len(), 1);
2027        assert_eq!(
2028            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
2029            Some(2)
2030        );
2031    }
2032
2033    #[test]
2034    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
2035        let definition = RouteBuilder::from("direct:start")
2036            .route_id("test-route")
2037            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
2038            .retry(1)
2039            .end_on_exception()
2040            .to("mock:out")
2041            .build()
2042            .expect("route should build");
2043
2044        let cfg = definition
2045            .error_handler_config()
2046            .expect("error handler should be set");
2047        assert!(cfg.dlc_uri.is_none());
2048        assert_eq!(cfg.policies.len(), 1);
2049    }
2050
2051    #[test]
2052    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
2053        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
2054        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
2055
2056        let definition = RouteBuilder::from("direct:start")
2057            .route_id("test-route")
2058            .error_handler(first)
2059            .error_handler(second)
2060            .to("mock:out")
2061            .build()
2062            .expect("route should build");
2063
2064        let cfg = definition
2065            .error_handler_config()
2066            .expect("error handler should be set");
2067        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2068    }
2069
2070    // --- Splitter builder tests ---
2071
2072    #[test]
2073    fn test_split_builder_typestate() {
2074        use camel_api::splitter::{SplitterConfig, split_body_lines};
2075
2076        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
2077        let definition = RouteBuilder::from("timer:test?period=1000")
2078            .route_id("test-route")
2079            .split(SplitterConfig::new(split_body_lines()))
2080            .to("mock:per-fragment")
2081            .end_split()
2082            .to("mock:final")
2083            .build()
2084            .unwrap();
2085
2086        // Should have 2 top-level steps: Split + To("mock:final")
2087        assert_eq!(definition.steps().len(), 2);
2088    }
2089
2090    #[test]
2091    fn test_split_builder_steps_collected() {
2092        use camel_api::splitter::{SplitterConfig, split_body_lines};
2093
2094        let definition = RouteBuilder::from("timer:test?period=1000")
2095            .route_id("test-route")
2096            .split(SplitterConfig::new(split_body_lines()))
2097            .set_header("fragment", Value::String("yes".into()))
2098            .to("mock:per-fragment")
2099            .end_split()
2100            .build()
2101            .unwrap();
2102
2103        // Should have 1 top-level step: Split (containing 2 sub-steps)
2104        assert_eq!(definition.steps().len(), 1);
2105        match &definition.steps()[0] {
2106            BuilderStep::Split { steps, .. } => {
2107                assert_eq!(steps.len(), 2); // SetHeader + To
2108            }
2109            other => panic!("Expected Split, got {:?}", other),
2110        }
2111    }
2112
2113    #[test]
2114    fn test_split_builder_config_propagated() {
2115        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2116
2117        let definition = RouteBuilder::from("timer:test?period=1000")
2118            .route_id("test-route")
2119            .split(
2120                SplitterConfig::new(split_body_lines())
2121                    .parallel(true)
2122                    .parallel_limit(4)
2123                    .aggregation(AggregationStrategy::CollectAll),
2124            )
2125            .to("mock:per-fragment")
2126            .end_split()
2127            .build()
2128            .unwrap();
2129
2130        match &definition.steps()[0] {
2131            BuilderStep::Split { config, .. } => {
2132                assert!(config.parallel);
2133                assert_eq!(config.parallel_limit, Some(4));
2134                assert!(matches!(
2135                    config.aggregation,
2136                    AggregationStrategy::CollectAll
2137                ));
2138            }
2139            other => panic!("Expected Split, got {:?}", other),
2140        }
2141    }
2142
2143    #[test]
2144    fn test_aggregate_builder_adds_step() {
2145        use camel_api::aggregator::AggregatorConfig;
2146        use camel_core::route::BuilderStep;
2147
2148        let definition = RouteBuilder::from("timer:tick")
2149            .route_id("test-route")
2150            .aggregate(
2151                AggregatorConfig::correlate_by("key")
2152                    .complete_when_size(2)
2153                    .build()
2154                    .unwrap(),
2155            )
2156            .build()
2157            .unwrap();
2158
2159        assert_eq!(definition.steps().len(), 1);
2160        assert!(matches!(
2161            definition.steps()[0],
2162            BuilderStep::Aggregate { .. }
2163        ));
2164    }
2165
2166    #[test]
2167    fn test_aggregate_in_split_builder() {
2168        use camel_api::aggregator::AggregatorConfig;
2169        use camel_api::splitter::{SplitterConfig, split_body_lines};
2170        use camel_core::route::BuilderStep;
2171
2172        let definition = RouteBuilder::from("timer:tick")
2173            .route_id("test-route")
2174            .split(SplitterConfig::new(split_body_lines()))
2175            .aggregate(
2176                AggregatorConfig::correlate_by("key")
2177                    .complete_when_size(1)
2178                    .build()
2179                    .unwrap(),
2180            )
2181            .end_split()
2182            .build()
2183            .unwrap();
2184
2185        assert_eq!(definition.steps().len(), 1);
2186        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2187            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2188        } else {
2189            panic!("expected Split step");
2190        }
2191    }
2192
2193    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
2194
2195    #[test]
2196    fn test_builder_set_body_static_adds_processor() {
2197        let definition = RouteBuilder::from("timer:tick")
2198            .route_id("test-route")
2199            .set_body("fixed")
2200            .build()
2201            .unwrap();
2202        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2203    }
2204
2205    #[test]
2206    fn test_builder_set_body_fn_adds_processor() {
2207        let definition = RouteBuilder::from("timer:tick")
2208            .route_id("test-route")
2209            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2210            .build()
2211            .unwrap();
2212        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2213    }
2214
2215    #[test]
2216    fn transform_alias_produces_same_as_set_body() {
2217        let route_transform = RouteBuilder::from("timer:tick")
2218            .route_id("test-route")
2219            .transform("hello")
2220            .build()
2221            .unwrap();
2222
2223        let route_set_body = RouteBuilder::from("timer:tick")
2224            .route_id("test-route")
2225            .set_body("hello")
2226            .build()
2227            .unwrap();
2228
2229        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2230    }
2231
2232    #[test]
2233    fn test_builder_set_header_fn_adds_processor() {
2234        let definition = RouteBuilder::from("timer:tick")
2235            .route_id("test-route")
2236            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2237            .build()
2238            .unwrap();
2239        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2240    }
2241
2242    #[tokio::test]
2243    async fn test_set_body_static_processor_works() {
2244        use camel_core::route::compose_pipeline;
2245        let def = RouteBuilder::from("t:t")
2246            .route_id("test-route")
2247            .set_body("replaced")
2248            .build()
2249            .unwrap();
2250        let pipeline = compose_pipeline(
2251            def.steps()
2252                .iter()
2253                .filter_map(|s| {
2254                    if let BuilderStep::Processor(p) = s {
2255                        Some(p.clone())
2256                    } else {
2257                        None
2258                    }
2259                })
2260                .collect(),
2261        );
2262        let exchange = Exchange::new(Message::new("original"));
2263        let result = pipeline.oneshot(exchange).await.unwrap();
2264        assert_eq!(result.input.body.as_text(), Some("replaced"));
2265    }
2266
2267    #[tokio::test]
2268    async fn test_set_body_fn_processor_works() {
2269        use camel_core::route::compose_pipeline;
2270        let def = RouteBuilder::from("t:t")
2271            .route_id("test-route")
2272            .set_body_fn(|ex: &Exchange| {
2273                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2274            })
2275            .build()
2276            .unwrap();
2277        let pipeline = compose_pipeline(
2278            def.steps()
2279                .iter()
2280                .filter_map(|s| {
2281                    if let BuilderStep::Processor(p) = s {
2282                        Some(p.clone())
2283                    } else {
2284                        None
2285                    }
2286                })
2287                .collect(),
2288        );
2289        let exchange = Exchange::new(Message::new("hello"));
2290        let result = pipeline.oneshot(exchange).await.unwrap();
2291        assert_eq!(result.input.body.as_text(), Some("HELLO"));
2292    }
2293
2294    #[tokio::test]
2295    async fn test_set_header_fn_processor_works() {
2296        use camel_core::route::compose_pipeline;
2297        let def = RouteBuilder::from("t:t")
2298            .route_id("test-route")
2299            .set_header_fn("echo", |ex: &Exchange| {
2300                ex.input
2301                    .body
2302                    .as_text()
2303                    .map(|t| Value::String(t.into()))
2304                    .unwrap_or(Value::Null)
2305            })
2306            .build()
2307            .unwrap();
2308        let pipeline = compose_pipeline(
2309            def.steps()
2310                .iter()
2311                .filter_map(|s| {
2312                    if let BuilderStep::Processor(p) = s {
2313                        Some(p.clone())
2314                    } else {
2315                        None
2316                    }
2317                })
2318                .collect(),
2319        );
2320        let exchange = Exchange::new(Message::new("ping"));
2321        let result = pipeline.oneshot(exchange).await.unwrap();
2322        assert_eq!(
2323            result.input.header("echo"),
2324            Some(&Value::String("ping".into()))
2325        );
2326    }
2327
2328    // ── FilterBuilder typestate tests ─────────────────────────────────────
2329
2330    #[test]
2331    fn test_filter_builder_typestate() {
2332        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2333            .route_id("test-route")
2334            .filter(|_ex| true)
2335            .to("mock:inner")
2336            .end_filter()
2337            .to("mock:outer")
2338            .build();
2339        assert!(result.is_ok());
2340    }
2341
2342    #[test]
2343    fn test_filter_builder_steps_collected() {
2344        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2345            .route_id("test-route")
2346            .filter(|_ex| true)
2347            .to("mock:inner")
2348            .end_filter()
2349            .build()
2350            .unwrap();
2351
2352        assert_eq!(definition.steps().len(), 1);
2353        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2354    }
2355
2356    #[test]
2357    fn test_wire_tap_builder_adds_step() {
2358        let definition = RouteBuilder::from("timer:tick")
2359            .route_id("test-route")
2360            .wire_tap("mock:tap")
2361            .to("mock:result")
2362            .build()
2363            .unwrap();
2364
2365        assert_eq!(definition.steps().len(), 2);
2366        assert!(
2367            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2368        );
2369        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2370    }
2371
2372    // ── MulticastBuilder typestate tests ─────────────────────────────────────
2373
2374    #[test]
2375    fn test_multicast_builder_typestate() {
2376        let definition = RouteBuilder::from("timer:tick")
2377            .route_id("test-route")
2378            .multicast()
2379            .to("direct:a")
2380            .to("direct:b")
2381            .end_multicast()
2382            .to("mock:result")
2383            .build()
2384            .unwrap();
2385
2386        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
2387    }
2388
2389    #[test]
2390    fn test_multicast_builder_steps_collected() {
2391        let definition = RouteBuilder::from("timer:tick")
2392            .route_id("test-route")
2393            .multicast()
2394            .to("direct:a")
2395            .to("direct:b")
2396            .end_multicast()
2397            .build()
2398            .unwrap();
2399
2400        match &definition.steps()[0] {
2401            BuilderStep::Multicast { steps, .. } => {
2402                assert_eq!(steps.len(), 2);
2403            }
2404            other => panic!("Expected Multicast, got {:?}", other),
2405        }
2406    }
2407
2408    // ── Concurrency builder tests ─────────────────────────────────────
2409
2410    #[test]
2411    fn test_builder_concurrent_sets_concurrency() {
2412        use camel_component_api::ConcurrencyModel;
2413
2414        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2415            .route_id("test-route")
2416            .concurrent(16)
2417            .to("log:info")
2418            .build()
2419            .unwrap();
2420
2421        assert_eq!(
2422            definition.concurrency_override(),
2423            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2424        );
2425    }
2426
2427    #[test]
2428    fn test_builder_concurrent_zero_means_unbounded() {
2429        use camel_component_api::ConcurrencyModel;
2430
2431        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2432            .route_id("test-route")
2433            .concurrent(0)
2434            .to("log:info")
2435            .build()
2436            .unwrap();
2437
2438        assert_eq!(
2439            definition.concurrency_override(),
2440            Some(&ConcurrencyModel::Concurrent { max: None })
2441        );
2442    }
2443
2444    #[test]
2445    fn test_builder_sequential_sets_concurrency() {
2446        use camel_component_api::ConcurrencyModel;
2447
2448        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2449            .route_id("test-route")
2450            .sequential()
2451            .to("log:info")
2452            .build()
2453            .unwrap();
2454
2455        assert_eq!(
2456            definition.concurrency_override(),
2457            Some(&ConcurrencyModel::Sequential)
2458        );
2459    }
2460
2461    #[test]
2462    fn test_builder_default_concurrency_is_none() {
2463        let definition = RouteBuilder::from("timer:tick")
2464            .route_id("test-route")
2465            .to("log:info")
2466            .build()
2467            .unwrap();
2468
2469        assert_eq!(definition.concurrency_override(), None);
2470    }
2471
2472    // ── Route lifecycle builder tests ─────────────────────────────────────
2473
2474    #[test]
2475    fn test_builder_route_id_sets_id() {
2476        let definition = RouteBuilder::from("timer:tick")
2477            .route_id("my-route")
2478            .build()
2479            .unwrap();
2480
2481        assert_eq!(definition.route_id(), "my-route");
2482    }
2483
2484    #[test]
2485    fn test_build_without_route_id_fails() {
2486        let result = RouteBuilder::from("timer:tick?period=1000")
2487            .to("log:info")
2488            .build();
2489        let err = match result {
2490            Err(e) => e.to_string(),
2491            Ok(_) => panic!("build() should fail without route_id"),
2492        };
2493        assert!(
2494            err.contains("route_id"),
2495            "error should mention route_id, got: {}",
2496            err
2497        );
2498    }
2499
2500    #[test]
2501    fn test_builder_empty_route_id_rejected() {
2502        let result = RouteBuilder::from("timer:tick").route_id("").build();
2503        let err = result.err().expect("empty route_id should be rejected");
2504        assert!(matches!(err, CamelError::RouteError(_)));
2505    }
2506
2507    #[test]
2508    fn test_builder_whitespace_route_id_rejected() {
2509        let result = RouteBuilder::from("timer:tick").route_id("   ").build();
2510        assert!(result.is_err());
2511    }
2512
2513    #[test]
2514    fn test_builder_auto_startup_false() {
2515        let definition = RouteBuilder::from("timer:tick")
2516            .route_id("test-route")
2517            .auto_startup(false)
2518            .build()
2519            .unwrap();
2520
2521        assert!(!definition.auto_startup());
2522    }
2523
2524    #[test]
2525    fn test_builder_startup_order_custom() {
2526        let definition = RouteBuilder::from("timer:tick")
2527            .route_id("test-route")
2528            .startup_order(50)
2529            .build()
2530            .unwrap();
2531
2532        assert_eq!(definition.startup_order(), 50);
2533    }
2534
2535    #[test]
2536    fn test_builder_defaults() {
2537        let definition = RouteBuilder::from("timer:tick")
2538            .route_id("test-route")
2539            .build()
2540            .unwrap();
2541
2542        assert_eq!(definition.route_id(), "test-route");
2543        assert!(definition.auto_startup());
2544        assert_eq!(definition.startup_order(), 1000);
2545    }
2546
2547    // ── Choice typestate tests ──────────────────────────────────────────────────
2548
/// A choice with one `when` branch and no `otherwise` produces a single
/// `Choice` step holding exactly one branch and no fallback.
#[test]
fn test_choice_builder_single_when() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("type").is_some())
        .to("mock:typed")
        .end_when()
        .end_choice();
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(
        &steps[0],
        BuilderStep::Choice { whens, otherwise } if whens.len() == 1 && otherwise.is_none()
    ));
}
2566
/// A choice with one `when` branch plus an `otherwise` branch records both.
#[test]
fn test_choice_builder_when_otherwise() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .otherwise()
        .to("mock:fallback")
        .end_otherwise()
        .end_choice();
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(
        first,
        BuilderStep::Choice { whens, otherwise } if whens.len() == 1 && otherwise.is_some()
    ));
}
2586
/// Two consecutive `when` branches end up as two entries in the `Choice` step.
#[test]
fn test_choice_builder_multiple_whens() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("b").is_some())
        .to("mock:b")
        .end_when()
        .end_choice();
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(
        first,
        BuilderStep::Choice { whens, .. } if whens.len() == 2
    ));
}
2606
/// Once `end_choice()` has been called, further steps belong to the outer
/// pipeline rather than to the choice.
#[test]
fn test_choice_step_after_choice() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|_ex: &Exchange| true)
        .to("mock:inner")
        .end_when()
        .end_choice()
        // Expected to land at index 1 of the outer step list.
        .to("mock:outer");
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2623
2624    // ── Throttle typestate tests ──────────────────────────────────────────────────
2625
/// `throttle(..)` … `end_throttle()` yields exactly one `Throttle` step.
#[test]
fn test_throttle_builder_typestate() {
    let window = std::time::Duration::from_secs(1);
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, window)
        .to("mock:result")
        .end_throttle();
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::Throttle { .. }));
}
2642
/// The strategy chosen on the throttle builder is stored in the step's config.
#[test]
fn test_throttle_builder_with_strategy() {
    let window = std::time::Duration::from_secs(1);
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, window)
        .strategy(ThrottleStrategy::Reject)
        .to("mock:result")
        .end_throttle()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Throttle { config, .. } => {
            assert_eq!(config.strategy, ThrottleStrategy::Reject);
        }
        _ => panic!("Expected Throttle step"),
    }
}
2660
/// Steps added between `throttle(..)` and `end_throttle()` are collected
/// inside the `Throttle` step.
#[test]
fn test_throttle_builder_steps_collected() {
    let window = std::time::Duration::from_secs(1);
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(5, window)
        .set_header("throttled", Value::Bool(true))
        .to("mock:throttled")
        .end_throttle()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    if let BuilderStep::Throttle { steps, .. } = first {
        // One step from set_header plus one from to.
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected Throttle, got {:?}", first);
    }
}
2679
/// Once `end_throttle()` has been called, further steps belong to the outer
/// pipeline rather than to the throttle.
#[test]
fn test_throttle_step_after_throttle() {
    let window = std::time::Duration::from_secs(1);
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, window)
        .to("mock:inner")
        .end_throttle()
        .to("mock:outer");
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2695
2696    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2697
/// `load_balance()` … `end_load_balance()` yields exactly one `LoadBalance` step.
#[test]
fn test_load_balance_builder_typestate() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .round_robin()
        .to("mock:a")
        .to("mock:b")
        .end_load_balance();
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::LoadBalance { .. }));
}
2716
/// Calling `random()` stores `LoadBalanceStrategy::Random` in the step config.
#[test]
fn test_load_balance_builder_with_strategy() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .random()
        .to("mock:result")
        .end_load_balance()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
2734
/// Steps added between `load_balance()` and `end_load_balance()` are
/// collected inside the `LoadBalance` step.
#[test]
fn test_load_balance_builder_steps_collected() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .set_header("lb", Value::Bool(true))
        .to("mock:a")
        .end_load_balance()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    if let BuilderStep::LoadBalance { steps, .. } = first {
        // One step from set_header plus one from to.
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected LoadBalance, got {:?}", first);
    }
}
2753
/// Once `end_load_balance()` has been called, further steps belong to the
/// outer pipeline rather than to the load balancer.
#[test]
fn test_load_balance_step_after_load_balance() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .to("mock:inner")
        .end_load_balance()
        .to("mock:outer");
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2769
2770    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2771
/// `dynamic_router(..)` appends a single `DynamicRouter` step.
#[test]
fn test_dynamic_router_builder() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some("mock:result".to_string())));
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::DynamicRouter { .. }));
}
2786
/// A pre-built `DynamicRouterConfig` passed to `dynamic_router_with_config`
/// keeps its `max_iterations` and `cache_size` in the resulting step.
#[test]
fn test_dynamic_router_builder_with_config() {
    let router_config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
        .max_iterations(100)
        .cache_size(500);

    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router_with_config(router_config)
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    match &steps[0] {
        BuilderStep::DynamicRouter { config } => {
            assert_eq!(config.max_iterations, 100);
            assert_eq!(config.cache_size, 500);
        }
        _ => panic!("Expected DynamicRouter step"),
    }
}
2807
/// A step added after `dynamic_router(..)` lands in the outer pipeline,
/// directly after the `DynamicRouter` step.
#[test]
fn test_dynamic_router_step_after_router() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
        .to("mock:outer");
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[0], BuilderStep::DynamicRouter { .. }));
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2825
/// `routing_slip(..)` puts a `RoutingSlip` step first in the route.
#[test]
fn routing_slip_builder_creates_step() {
    use camel_api::RoutingSlipExpression;

    let slip: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));

    let builder = RouteBuilder::from("direct:start")
        .route_id("routing-slip-test")
        .routing_slip(slip);
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    assert!(
        matches!(first, BuilderStep::RoutingSlip { .. }),
        "Expected RoutingSlip step"
    );
}
2843
/// A `RoutingSlipConfig` passed to `routing_slip_with_config` keeps its
/// delimiter, cache size and invalid-endpoint flag in the resulting step.
#[test]
fn routing_slip_with_config_builder_creates_step() {
    use camel_api::RoutingSlipConfig;

    let slip_config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
        .uri_delimiter("|")
        .cache_size(50)
        .ignore_invalid_endpoints(true);

    let definition = RouteBuilder::from("direct:start")
        .route_id("routing-slip-config-test")
        .routing_slip_with_config(slip_config)
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::RoutingSlip { config } => {
            assert_eq!(config.uri_delimiter, "|");
            assert_eq!(config.cache_size, 50);
            assert!(config.ignore_invalid_endpoints);
        }
        _ => panic!("Expected RoutingSlip step"),
    }
}
2867
/// `marshal("json")` succeeds and contributes a `Processor` step.
#[test]
fn test_builder_marshal_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .marshal("json")
        .unwrap();
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2878
/// `unmarshal("json")` succeeds and contributes a `Processor` step.
#[test]
fn test_builder_unmarshal_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .unmarshal("json")
        .unwrap();
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2889
/// `stream_cache(1024)` contributes a `Processor` step.
#[test]
fn test_builder_stream_cache_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .stream_cache(1024);
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2899
/// `validate("schemas/order.xsd")` becomes a single `To` step whose URI is
/// the schema path prefixed with `validator:`.
#[test]
fn validate_adds_to_step_with_validator_uri() {
    let builder = RouteBuilder::from("direct:in")
        .route_id("test")
        .validate("schemas/order.xsd");
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);

    let first = &steps[0];
    assert!(
        matches!(first, BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
        "got: {:?}",
        first
    );
}
2915
/// `marshal("csv")` returns an error whose message mentions both
/// "unknown data format" and the format name.
#[test]
fn test_builder_marshal_returns_err_for_unknown_format() {
    let outcome = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .marshal("csv");
    let failure = outcome
        .err()
        .expect("marshal with unknown format should return Err");

    let msg = failure.to_string();
    assert!(
        msg.contains("unknown data format"),
        "error should mention unknown format, got: {msg}"
    );
    assert!(
        msg.contains("csv"),
        "error should mention format name, got: {msg}"
    );
}
2935
/// `unmarshal("csv")` returns an error whose message mentions both
/// "unknown data format" and the format name.
#[test]
fn test_builder_unmarshal_returns_err_for_unknown_format() {
    let outcome = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .unmarshal("csv");
    let failure = outcome
        .err()
        .expect("unmarshal with unknown format should return Err");

    let msg = failure.to_string();
    assert!(
        msg.contains("unknown data format"),
        "error should mention unknown format, got: {msg}"
    );
    assert!(
        msg.contains("csv"),
        "error should mention format name, got: {msg}"
    );
}
2955
/// `recipient_list(..)` puts a `RecipientList` step first in the route.
#[test]
fn test_builder_recipient_list_creates_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("recipient-list-test")
        .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()));
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::RecipientList { .. }));
}
2969
/// `recipient_list_with_config(..)` puts a `RecipientList` step first in the route.
#[test]
fn test_builder_recipient_list_with_config_creates_step() {
    let list_config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));

    let builder = RouteBuilder::from("direct:start")
        .route_id("recipient-list-config-test")
        .recipient_list_with_config(list_config);
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::RecipientList { .. }));
}
2985
/// `script(language, source)` stores both arguments verbatim in a `Script` step.
#[test]
fn test_builder_script_adds_script_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("script-test")
        .script("rhai", "headers[\"x\"] = \"y\"");
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(
        first,
        BuilderStep::Script { language, script }
            if language == "rhai" && script == "headers[\"x\"] = \"y\""
    ));
}
3000
/// `delay` and `delay_with_header` each contribute one `Delay` step.
#[test]
fn test_builder_delay_and_delay_with_header_add_steps() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("delay-test")
        .delay(Duration::from_millis(250))
        .delay_with_header(Duration::from_millis(500), "x-delay");
    let definition = builder.build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[0], BuilderStep::Delay { .. }));
    assert!(matches!(&steps[1], BuilderStep::Delay { .. }));
}
3014
/// `log`, `stop` and `to` are recorded as three steps in call order.
#[test]
fn test_builder_log_and_stop_add_steps_in_order() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("log-stop-test")
        .log("hello", LogLevel::Info)
        .stop()
        .to("mock:after");
    let definition = builder.build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 3);
    assert!(matches!(
        &steps[0],
        BuilderStep::Log { message, .. } if message == "hello"
    ));
    assert!(matches!(&steps[1], BuilderStep::Stop));
    assert!(matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:after"));
}
3033
/// `stream_cache_default()` contributes a `Processor` step.
#[test]
fn test_builder_stream_cache_default_adds_processor_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("stream-cache-default-test")
        .stream_cache_default();
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
3044
/// A URI that already starts with `validator:` is passed through by
/// `validate` without a second prefix being added.
#[test]
fn test_validate_preserves_existing_validator_prefix() {
    let builder = RouteBuilder::from("direct:in")
        .route_id("validate-prefix-test")
        .validate("validator:schemas/order.xsd");
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(
        first,
        BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
    ));
}
3058
/// When `weighted(..)` is followed by `failover()` and `parallel(true)`, the
/// resulting config reports the `Failover` strategy with `parallel` set.
#[test]
fn test_load_balance_builder_weighted_failover_parallel_config() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("lb-weighted-failover-parallel")
        .load_balance()
        .weighted(vec![
            ("direct:a".to_string(), 3),
            ("direct:b".to_string(), 1),
        ])
        .failover()
        .parallel(true)
        .to("mock:result")
        .end_load_balance();
    let definition = builder.build().unwrap();

    match &definition.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
            assert!(config.parallel);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
3082
/// Every multicast setter used here (`parallel`, `parallel_limit`,
/// `stop_on_exception`, `timeout`, `aggregation`) shows up in the step config.
#[test]
fn test_multicast_builder_all_config_setters() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("multicast-config-test")
        .multicast()
        .parallel(true)
        .parallel_limit(4)
        .stop_on_exception(true)
        .timeout(Duration::from_millis(300))
        .aggregation(MulticastStrategy::Original)
        .to("mock:a")
        .end_multicast();
    let definition = builder.build().unwrap();

    match &definition.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(config.parallel);
            assert_eq!(config.parallel_limit, Some(4));
            assert!(config.stop_on_exception);
            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
            assert!(matches!(config.aggregation, MulticastStrategy::Original));
        }
        _ => panic!("Expected Multicast step"),
    }
}
3108
/// `build_canonical()` fails for a route containing a `set_header` step, and
/// the error text names the unsupported step as `processor`.
#[test]
fn test_build_canonical_rejects_unsupported_processor_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-reject")
        .set_header("k", Value::String("v".into()));
    let failure = builder.build_canonical().unwrap_err();

    let text = failure.to_string();
    assert!(text.contains("does not support step `processor`"));
}
3119
3120    // ── LoadBalance strategy-specific tests ─────────────────────────────────────
3121
/// `weighted(..)` selects the `Weighted` load-balance strategy.
#[test]
fn test_load_balance_builder_weighted_strategy() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("lb-weighted")
        .load_balance()
        .weighted(vec![
            ("direct:a".to_string(), 5),
            ("direct:b".to_string(), 2),
            ("direct:c".to_string(), 1),
        ])
        .to("mock:result")
        .end_load_balance();
    let definition = builder.build().unwrap();

    match &definition.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
3143
/// `failover()` selects the `Failover` strategy and leaves `parallel` off.
#[test]
fn test_load_balance_builder_failover_strategy() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("lb-failover")
        .load_balance()
        .failover()
        .to("mock:primary")
        .end_load_balance();
    let definition = builder.build().unwrap();

    match &definition.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
            assert!(!config.parallel);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
3162
/// An explicit `parallel(false)` leaves the config's `parallel` flag unset.
#[test]
fn test_load_balance_builder_parallel_false_explicit() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("lb-parallel-false")
        .load_balance()
        .round_robin()
        .parallel(false)
        .to("mock:result")
        .end_load_balance();
    let definition = builder.build().unwrap();

    match &definition.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => assert!(!config.parallel),
        _ => panic!("Expected LoadBalance step"),
    }
}
3181
3182    // ── FilterInSplitBuilder tests ──────────────────────────────────────────────
3183
/// A filter opened inside a split is closed with `end_filter()` and ends up
/// as the single nested step of the `Split` step.
#[test]
fn test_filter_in_split_builder_typestate() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let route = RouteBuilder::from("timer:test")
        .route_id("filter-in-split")
        .split(SplitterConfig::new(split_body_lines()))
        .filter(|_ex| true)
        .to("mock:filtered")
        .end_filter()
        .end_split()
        .build()
        .unwrap();

    let outer = route.steps();
    assert_eq!(outer.len(), 1);
    match &outer[0] {
        BuilderStep::Split { steps, .. } => {
            assert_eq!(steps.len(), 1);
            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
        }
        _ => panic!("Expected Split step"),
    }
}
3206
/// Steps placed before and after a nested filter stay siblings of that
/// filter inside the `Split` step.
#[test]
fn test_filter_in_split_builder_multiple_steps() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let route = RouteBuilder::from("timer:test")
        .route_id("filter-in-split-multi")
        .split(SplitterConfig::new(split_body_lines()))
        .to("mock:before-filter")
        .filter(|_ex| true)
        .to("mock:inside-filter")
        .end_filter()
        .to("mock:after-filter")
        .end_split()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Split { steps, .. } => {
            // Three siblings: the leading To, the Filter, and the trailing To.
            assert_eq!(steps.len(), 3);
        }
        _ => panic!("Expected Split step"),
    }
}
3230
3231    // ── build_canonical tests ───────────────────────────────────────────────────
3232
/// A circuit breaker configured on the builder is present in the canonical
/// spec with the same failure threshold.
#[test]
fn test_build_canonical_with_circuit_breaker() {
    use camel_api::circuit_breaker::CircuitBreakerConfig;

    let breaker_config = CircuitBreakerConfig::new().failure_threshold(10);
    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-cb")
        .circuit_breaker(breaker_config)
        .to("mock:result")
        .build_canonical()
        .unwrap();

    let breaker = canonical
        .circuit_breaker
        .expect("circuit breaker should be set");
    assert_eq!(breaker.failure_threshold, 10);
}
3247
/// `build_canonical()` fails for a builder-made split that uses a custom
/// aggregation closure; the error names the `split` step.
#[test]
fn test_build_canonical_rejects_custom_split_aggregation() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines()).aggregation(
        camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
    );
    let failure = RouteBuilder::from("direct:start")
        .route_id("canonical-custom-split")
        .split(splitter)
        .to("mock:frag")
        .end_split()
        .build_canonical()
        .unwrap_err();

    // The split step as a whole is reported as unsupported in canonical v1.
    let text = failure.to_string();
    assert!(text.contains("canonical v1 does not support step `split`"));
}
3265
/// `build_canonical()` fails for an aggregate step with a custom strategy
/// closure; the error mentions "custom aggregate strategy".
#[test]
fn test_build_canonical_rejects_custom_aggregate_strategy() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(2)
        .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
        .build()
        .unwrap();

    let failure = RouteBuilder::from("direct:start")
        .route_id("canonical-custom-agg")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap_err();

    let text = failure.to_string();
    assert!(text.contains("custom aggregate strategy"));
}
3282
/// `build_canonical()` fails for an aggregate step whose correlation is a
/// closure; the error mentions "Fn correlation strategy".
#[test]
fn test_build_canonical_rejects_fn_correlation_strategy() {
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Single(CompletionCondition::Size(1)),
        correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let failure = RouteBuilder::from("direct:start")
        .route_id("canonical-fn-corr")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap_err();

    let text = failure.to_string();
    assert!(text.contains("Fn correlation strategy"));
}
3302
/// `build_canonical()` fails for an aggregate step whose completion is a
/// predicate closure; the error mentions "predicate completion".
#[test]
fn test_build_canonical_rejects_predicate_completion() {
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
            |_| false,
        ))),
        correlation: CorrelationStrategy::HeaderName("key".to_string()),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let failure = RouteBuilder::from("direct:start")
        .route_id("canonical-pred-completion")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap_err();

    let text = failure.to_string();
    assert!(text.contains("predicate completion"));
}
3324
/// An expression-based correlation is accepted by `build_canonical()` and its
/// expression text appears as the aggregate step's `correlation_key`.
#[test]
fn test_build_canonical_with_expression_correlation() {
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Single(CompletionCondition::Size(1)),
        correlation: CorrelationStrategy::Expression {
            expr: "header.key".to_string(),
            language: "simple".to_string(),
        },
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-expr-corr")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    let expected_key = Some("header.key".to_string());
    assert!(canonical.steps.iter().any(|step| {
        matches!(step, CanonicalStepSpec::Aggregate(agg) if agg.correlation_key == expected_key)
    }));
}
3347
/// `build_canonical()` fails for a builder-made split even with the
/// `LastWins` aggregation; the error names the `split` step.
#[test]
fn test_build_canonical_split_rejected_with_closure_expression() {
    use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};

    // The builder's split expression is a closure, which cannot be serialized.
    let splitter =
        SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins);
    let failure = RouteBuilder::from("direct:start")
        .route_id("canonical-split-last")
        .split(splitter)
        .to("mock:frag")
        .end_split()
        .build_canonical()
        .unwrap_err();

    let text = failure.to_string();
    assert!(text.contains("canonical v1 does not support step `split`"));
}
3365
3366    // ── OnExceptionBuilder full chain tests ─────────────────────────────────────
3367
/// The full `on_exception` chain (retry, backoff, jitter, handled-by) is
/// recorded as one policy on the route's error handler config.
#[test]
fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("on-exception-full")
        .dead_letter_channel("log:dlc")
        .on_exception(|e| matches!(e, CamelError::Io(_)))
        .retry(5)
        .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
        .with_jitter(0.3)
        .handled_by("log:io-handler")
        .end_on_exception()
        .to("mock:out");
    let route = builder.build().unwrap();

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.policies.len(), 1);

    let io_policy = &handler.policies[0];
    let redelivery = io_policy.retry.as_ref().expect("retry should be set");
    assert_eq!(redelivery.max_attempts, 5);
    assert_eq!(redelivery.initial_delay, Duration::from_millis(10));
    assert_eq!(redelivery.multiplier, 2.0);
    assert_eq!(redelivery.max_delay, Duration::from_millis(500));
    assert!((redelivery.jitter_factor - 0.3).abs() < f64::EPSILON);
    assert_eq!(io_policy.handled_by.as_deref(), Some("log:io-handler"));
}
3396
/// A jitter factor of 5.0 passed to `with_jitter` is stored as 1.0.
#[test]
fn test_on_exception_jitter_clamped_to_valid_range() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("jitter-clamp")
        .on_exception(|_e| true)
        .retry(1)
        .with_jitter(5.0)
        .end_on_exception()
        .to("mock:out");
    let route = builder.build().unwrap();

    let handler = route.error_handler_config().unwrap();
    let redelivery = handler.policies[0].retry.as_ref().unwrap();
    assert!((redelivery.jitter_factor - 1.0).abs() < f64::EPSILON);
}
3413
3414    // ── StepAccumulator: process_fn, convert_body_to, bean ──────────────────────
3415
3416    #[test]
3417    fn test_builder_process_fn_adds_processor_step() {
3418        use camel_api::BoxProcessorExt;
3419        let processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
3420        let definition = RouteBuilder::from("timer:tick")
3421            .route_id("process-fn-test")
3422            .process_fn(processor)
3423            .build()
3424            .unwrap();
3425
3426        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3427    }
3428
3429    #[test]
3430    fn test_builder_convert_body_to_adds_processor_step() {
3431        let definition = RouteBuilder::from("timer:tick")
3432            .route_id("convert-body-test")
3433            .convert_body_to(BodyType::Json)
3434            .build()
3435            .unwrap();
3436
3437        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3438    }
3439
3440    #[test]
3441    fn test_builder_bean_adds_bean_step() {
3442        let definition = RouteBuilder::from("timer:tick")
3443            .route_id("bean-test")
3444            .bean("myBean", "process")
3445            .build()
3446            .unwrap();
3447
3448        assert!(
3449            matches!(&definition.steps()[0], BuilderStep::Bean { name, method }
3450            if name == "myBean" && method == "process")
3451        );
3452    }
3453
3454    // ── Throttle strategy-specific tests ────────────────────────────────────────
3455
3456    #[test]
3457    fn test_throttle_builder_delay_strategy() {
3458        let definition = RouteBuilder::from("timer:tick")
3459            .route_id("throttle-delay")
3460            .throttle(10, Duration::from_secs(1))
3461            .strategy(ThrottleStrategy::Delay)
3462            .to("mock:result")
3463            .end_throttle()
3464            .build()
3465            .unwrap();
3466
3467        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3468            assert_eq!(config.strategy, ThrottleStrategy::Delay);
3469        } else {
3470            panic!("Expected Throttle step");
3471        }
3472    }
3473
3474    #[test]
3475    fn test_throttle_builder_drop_strategy() {
3476        let definition = RouteBuilder::from("timer:tick")
3477            .route_id("throttle-drop")
3478            .throttle(10, Duration::from_secs(1))
3479            .strategy(ThrottleStrategy::Drop)
3480            .to("mock:result")
3481            .end_throttle()
3482            .build()
3483            .unwrap();
3484
3485        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3486            assert_eq!(config.strategy, ThrottleStrategy::Drop);
3487        } else {
3488            panic!("Expected Throttle step");
3489        }
3490    }
3491
3492    // ── LoopInLoopBuilder with loop_while ───────────────────────────────────────
3493
3494    #[test]
3495    fn test_nested_loop_while_builder() {
3496        use camel_api::loop_eip::LoopMode;
3497
3498        let def = RouteBuilder::from("direct:start")
3499            .route_id("nested-loop-while")
3500            .loop_count(2)
3501            .to("mock:outer")
3502            .loop_while(|_ex| true)
3503            .to("mock:inner")
3504            .end_loop()
3505            .end_loop()
3506            .build()
3507            .unwrap();
3508
3509        assert_eq!(def.steps().len(), 1);
3510        if let BuilderStep::Loop { steps, .. } = &def.steps()[0] {
3511            assert_eq!(steps.len(), 2);
3512            if let BuilderStep::Loop { config, .. } = &steps[1] {
3513                assert!(matches!(config.mode, LoopMode::While(_)));
3514            } else {
3515                panic!("Expected inner Loop step");
3516            }
3517        } else {
3518            panic!("Expected outer Loop step");
3519        }
3520    }
3521
3522    // ── Choice with multiple whens + otherwise ──────────────────────────────────
3523
3524    #[test]
3525    fn test_choice_builder_multiple_whens_with_otherwise() {
3526        let definition = RouteBuilder::from("timer:tick")
3527            .route_id("choice-multi-otherwise")
3528            .choice()
3529            .when(|ex: &Exchange| ex.input.header("a").is_some())
3530            .to("mock:a")
3531            .end_when()
3532            .when(|ex: &Exchange| ex.input.header("b").is_some())
3533            .to("mock:b")
3534            .end_when()
3535            .when(|ex: &Exchange| ex.input.header("c").is_some())
3536            .to("mock:c")
3537            .end_when()
3538            .otherwise()
3539            .to("mock:fallback")
3540            .end_otherwise()
3541            .end_choice()
3542            .build()
3543            .unwrap();
3544
3545        if let BuilderStep::Choice { whens, otherwise } = &definition.steps()[0] {
3546            assert_eq!(whens.len(), 3);
3547            assert!(otherwise.is_some());
3548            assert_eq!(otherwise.as_ref().unwrap().len(), 1);
3549        } else {
3550            panic!("Expected Choice step");
3551        }
3552    }
3553
3554    // ── Multicast individual config tests ───────────────────────────────────────
3555
3556    #[test]
3557    fn test_multicast_builder_parallel_only() {
3558        let route = RouteBuilder::from("direct:start")
3559            .route_id("multicast-parallel")
3560            .multicast()
3561            .parallel(true)
3562            .to("mock:a")
3563            .end_multicast()
3564            .build()
3565            .unwrap();
3566
3567        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3568            assert!(config.parallel);
3569            assert_eq!(config.parallel_limit, None);
3570        } else {
3571            panic!("Expected Multicast step");
3572        }
3573    }
3574
3575    #[test]
3576    fn test_multicast_builder_timeout_only() {
3577        let route = RouteBuilder::from("direct:start")
3578            .route_id("multicast-timeout")
3579            .multicast()
3580            .timeout(Duration::from_secs(5))
3581            .to("mock:a")
3582            .end_multicast()
3583            .build()
3584            .unwrap();
3585
3586        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3587            assert_eq!(config.timeout, Some(Duration::from_secs(5)));
3588        } else {
3589            panic!("Expected Multicast step");
3590        }
3591    }
3592
3593    #[test]
3594    fn test_multicast_builder_aggregation_collect_all() {
3595        let route = RouteBuilder::from("direct:start")
3596            .route_id("multicast-collect")
3597            .multicast()
3598            .aggregation(MulticastStrategy::CollectAll)
3599            .to("mock:a")
3600            .end_multicast()
3601            .build()
3602            .unwrap();
3603
3604        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3605            assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
3606        } else {
3607            panic!("Expected Multicast step");
3608        }
3609    }
3610
3611    // ── extract_completion_fields: Any mode with multiple conditions ────────────
3612
3613    #[test]
3614    fn test_build_canonical_aggregate_any_completion_mode() {
3615        let spec = RouteBuilder::from("direct:start")
3616            .route_id("canonical-any-completion")
3617            .aggregate(
3618                AggregatorConfig::correlate_by("key")
3619                    .complete_on_size_or_timeout(10, Duration::from_secs(30))
3620                    .build()
3621                    .unwrap(),
3622            )
3623            .build_canonical()
3624            .unwrap();
3625
3626        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3627            assert_eq!(agg.completion_size, Some(10));
3628            assert_eq!(agg.completion_timeout_ms, Some(30_000));
3629        } else {
3630            panic!("Expected Aggregate step");
3631        }
3632    }
3633
3634    #[test]
3635    fn test_build_canonical_aggregate_timeout_completion() {
3636        let spec = RouteBuilder::from("direct:start")
3637            .route_id("canonical-timeout-completion")
3638            .aggregate(
3639                AggregatorConfig::correlate_by("key")
3640                    .complete_on_timeout(Duration::from_millis(500))
3641                    .build()
3642                    .unwrap(),
3643            )
3644            .build_canonical()
3645            .unwrap();
3646
3647        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3648            assert_eq!(agg.completion_size, None);
3649            assert_eq!(agg.completion_timeout_ms, Some(500));
3650        } else {
3651            panic!("Expected Aggregate step");
3652        }
3653    }
3654
3655    // ── canonicalize_aggregate: discard_on_timeout and force_completion_on_stop ─
3656
3657    #[test]
3658    fn test_build_canonical_aggregate_discard_on_timeout() {
3659        use camel_api::aggregator::AggregatorConfig;
3660
3661        let spec = RouteBuilder::from("direct:start")
3662            .route_id("canonical-discard-timeout")
3663            .aggregate(
3664                AggregatorConfig::correlate_by("key")
3665                    .complete_when_size(1)
3666                    .discard_on_timeout(true)
3667                    .build()
3668                    .unwrap(),
3669            )
3670            .build_canonical()
3671            .unwrap();
3672
3673        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3674            assert_eq!(agg.discard_on_timeout, Some(true));
3675        } else {
3676            panic!("Expected Aggregate step");
3677        }
3678    }
3679
3680    #[test]
3681    fn test_build_canonical_aggregate_force_completion_on_stop() {
3682        use camel_api::aggregator::AggregatorConfig;
3683
3684        let spec = RouteBuilder::from("direct:start")
3685            .route_id("canonical-force-stop")
3686            .aggregate(
3687                AggregatorConfig::correlate_by("key")
3688                    .complete_when_size(1)
3689                    .force_completion_on_stop(true)
3690                    .build()
3691                    .unwrap(),
3692            )
3693            .build_canonical()
3694            .unwrap();
3695
3696        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3697            assert_eq!(agg.force_completion_on_stop, Some(true));
3698        } else {
3699            panic!("Expected Aggregate step");
3700        }
3701    }
3702
3703    // ── build_canonical: max_buckets and bucket_ttl ─────────────────────────────
3704
3705    #[test]
3706    fn test_build_canonical_aggregate_max_buckets_and_ttl() {
3707        use camel_api::aggregator::AggregatorConfig;
3708
3709        let spec = RouteBuilder::from("direct:start")
3710            .route_id("canonical-buckets-ttl")
3711            .aggregate(
3712                AggregatorConfig::correlate_by("key")
3713                    .complete_when_size(1)
3714                    .max_buckets(100)
3715                    .bucket_ttl(Duration::from_secs(60))
3716                    .build()
3717                    .unwrap(),
3718            )
3719            .build_canonical()
3720            .unwrap();
3721
3722        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3723            assert_eq!(agg.max_buckets, Some(100));
3724            assert_eq!(agg.bucket_ttl_ms, Some(60_000));
3725        } else {
3726            panic!("Expected Aggregate step");
3727        }
3728    }
3729
3730    // ── SplitBuilder with filter inside ─────────────────────────────────────────
3731
3732    #[test]
3733    fn test_split_builder_with_filter_inside() {
3734        use camel_api::splitter::{SplitterConfig, split_body_lines};
3735
3736        let definition = RouteBuilder::from("timer:test")
3737            .route_id("split-with-filter")
3738            .split(SplitterConfig::new(split_body_lines()))
3739            .filter(|_ex| true)
3740            .to("mock:filtered-frag")
3741            .end_filter()
3742            .end_split()
3743            .build()
3744            .unwrap();
3745
3746        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3747            assert_eq!(steps.len(), 1);
3748            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3749        } else {
3750            panic!("Expected Split step");
3751        }
3752    }
3753
3754    // ── WireTap additional tests ────────────────────────────────────────────────
3755
3756    #[test]
3757    fn test_wire_tap_multiple_taps() {
3758        let definition = RouteBuilder::from("timer:tick")
3759            .route_id("multi-wire-tap")
3760            .wire_tap("mock:tap1")
3761            .wire_tap("mock:tap2")
3762            .to("mock:result")
3763            .build()
3764            .unwrap();
3765
3766        assert_eq!(definition.steps().len(), 3);
3767        assert!(
3768            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap1")
3769        );
3770        assert!(
3771            matches!(&definition.steps()[1], BuilderStep::WireTap { uri } if uri == "mock:tap2")
3772        );
3773    }
3774
3775    // ── Error handler: explicit config after shorthand → Mixed mode ─────────────
3776
3777    #[test]
3778    fn test_builder_shorthand_then_explicit_mixed_mode() {
3779        let result = RouteBuilder::from("direct:start")
3780            .route_id("mixed-mode-2")
3781            .dead_letter_channel("log:dlc")
3782            .error_handler(ErrorHandlerConfig::log_only())
3783            .to("mock:out")
3784            .build();
3785
3786        let err = result.err().expect("mixed mode should fail");
3787        assert!(format!("{err}").contains("mixed error handler modes"));
3788    }
3789
3790    // ── build_canonical: empty from_uri error ───────────────────────────────────
3791
3792    #[test]
3793    fn test_build_canonical_empty_from_uri_errors() {
3794        let result = RouteBuilder::from("").route_id("test").build_canonical();
3795        assert!(result.is_err());
3796    }
3797
3798    #[test]
3799    fn test_build_canonical_missing_route_id_errors() {
3800        let result = RouteBuilder::from("direct:start").build_canonical();
3801        assert!(result.is_err());
3802        let err = result.unwrap_err().to_string();
3803        assert!(err.contains("route_id"));
3804    }
3805
3806    // ── SplitBuilder: aggregate inside split ────────────────────────────────────
3807
3808    #[test]
3809    fn test_split_builder_with_aggregate_inside() {
3810        use camel_api::aggregator::AggregatorConfig;
3811        use camel_api::splitter::{SplitterConfig, split_body_lines};
3812
3813        let definition = RouteBuilder::from("timer:test")
3814            .route_id("split-agg")
3815            .split(SplitterConfig::new(split_body_lines()))
3816            .aggregate(
3817                AggregatorConfig::correlate_by("frag-key")
3818                    .complete_when_size(3)
3819                    .build()
3820                    .unwrap(),
3821            )
3822            .end_split()
3823            .build()
3824            .unwrap();
3825
3826        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3827            assert_eq!(steps.len(), 1);
3828            assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
3829        } else {
3830            panic!("Expected Split step");
3831        }
3832    }
3833
3834    // ── Throttle: steps collected inside throttle scope ─────────────────────────
3835
3836    #[test]
3837    fn test_throttle_builder_with_steps_inside() {
3838        let definition = RouteBuilder::from("timer:tick")
3839            .route_id("throttle-steps")
3840            .throttle(10, Duration::from_secs(1))
3841            .set_header("throttled", Value::Bool(true))
3842            .to("mock:throttled")
3843            .end_throttle()
3844            .build()
3845            .unwrap();
3846
3847        if let BuilderStep::Throttle { steps, .. } = &definition.steps()[0] {
3848            assert_eq!(steps.len(), 2);
3849        } else {
3850            panic!("Expected Throttle step");
3851        }
3852    }
3853
3854    // ── LoadBalance: steps collected inside scope ───────────────────────────────
3855
3856    #[test]
3857    fn test_load_balance_builder_with_steps_inside() {
3858        let definition = RouteBuilder::from("timer:tick")
3859            .route_id("lb-steps")
3860            .load_balance()
3861            .round_robin()
3862            .set_header("lb", Value::Bool(true))
3863            .to("mock:lb")
3864            .end_load_balance()
3865            .build()
3866            .unwrap();
3867
3868        if let BuilderStep::LoadBalance { steps, .. } = &definition.steps()[0] {
3869            assert_eq!(steps.len(), 2);
3870        } else {
3871            panic!("Expected LoadBalance step");
3872        }
3873    }
3874
3875    // ── Multicast: steps collected inside scope ─────────────────────────────────
3876
3877    #[test]
3878    fn test_multicast_builder_with_steps_inside() {
3879        let definition = RouteBuilder::from("timer:tick")
3880            .route_id("multicast-steps")
3881            .multicast()
3882            .set_header("mc", Value::Bool(true))
3883            .to("mock:multicast")
3884            .end_multicast()
3885            .build()
3886            .unwrap();
3887
3888        if let BuilderStep::Multicast { steps, .. } = &definition.steps()[0] {
3889            assert_eq!(steps.len(), 2);
3890        } else {
3891            panic!("Expected Multicast step");
3892        }
3893    }
3894
3895    // ── LoopBuilder: steps collected inside loop scope ──────────────────────────
3896
3897    #[test]
3898    fn test_loop_builder_with_steps_inside() {
3899        let definition = RouteBuilder::from("timer:tick")
3900            .route_id("loop-steps")
3901            .loop_count(3)
3902            .set_header("loop", Value::Bool(true))
3903            .to("mock:loop")
3904            .end_loop()
3905            .build()
3906            .unwrap();
3907
3908        if let BuilderStep::Loop { steps, .. } = &definition.steps()[0] {
3909            assert_eq!(steps.len(), 2);
3910        } else {
3911            panic!("Expected Loop step");
3912        }
3913    }
3914
3915    // ── canonical_step_name coverage for remaining variants ─────────────────────
3916
3917    #[test]
3918    fn test_build_canonical_rejects_loop_step() {
3919        let err = RouteBuilder::from("direct:start")
3920            .route_id("canonical-loop")
3921            .loop_count(3)
3922            .to("mock:loop")
3923            .end_loop()
3924            .build_canonical()
3925            .unwrap_err();
3926
3927        assert!(format!("{err}").contains("does not support step `loop`"));
3928    }
3929
3930    #[test]
3931    fn test_build_canonical_rejects_multicast_step() {
3932        let err = RouteBuilder::from("direct:start")
3933            .route_id("canonical-multicast")
3934            .multicast()
3935            .to("mock:a")
3936            .end_multicast()
3937            .build_canonical()
3938            .unwrap_err();
3939
3940        assert!(format!("{err}").contains("does not support step `multicast`"));
3941    }
3942
3943    #[test]
3944    fn test_build_canonical_rejects_throttle_step() {
3945        let err = RouteBuilder::from("direct:start")
3946            .route_id("canonical-throttle")
3947            .throttle(10, Duration::from_secs(1))
3948            .to("mock:result")
3949            .end_throttle()
3950            .build_canonical()
3951            .unwrap_err();
3952
3953        assert!(format!("{err}").contains("does not support step `throttle`"));
3954    }
3955
3956    #[test]
3957    fn test_build_canonical_rejects_load_balancer_step() {
3958        let err = RouteBuilder::from("direct:start")
3959            .route_id("canonical-lb")
3960            .load_balance()
3961            .round_robin()
3962            .to("mock:result")
3963            .end_load_balance()
3964            .build_canonical()
3965            .unwrap_err();
3966
3967        assert!(format!("{err}").contains("does not support step `load_balancer`"));
3968    }
3969
3970    #[test]
3971    fn test_build_canonical_rejects_bean_step() {
3972        let err = RouteBuilder::from("direct:start")
3973            .route_id("canonical-bean")
3974            .bean("myBean", "process")
3975            .build_canonical()
3976            .unwrap_err();
3977
3978        assert!(format!("{err}").contains("does not support step `bean`"));
3979    }
3980
3981    #[test]
3982    fn test_build_canonical_rejects_script_step() {
3983        let err = RouteBuilder::from("direct:start")
3984            .route_id("canonical-script")
3985            .script("rhai", "x = 1")
3986            .build_canonical()
3987            .unwrap_err();
3988
3989        assert!(format!("{err}").contains("does not support step `script`"));
3990    }
3991
3992    #[test]
3993    fn test_build_canonical_accepts_delay_step() {
3994        let spec = RouteBuilder::from("direct:start")
3995            .route_id("canonical-delay")
3996            .delay(Duration::from_millis(100))
3997            .build_canonical()
3998            .unwrap();
3999
4000        assert!(
4001            spec.steps.iter().any(
4002                |s| matches!(s, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100)
4003            )
4004        );
4005    }
4006
4007    #[test]
4008    fn test_build_canonical_accepts_wire_tap_step() {
4009        let spec = RouteBuilder::from("direct:start")
4010            .route_id("canonical-wiretap")
4011            .wire_tap("mock:tap")
4012            .build_canonical()
4013            .unwrap();
4014
4015        assert!(
4016            spec.steps
4017                .iter()
4018                .any(|s| matches!(s, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"))
4019        );
4020    }
4021
4022    #[test]
4023    fn test_build_canonical_rejects_dynamic_router_step() {
4024        let err = RouteBuilder::from("direct:start")
4025            .route_id("canonical-dyn-router")
4026            .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
4027            .build_canonical()
4028            .unwrap_err();
4029
4030        assert!(format!("{err}").contains("does not support step `dynamic_router`"));
4031    }
4032
4033    #[test]
4034    fn test_build_canonical_rejects_routing_slip_step() {
4035        let err = RouteBuilder::from("direct:start")
4036            .route_id("canonical-routing-slip")
4037            .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
4038            .build_canonical()
4039            .unwrap_err();
4040
4041        assert!(format!("{err}").contains("does not support step `routing_slip`"));
4042    }
4043
4044    #[test]
4045    fn test_build_canonical_rejects_recipient_list_step() {
4046        let err = RouteBuilder::from("direct:start")
4047            .route_id("canonical-recipient")
4048            .recipient_list(Arc::new(|_| "mock:a".to_string()))
4049            .build_canonical()
4050            .unwrap_err();
4051
4052        assert!(format!("{err}").contains("does not support step `recipient_list`"));
4053    }
4054
4055    // ── extract_completion_fields: Any mode with predicate → error ──────────────
4056
4057    #[test]
4058    fn test_build_canonical_rejects_any_mode_with_predicate() {
4059        let err = RouteBuilder::from("direct:start")
4060            .route_id("canonical-any-pred")
4061            .aggregate(AggregatorConfig {
4062                header_name: "key".to_string(),
4063                completion: CompletionMode::Any(vec![
4064                    CompletionCondition::Size(5),
4065                    CompletionCondition::Predicate(Arc::new(|_| false)),
4066                ]),
4067                correlation: CorrelationStrategy::HeaderName("key".to_string()),
4068                strategy: AggregationStrategy::CollectAll,
4069                max_buckets: None,
4070                bucket_ttl: None,
4071                force_completion_on_stop: false,
4072                discard_on_timeout: false,
4073            })
4074            .build_canonical()
4075            .unwrap_err();
4076
4077        assert!(format!("{err}").contains("predicate completion"));
4078    }
4079
4080    // ── BUILDER-004: Validation errors for missing required fields ────────────
4081
4082    #[test]
4083    fn test_builder_validation_missing_from_uri() {
4084        let result = RouteBuilder::from("")
4085            .route_id("missing-uri-route")
4086            .to("log:info")
4087            .build();
4088        assert!(result.is_err(), "empty from URI should fail validation");
4089        let err = result.err().unwrap().to_string();
4090        assert!(
4091            err.contains("'from'") || err.contains("URI"),
4092            "error should mention from/URI, got: {err}"
4093        );
4094    }
4095
4096    #[test]
4097    fn test_builder_validation_invalid_step_uri_scheme() {
4098        let result = RouteBuilder::from("timer:tick")
4099            .route_id("bad-step-route")
4100            .to("not-a-valid-uri") // no scheme
4101            .build();
4102        // The builder itself accepts any URI string; validation happens at
4103        // resolution time. Verify the build succeeds (step URI is deferred).
4104        assert!(
4105            result.is_ok(),
4106            "builder should accept opaque step URIs; resolution happens later"
4107        );
4108    }
4109
4110    // ── BUILDER-006: Duplicate route IDs ──────────────────────────────────────
4111
4112    #[test]
4113    fn test_builder_duplicate_route_ids_produce_identical_definitions() {
4114        // The builder itself doesn't check for duplicates (that's context-level).
4115        // Verify both builds succeed with the same ID — detection is TODO(BUILDER-006).
4116        let route1 = RouteBuilder::from("direct:a")
4117            .route_id("dup-route")
4118            .to("mock:out")
4119            .build();
4120        let route2 = RouteBuilder::from("direct:b")
4121            .route_id("dup-route")
4122            .to("mock:out")
4123            .build();
4124
4125        assert!(route1.is_ok());
4126        assert!(route2.is_ok());
4127        assert_eq!(route1.unwrap().route_id(), route2.unwrap().route_id());
4128    }
4129}