Skip to main content

camel_builder/
lib.rs

1//! Fluent builder API for constructing Camel routes programmatically with EIP patterns.
2//!
3//! Main types: `RouteBuilder`, `StepAccumulator`, `SplitBuilder`, `ChoiceBuilder`, `MulticastBuilder`,
4//! `ThrottleBuilder`, `LoopBuilder`, `LoadBalancerBuilder`, `OnExceptionBuilder`.
5
6use camel_api::DelayConfig;
7use camel_api::aggregator::{
8    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24    ProcessorFn, Value,
25    runtime::{
26        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28        CanonicalWhenSpec,
29    },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35    StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38/// Shared step-accumulation methods for all builder types.
39///
40/// Implementors provide `steps_mut()` and get step-adding methods for free.
41/// `filter()` and other branching methods are NOT included — they return
42/// different types per builder and stay as per-builder methods.
43pub trait StepAccumulator: Sized {
44    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
45
46    fn to(mut self, endpoint: impl Into<String>) -> Self {
47        self.steps_mut().push(BuilderStep::To(endpoint.into()));
48        self
49    }
50
51    fn process<F, Fut>(mut self, f: F) -> Self
52    where
53        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
54        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
55    {
56        let svc = ProcessorFn::new(f);
57        self.steps_mut()
58            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
59        self
60    }
61
62    fn process_fn(mut self, processor: BoxProcessor) -> Self {
63        self.steps_mut().push(BuilderStep::Processor(processor));
64        self
65    }
66
67    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
68        let svc = SetHeader::new(IdentityProcessor, key, value);
69        self.steps_mut()
70            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71        self
72    }
73
74    fn map_body<F>(mut self, mapper: F) -> Self
75    where
76        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
77    {
78        let svc = MapBody::new(IdentityProcessor, mapper);
79        self.steps_mut()
80            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
81        self
82    }
83
84    fn set_body<B>(mut self, body: B) -> Self
85    where
86        B: Into<Body> + Clone + Send + Sync + 'static,
87    {
88        let body: Body = body.into();
89        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
90        self.steps_mut()
91            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
92        self
93    }
94
95    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
96    ///
97    /// Transforms the message body using the given value. Semantically identical
98    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
99    fn transform<B>(self, body: B) -> Self
100    where
101        B: Into<Body> + Clone + Send + Sync + 'static,
102    {
103        self.set_body(body)
104    }
105
106    fn set_body_fn<F>(mut self, expr: F) -> Self
107    where
108        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
109    {
110        let svc = SetBody::new(IdentityProcessor, expr);
111        self.steps_mut()
112            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
113        self
114    }
115
116    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
117    where
118        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
119    {
120        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
121        self.steps_mut()
122            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
123        self
124    }
125
126    fn aggregate(mut self, config: AggregatorConfig) -> Self {
127        self.steps_mut().push(BuilderStep::Aggregate { config });
128        self
129    }
130
131    /// Stop processing this exchange immediately. No further steps in the
132    /// current pipeline will run.
133    ///
134    /// Can be used at any point in the route: directly on RouteBuilder,
135    /// inside `.filter()`, inside `.split()`, etc.
136    fn stop(mut self) -> Self {
137        self.steps_mut().push(BuilderStep::Stop);
138        self
139    }
140
141    fn delay(mut self, duration: std::time::Duration) -> Self {
142        self.steps_mut().push(BuilderStep::Delay {
143            config: DelayConfig::from_duration(duration),
144        });
145        self
146    }
147
148    fn delay_with_header(
149        mut self,
150        duration: std::time::Duration,
151        header: impl Into<String>,
152    ) -> Self {
153        self.steps_mut().push(BuilderStep::Delay {
154            config: DelayConfig::from_duration_with_header(duration, header),
155        });
156        self
157    }
158
159    /// Log a message at the specified level.
160    ///
161    /// The message will be logged when an exchange passes through this step.
162    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
163        self.steps_mut().push(BuilderStep::Log {
164            level,
165            message: message.into(),
166        });
167        self
168    }
169
170    /// Convert the message body to the target type.
171    ///
172    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
173    /// Returns `TypeConversionFailed` if conversion is not possible.
174    ///
175    /// # Example
176    /// ```ignore
177    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
178    ///      .convert_body_to(BodyType::Json)
179    ///      .to("direct:next")
180    /// ```
181    fn convert_body_to(mut self, target: BodyType) -> Self {
182        let svc = ConvertBodyTo::new(IdentityProcessor, target);
183        self.steps_mut()
184            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
185        self
186    }
187
188    fn stream_cache(mut self, threshold: usize) -> Self {
189        let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
190        let svc = StreamCacheService::new(IdentityProcessor, config);
191        self.steps_mut()
192            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
193        self
194    }
195
196    /// Materialize `Body::Stream` into `Body::Bytes` using the default threshold (128 KB).
197    ///
198    /// Equivalent to `.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)`.
199    fn stream_cache_default(self) -> Self {
200        self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
201    }
202
203    /// Marshal the message body using the specified data format.
204    ///
205    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
206    /// the format name is unknown.
207    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
208    ///
209    /// # Example
210    /// ```ignore
211    /// route.marshal("json")?.to("direct:next")
212    /// ```
213    fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
214        let name = format.into();
215        let df = builtin_data_format(&name)
216            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
217        let svc = MarshalService::new(IdentityProcessor, df);
218        self.steps_mut()
219            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
220        Ok(self)
221    }
222
223    /// Unmarshal the message body using the specified data format.
224    ///
225    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
226    /// the format name is unknown.
227    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
228    ///
229    /// # Example
230    /// ```ignore
231    /// route.unmarshal("json")?.to("direct:next")
232    /// ```
233    fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
234        let name = format.into();
235        let df = builtin_data_format(&name)
236            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
237        let svc = UnmarshalService::new(IdentityProcessor, df);
238        self.steps_mut()
239            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
240        Ok(self)
241    }
242
243    /// Validate the exchange body against a schema file.
244    ///
245    /// Shorthand for `to("validator:path")`. Supports XSD, JSON Schema, and YAML.
246    ///
247    /// # Example
248    /// ```ignore
249    /// route.validate("schemas/order.xsd").to("direct:out")
250    /// ```
251    fn validate(mut self, schema_path: impl Into<String>) -> Self {
252        let path = schema_path.into();
253        let uri = if path.starts_with("validator:") {
254            path
255        } else {
256            format!("validator:{path}")
257        };
258        self.steps_mut().push(BuilderStep::To(uri));
259        self
260    }
261
262    /// Execute a script that can modify the exchange (headers, properties, body).
263    ///
264    /// The script has access to `headers`, `properties`, and `body` variables
265    /// and can modify them with assignment syntax: `headers["k"] = v`.
266    ///
267    /// # Example
268    /// ```ignore
269    /// // ignore: requires full CamelContext setup with registered language
270    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
271    /// ```
272    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
273        self.steps_mut().push(BuilderStep::Script {
274            language: language.into(),
275            script: script.into(),
276        });
277        self
278    }
279
280    /// EIP-7 enrich: synchronous content enrichment via a resolved producer.
281    ///
282    /// Calls the given endpoint URI as a producer, then merges the response
283    /// back into the original exchange body using the default `UseEnrichedBody`
284    /// strategy (original headers/properties are preserved).
285    fn enrich(mut self, uri: impl Into<String>) -> Self {
286        self.steps_mut().push(BuilderStep::Enrich {
287            uri: uri.into(),
288            strategy: None,
289            timeout_ms: None,
290        });
291        self
292    }
293
294    /// EIP-7 pollEnrich: blocking poll of a PollingConsumer with timeout.
295    ///
296    /// Reads from a polling endpoint (e.g., file) and merges the result
297    /// into the exchange body using the default `UseEnrichedBody` strategy.
298    /// `timeout_ms` controls how long to wait for data.
299    fn poll_enrich(mut self, uri: impl Into<String>, timeout_ms: u64) -> Self {
300        self.steps_mut().push(BuilderStep::PollEnrich {
301            uri: uri.into(),
302            strategy: None,
303            timeout_ms: Some(timeout_ms),
304        });
305        self
306    }
307
308    fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
309        self.steps_mut().push(BuilderStep::Bean {
310            name: name.into(),
311            method: method.into(),
312        });
313        self
314    }
315}
316
/// A fluent builder for constructing routes.
///
/// Start with [`RouteBuilder::from`], chain step and configuration methods,
/// then finish with `build()` or `build_canonical()`. Both finishers require
/// a non-empty route ID set via `.route_id(..)`.
///
/// # Example
///
/// ```ignore
/// let definition = RouteBuilder::from("timer:tick?period=1000")
///     .route_id("timer-route")
///     .set_header("source", Value::String("timer".into()))
///     .filter(|ex| ex.input.body.as_text().is_some())
///     .to("log:info?showHeaders=true")
///     .build()?;
/// ```
// TODO(BUILDER-003): RouteBuilder does not support clone-and-reuse semantics.
// Once built, the builder is consumed. Add `Clone` or a `fork()` method to allow
// reusing a partially-built route as a template for multiple routes.
pub struct RouteBuilder {
    /// Source endpoint URI passed to `from`; checked by `validate_uri` at build time.
    from_uri: String,
    /// Steps in the order they were added.
    steps: Vec<BuilderStep>,
    /// Explicit configuration supplied through `.error_handler(config)`.
    error_handler: Option<ErrorHandlerConfig>,
    /// Records which error-handler API style was used so `build()` can reject mixtures.
    error_handler_mode: ErrorHandlerMode,
    /// Set by `.circuit_breaker(config)`.
    circuit_breaker_config: Option<CircuitBreakerConfig>,
    /// Set by `.security_policy(config)`; `build_canonical()` rejects routes that have one.
    security_policy_config: Option<camel_api::security_policy::SecurityPolicyConfig>,
    /// Set by `.security_authenticator(auth)`.
    security_authenticator: Option<std::sync::Arc<dyn camel_auth::TokenAuthenticator>>,
    /// Set by `.concurrent(max)` / `.sequential()`; `None` leaves the definition untouched.
    concurrency: Option<ConcurrencyModel>,
    /// Route ID; must be present and non-blank when building.
    route_id: Option<String>,
    /// Applied to the definition only when explicitly set.
    auto_startup: Option<bool>,
    /// Applied to the definition only when explicitly set.
    startup_order: Option<i32>,
}
344
/// Tracks which style of error-handler API has been used on a `RouteBuilder`.
///
/// `RouteBuilder::build()` uses this to decide how to resolve the final
/// error handler and to reject a combination of both styles.
#[derive(Default)]
enum ErrorHandlerMode {
    /// No error-handler method has been called yet.
    #[default]
    None,
    /// Only `.error_handler(config)` has been used.
    ExplicitConfig,
    /// Only shorthand methods (`.dead_letter_channel()`, `.on_exception()`) have been used.
    Shorthand {
        /// Dead letter channel URI, if `.dead_letter_channel()` was called.
        dlc_uri: Option<String>,
        /// Exception policies pushed by `OnExceptionBuilder::end_on_exception()`.
        specs: Vec<OnExceptionSpec>,
    },
    /// Both styles were used; `build()` returns an error for this state.
    Mixed,
}
356
/// One shorthand exception policy collected by `OnExceptionBuilder`.
#[derive(Clone)]
struct OnExceptionSpec {
    /// Predicate deciding whether this policy applies to a given error.
    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
    /// Redelivery settings; `None` until `OnExceptionBuilder::retry()` is called.
    retry: Option<RedeliveryPolicy>,
    /// Endpoint URI set through `OnExceptionBuilder::handled_by()`, if any.
    handled_by: Option<String>,
}
363
364impl RouteBuilder {
365    /// Start building a route from the given source endpoint URI.
366    pub fn from(endpoint: &str) -> Self {
367        Self {
368            from_uri: endpoint.to_string(),
369            steps: Vec::new(),
370            error_handler: None,
371            error_handler_mode: ErrorHandlerMode::None,
372            circuit_breaker_config: None,
373            security_policy_config: None,
374            security_authenticator: None,
375            concurrency: None,
376            route_id: None,
377            auto_startup: None,
378            startup_order: None,
379        }
380    }
381
382    /// Open a filter scope. Only exchanges matching `predicate` will be processed
383    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
384    /// and continue to steps after `.end_filter()`.
385    pub fn filter<F>(self, predicate: F) -> FilterBuilder
386    where
387        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
388    {
389        FilterBuilder {
390            parent: self,
391            predicate: std::sync::Arc::new(predicate),
392            steps: vec![],
393        }
394    }
395
396    /// Open a choice scope for content-based routing.
397    ///
398    /// Within the choice, you can define multiple `.when()` clauses and an
399    /// optional `.otherwise()` clause. The first matching `when` predicate
400    /// determines which sub-pipeline executes.
401    pub fn choice(self) -> ChoiceBuilder {
402        ChoiceBuilder {
403            parent: self,
404            whens: vec![],
405            _otherwise: None,
406        }
407    }
408
409    /// Add a WireTap step that sends a clone of the exchange to the given
410    /// endpoint URI (fire-and-forget). The original exchange continues
411    /// downstream unchanged.
412    pub fn wire_tap(mut self, endpoint: &str) -> Self {
413        self.steps.push(BuilderStep::WireTap {
414            uri: endpoint.to_string(),
415        });
416        self
417    }
418
419    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
420    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
421        self.error_handler_mode = match self.error_handler_mode {
422            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
423                ErrorHandlerMode::ExplicitConfig
424            }
425            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
426        };
427        self.error_handler = Some(config);
428        self
429    }
430
431    /// Set a dead letter channel URI for shorthand error handler mode.
432    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
433        let uri = uri.into();
434        self.error_handler_mode = match self.error_handler_mode {
435            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
436                dlc_uri: Some(uri),
437                specs: Vec::new(),
438            },
439            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
440                dlc_uri: Some(uri),
441                specs,
442            },
443            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
444        };
445        self
446    }
447
448    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
449    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
450    where
451        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
452    {
453        self.error_handler_mode = match self.error_handler_mode {
454            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
455                dlc_uri: None,
456                specs: Vec::new(),
457            },
458            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
459            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
460        };
461
462        OnExceptionBuilder {
463            parent: self,
464            policy: OnExceptionSpec {
465                matches: std::sync::Arc::new(matches),
466                retry: None,
467                handled_by: None,
468            },
469        }
470    }
471
472    /// Set a circuit breaker for this route.
473    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
474        self.circuit_breaker_config = Some(config);
475        self
476    }
477
478    pub fn security_policy(
479        mut self,
480        config: camel_api::security_policy::SecurityPolicyConfig,
481    ) -> Self {
482        self.security_policy_config = Some(config);
483        self
484    }
485
486    pub fn security_authenticator(
487        mut self,
488        auth: std::sync::Arc<dyn camel_auth::TokenAuthenticator>,
489    ) -> Self {
490        self.security_authenticator = Some(auth);
491        self
492    }
493
494    /// Override the consumer's default concurrency model.
495    ///
496    /// When set, the pipeline spawns a task per exchange, processing them
497    /// concurrently. `max` limits the number of simultaneously active
498    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
499    ///
500    /// # Example
501    /// ```ignore
502    /// RouteBuilder::from("http://0.0.0.0:8080/api")
503    ///     .concurrent(16)  // max 16 in-flight pipeline executions
504    ///     .process(handle_request)
505    ///     .build()
506    /// ```
507    pub fn concurrent(mut self, max: usize) -> Self {
508        let max = if max == 0 { None } else { Some(max) };
509        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
510        self
511    }
512
513    /// Force sequential processing, overriding a concurrent-capable consumer.
514    ///
515    /// Useful for HTTP routes that mutate shared state and need ordering
516    /// guarantees.
517    pub fn sequential(mut self) -> Self {
518        self.concurrency = Some(ConcurrencyModel::Sequential);
519        self
520    }
521
522    /// Set the route ID for this route.
523    ///
524    /// If not set, the route will be assigned an auto-generated ID.
525    pub fn route_id(mut self, id: impl Into<String>) -> Self {
526        self.route_id = Some(id.into());
527        self
528    }
529
530    /// Set whether this route should automatically start when the context starts.
531    ///
532    /// Default is `true`.
533    pub fn auto_startup(mut self, auto: bool) -> Self {
534        self.auto_startup = Some(auto);
535        self
536    }
537
538    /// Set the startup order for this route.
539    ///
540    /// Routes with lower values start first. Default is 1000.
541    pub fn startup_order(mut self, order: i32) -> Self {
542        self.startup_order = Some(order);
543        self
544    }
545
546    /// Begin a Splitter sub-pipeline. Steps added after this call (until
547    /// `.end_split()`) will be executed per-fragment.
548    ///
549    /// Returns a `SplitBuilder` — you cannot call `.build()` until
550    /// `.end_split()` closes the split scope (enforced by the type system).
551    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
552        SplitBuilder {
553            parent: self,
554            config,
555            steps: Vec::new(),
556        }
557    }
558
559    /// Begin a Multicast sub-pipeline. Steps added after this call (until
560    /// `.end_multicast()`) will each receive a copy of the exchange.
561    ///
562    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
563    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
564    pub fn multicast(self) -> MulticastBuilder {
565        MulticastBuilder {
566            parent: self,
567            steps: Vec::new(),
568            config: MulticastConfig::new(),
569        }
570    }
571
572    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
573    /// `max_requests` per `period`. Steps inside the throttle scope are only
574    /// executed when the rate limit allows.
575    ///
576    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
577    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
578    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
579        ThrottleBuilder {
580            parent: self,
581            config: ThrottlerConfig::new(max_requests, period),
582            steps: Vec::new(),
583        }
584    }
585
586    /// Begin a Loop sub-pipeline that iterates a fixed number of times.
587    pub fn loop_count(self, count: usize) -> LoopBuilder {
588        LoopBuilder {
589            parent: self,
590            config: LoopConfig {
591                mode: LoopMode::Count(count),
592            },
593            steps: vec![],
594        }
595    }
596
597    /// Begin a Loop sub-pipeline that iterates while a predicate is true.
598    pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
599    where
600        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
601    {
602        LoopBuilder {
603            parent: self,
604            config: LoopConfig {
605                mode: LoopMode::While(std::sync::Arc::new(predicate)),
606            },
607            steps: vec![],
608        }
609    }
610
611    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
612    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
613    ///
614    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
615    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
616    pub fn load_balance(self) -> LoadBalancerBuilder {
617        LoadBalancerBuilder {
618            parent: self,
619            config: LoadBalancerConfig::round_robin(),
620            steps: Vec::new(),
621        }
622    }
623
624    /// Add a dynamic router step that routes exchanges dynamically based on
625    /// expression evaluation at runtime.
626    ///
627    /// The expression receives the exchange and returns `Some(uri)` to route to
628    /// the next endpoint, or `None` to stop routing.
629    ///
630    /// # Example
631    /// ```ignore
632    /// RouteBuilder::from("timer:tick")
633    ///     .route_id("test-route")
634    ///     .dynamic_router(|ex| {
635    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
636    ///     })
637    ///     .build()
638    /// ```
639    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
640        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
641    }
642
643    /// Add a dynamic router step with full configuration.
644    ///
645    /// Allows customization of URI delimiter, cache size, timeout, and other options.
646    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
647        self.steps.push(BuilderStep::DynamicRouter { config });
648        self
649    }
650
651    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
652        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
653    }
654
655    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
656        self.steps.push(BuilderStep::RoutingSlip { config });
657        self
658    }
659
660    pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
661        self.recipient_list_with_config(RecipientListConfig::new(expression))
662    }
663
664    pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
665        self.steps.push(BuilderStep::RecipientList { config });
666        self
667    }
668
669    /// Consume the builder and produce a [`RouteDefinition`].
670    // TODO(BUILDER-006): Validate duplicate route IDs. When a route with the same
671    // ID is already registered in the context, return `Err(CamelError::RouteError)`.
672    // Currently, duplicate IDs are silently accepted; detection should happen at
673    // `CamelContext::add_route_definition` time.
674    pub fn build(self) -> Result<RouteDefinition, CamelError> {
675        validate_uri(&self.from_uri)?;
676        let route_id = self
677            .route_id
678            .filter(|s| !s.trim().is_empty())
679            .ok_or_else(|| {
680                CamelError::RouteError(
681                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
682                        .to_string(),
683                )
684            })?;
685        let resolved_error_handler = match self.error_handler_mode {
686            ErrorHandlerMode::None => self.error_handler,
687            ErrorHandlerMode::ExplicitConfig => self.error_handler,
688            ErrorHandlerMode::Mixed => {
689                return Err(CamelError::RouteError(
690                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
691                ));
692            }
693            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
694                let mut config = if let Some(uri) = dlc_uri {
695                    ErrorHandlerConfig::dead_letter_channel(uri)
696                } else {
697                    ErrorHandlerConfig::log_only()
698                };
699
700                for spec in specs {
701                    let matcher = spec.matches.clone();
702                    let mut builder = config.on_exception(move |e| matcher(e));
703
704                    if let Some(retry) = spec.retry {
705                        builder = builder.retry(retry.max_attempts).with_backoff(
706                            retry.initial_delay,
707                            retry.multiplier,
708                            retry.max_delay,
709                        );
710                        if retry.jitter_factor > 0.0 {
711                            builder = builder.with_jitter(retry.jitter_factor);
712                        }
713                    }
714
715                    if let Some(uri) = spec.handled_by {
716                        builder = builder.handled_by(uri);
717                    }
718
719                    config = builder.build();
720                }
721
722                Some(config)
723            }
724        };
725
726        let definition = RouteDefinition::new(self.from_uri, self.steps);
727        let definition = if let Some(eh) = resolved_error_handler {
728            definition.with_error_handler(eh)
729        } else {
730            definition
731        };
732        let definition = if let Some(cb) = self.circuit_breaker_config {
733            definition.with_circuit_breaker(cb)
734        } else {
735            definition
736        };
737        let definition = if let Some(sp) = self.security_policy_config {
738            definition.with_security_policy(sp)
739        } else {
740            definition
741        };
742        let definition = if let Some(auth) = self.security_authenticator {
743            definition.with_security_authenticator(auth)
744        } else {
745            definition
746        };
747        let definition = if let Some(concurrency) = self.concurrency {
748            definition.with_concurrency(concurrency)
749        } else {
750            definition
751        };
752        let definition = definition.with_route_id(route_id);
753        let definition = if let Some(auto) = self.auto_startup {
754            definition.with_auto_startup(auto)
755        } else {
756            definition
757        };
758        let definition = if let Some(order) = self.startup_order {
759            definition.with_startup_order(order)
760        } else {
761            definition
762        };
763        Ok(definition)
764    }
765
766    /// Compile this builder route into canonical v1 spec.
767    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
768        validate_uri(&self.from_uri)?;
769        let route_id = self
770            .route_id
771            .filter(|s| !s.trim().is_empty())
772            .ok_or_else(|| {
773                CamelError::RouteError(
774                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
775                        .to_string(),
776                )
777            })?;
778
779        let steps = canonicalize_steps(self.steps)?;
780        let circuit_breaker = self
781            .circuit_breaker_config
782            .map(canonicalize_circuit_breaker);
783
784        if self.security_policy_config.is_some() {
785            return Err(CamelError::RouteError(
786                "routes with security_policy cannot use the canonical/hot-reload path (not yet supported)"
787                    .into(),
788            ));
789        }
790
791        let spec = CanonicalRouteSpec {
792            route_id,
793            from: self.from_uri,
794            steps,
795            circuit_breaker,
796            version: camel_api::CANONICAL_CONTRACT_VERSION,
797        };
798        spec.validate_contract()?;
799        Ok(spec)
800    }
801}
802
/// Scope builder for one shorthand exception policy.
///
/// Created by `RouteBuilder::on_exception()`; `end_on_exception()` hands the
/// route builder back.
pub struct OnExceptionBuilder {
    /// The route builder this scope returns to.
    parent: RouteBuilder,
    /// The policy being configured in this scope.
    policy: OnExceptionSpec,
}
807
808impl OnExceptionBuilder {
809    pub fn retry(mut self, max_attempts: u32) -> Self {
810        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
811        self
812    }
813
814    pub fn with_backoff(
815        mut self,
816        initial: std::time::Duration,
817        multiplier: f64,
818        max: std::time::Duration,
819    ) -> Self {
820        if let Some(ref mut retry) = self.policy.retry {
821            retry.initial_delay = initial;
822            retry.multiplier = multiplier;
823            retry.max_delay = max;
824        } else {
825            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
826        }
827        self
828    }
829
830    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
831        if let Some(ref mut retry) = self.policy.retry {
832            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
833        } else {
834            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
835        }
836        self
837    }
838
839    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
840        self.policy.handled_by = Some(uri.into());
841        self
842    }
843
844    pub fn end_on_exception(mut self) -> RouteBuilder {
845        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
846            specs.push(self.policy);
847        }
848        self.parent
849    }
850}
851
852/// Validate that a URI is non-empty and contains a scheme component.
853fn validate_uri(uri: &str) -> Result<(), CamelError> {
854    let trimmed = uri.trim();
855    if trimmed.is_empty() {
856        return Err(CamelError::RouteError(
857            "route must have a 'from' URI".to_string(),
858        ));
859    }
860    if !trimmed.contains(':') {
861        return Err(CamelError::RouteError(
862            "URI must have a scheme (e.g. 'timer:tick')".to_string(),
863        ));
864    }
865    let scheme = trimmed.split(':').next().unwrap_or("");
866    if scheme.trim().is_empty() {
867        return Err(CamelError::RouteError(
868            "URI scheme must not be empty".to_string(),
869        ));
870    }
871    Ok(())
872}
873
874fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
875    let mut canonical = Vec::with_capacity(steps.len());
876    for step in steps {
877        canonical.push(canonicalize_step(step)?);
878    }
879    Ok(canonical)
880}
881
882fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
883    match step {
884        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
885        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
886        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
887        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
888        BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
889            delay_ms: config.delay_ms,
890            dynamic_header: config.dynamic_header,
891        }),
892        BuilderStep::DeclarativeScript { expression } => {
893            Ok(CanonicalStepSpec::Script { expression })
894        }
895        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
896            predicate,
897            steps: canonicalize_steps(steps)?,
898        }),
899        BuilderStep::DeclarativeChoice { whens, otherwise } => {
900            let mut canonical_whens = Vec::with_capacity(whens.len());
901            for DeclarativeWhenStep { predicate, steps } in whens {
902                canonical_whens.push(CanonicalWhenSpec {
903                    predicate,
904                    steps: canonicalize_steps(steps)?,
905                });
906            }
907            let otherwise = match otherwise {
908                Some(steps) => Some(canonicalize_steps(steps)?),
909                None => None,
910            };
911            Ok(CanonicalStepSpec::Choice {
912                whens: canonical_whens,
913                otherwise,
914            })
915        }
916        BuilderStep::DeclarativeSplit {
917            expression,
918            aggregation,
919            parallel,
920            parallel_limit,
921            stop_on_exception,
922            steps,
923        } => Ok(CanonicalStepSpec::Split {
924            expression: CanonicalSplitExpressionSpec::Language(expression),
925            aggregation: canonicalize_split_aggregation(aggregation)?,
926            parallel,
927            parallel_limit,
928            stop_on_exception,
929            steps: canonicalize_steps(steps)?,
930        }),
931        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
932            canonicalize_aggregate(config)?,
933        )),
934        other => {
935            let step_name = canonical_step_name(&other);
936            let detail = camel_api::canonical_contract_rejection_reason(step_name)
937                .unwrap_or("not included in canonical v1");
938            Err(CamelError::RouteError(format!(
939                "canonical v1 does not support step `{step_name}`: {detail}"
940            )))
941        }
942    }
943}
944
945fn canonicalize_split_aggregation(
946    strategy: camel_api::splitter::AggregationStrategy,
947) -> Result<CanonicalSplitAggregationSpec, CamelError> {
948    match strategy {
949        camel_api::splitter::AggregationStrategy::LastWins => {
950            Ok(CanonicalSplitAggregationSpec::LastWins)
951        }
952        camel_api::splitter::AggregationStrategy::CollectAll => {
953            Ok(CanonicalSplitAggregationSpec::CollectAll)
954        }
955        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
956            "canonical v1 does not support custom split aggregation".to_string(),
957        )),
958        camel_api::splitter::AggregationStrategy::Original => {
959            Ok(CanonicalSplitAggregationSpec::Original)
960        }
961    }
962}
963
964fn extract_completion_fields(
965    mode: &CompletionMode,
966) -> Result<(Option<usize>, Option<u64>), CamelError> {
967    match mode {
968        CompletionMode::Single(cond) => match cond {
969            CompletionCondition::Size(n) => Ok((Some(*n), None)),
970            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
971            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
972                "canonical v1 does not support aggregate predicate completion".to_string(),
973            )),
974        },
975        CompletionMode::Any(conds) => {
976            let mut size = None;
977            let mut timeout_ms = None;
978            for cond in conds {
979                match cond {
980                    CompletionCondition::Size(n) => size = Some(*n),
981                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
982                    CompletionCondition::Predicate(_) => {
983                        return Err(CamelError::RouteError(
984                            "canonical v1 does not support aggregate predicate completion"
985                                .to_string(),
986                        ));
987                    }
988                }
989            }
990            Ok((size, timeout_ms))
991        }
992    }
993}
994
995fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
996    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
997
998    let header = match &config.correlation {
999        CorrelationStrategy::HeaderName(h) => h.clone(),
1000        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
1001        CorrelationStrategy::Fn(_) => {
1002            return Err(CamelError::RouteError(
1003                "canonical v1 does not support Fn correlation strategy".to_string(),
1004            ));
1005        }
1006    };
1007
1008    let correlation_key = match &config.correlation {
1009        CorrelationStrategy::HeaderName(_) => None,
1010        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
1011        CorrelationStrategy::Fn(_) => unreachable!(),
1012    };
1013
1014    let strategy = match config.strategy {
1015        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
1016        AggregationStrategy::Custom(_) => {
1017            return Err(CamelError::RouteError(
1018                "canonical v1 does not support custom aggregate strategy".to_string(),
1019            ));
1020        }
1021    };
1022    let bucket_ttl_ms = config
1023        .bucket_ttl
1024        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
1025
1026    Ok(CanonicalAggregateSpec {
1027        header,
1028        completion_size,
1029        completion_timeout_ms,
1030        correlation_key,
1031        force_completion_on_stop: if config.force_completion_on_stop {
1032            Some(true)
1033        } else {
1034            None
1035        },
1036        discard_on_timeout: if config.discard_on_timeout {
1037            Some(true)
1038        } else {
1039            None
1040        },
1041        strategy,
1042        max_buckets: config.max_buckets,
1043        bucket_ttl_ms,
1044    })
1045}
1046
1047fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
1048    CanonicalCircuitBreakerSpec {
1049        failure_threshold: config.failure_threshold,
1050        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
1051    }
1052}
1053
1054fn canonical_step_name(step: &BuilderStep) -> &'static str {
1055    match step {
1056        BuilderStep::Processor(_) => "processor",
1057        BuilderStep::To(_) => "to",
1058        BuilderStep::Stop => "stop",
1059        BuilderStep::Log { .. } => "log",
1060        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
1061        BuilderStep::DeclarativeSetBody { .. } => "set_body",
1062        BuilderStep::DeclarativeFilter { .. } => "filter",
1063        BuilderStep::DeclarativeChoice { .. } => "choice",
1064        BuilderStep::DeclarativeScript { .. } => "script",
1065        BuilderStep::DeclarativeFunction { .. } => "function",
1066        BuilderStep::DeclarativeSplit { .. } => "split",
1067        BuilderStep::Split { .. } => "split",
1068        BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1069        BuilderStep::Aggregate { .. } => "aggregate",
1070        BuilderStep::Filter { .. } => "filter",
1071        BuilderStep::Choice { .. } => "choice",
1072        BuilderStep::WireTap { .. } => "wire_tap",
1073        BuilderStep::Delay { .. } => "delay",
1074        BuilderStep::Multicast { .. } => "multicast",
1075        BuilderStep::DeclarativeLog { .. } => "log",
1076        BuilderStep::Bean { .. } => "bean",
1077        BuilderStep::Script { .. } => "script",
1078        BuilderStep::Throttle { .. } => "throttle",
1079        BuilderStep::LoadBalance { .. } => "load_balancer",
1080        BuilderStep::DynamicRouter { .. } => "dynamic_router",
1081        BuilderStep::RoutingSlip { .. } => "routing_slip",
1082        BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1083        BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1084        BuilderStep::RecipientList { .. } => "recipient_list",
1085        BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1086        BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1087        BuilderStep::Enrich { .. } => "enrich",
1088        BuilderStep::PollEnrich { .. } => "poll_enrich",
1089    }
1090}
1091
1092impl StepAccumulator for RouteBuilder {
1093    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1094        &mut self.steps
1095    }
1096}
1097
1098/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
1099///
1100/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1101/// but NOT `.build()` and NOT `.split()` (no nested splits).
1102///
1103/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
1104/// and returns the parent `RouteBuilder`.
1105pub struct SplitBuilder {
1106    parent: RouteBuilder,
1107    config: SplitterConfig,
1108    steps: Vec<BuilderStep>,
1109}
1110
1111impl SplitBuilder {
1112    /// Open a filter scope within the split sub-pipeline.
1113    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1114    where
1115        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1116    {
1117        FilterInSplitBuilder {
1118            parent: self,
1119            predicate: std::sync::Arc::new(predicate),
1120            steps: vec![],
1121        }
1122    }
1123
1124    /// Close the split scope. Packages the accumulated sub-steps into a
1125    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
1126    pub fn end_split(mut self) -> RouteBuilder {
1127        let split_step = BuilderStep::Split {
1128            config: self.config,
1129            steps: self.steps,
1130        };
1131        self.parent.steps.push(split_step);
1132        self.parent
1133    }
1134}
1135
1136impl StepAccumulator for SplitBuilder {
1137    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1138        &mut self.steps
1139    }
1140}
1141
1142/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
1143pub struct FilterBuilder {
1144    parent: RouteBuilder,
1145    predicate: FilterPredicate,
1146    steps: Vec<BuilderStep>,
1147}
1148
1149impl FilterBuilder {
1150    /// Close the filter scope. Packages the accumulated sub-steps into a
1151    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
1152    pub fn end_filter(mut self) -> RouteBuilder {
1153        let step = BuilderStep::Filter {
1154            predicate: self.predicate,
1155            steps: self.steps,
1156        };
1157        self.parent.steps.push(step);
1158        self.parent
1159    }
1160}
1161
1162impl StepAccumulator for FilterBuilder {
1163    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1164        &mut self.steps
1165    }
1166}
1167
1168/// Builder for a filter scope nested inside a `.split()` block.
1169pub struct FilterInSplitBuilder {
1170    parent: SplitBuilder,
1171    predicate: FilterPredicate,
1172    steps: Vec<BuilderStep>,
1173}
1174
1175impl FilterInSplitBuilder {
1176    /// Close the filter scope and return the parent `SplitBuilder`.
1177    pub fn end_filter(mut self) -> SplitBuilder {
1178        let step = BuilderStep::Filter {
1179            predicate: self.predicate,
1180            steps: self.steps,
1181        };
1182        self.parent.steps.push(step);
1183        self.parent
1184    }
1185}
1186
1187impl StepAccumulator for FilterInSplitBuilder {
1188    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1189        &mut self.steps
1190    }
1191}
1192
1193// ── Choice/When/Otherwise builders ─────────────────────────────────────────
1194
1195/// Builder for a `.choice()` ... `.end_choice()` block.
1196///
1197/// Accumulates `when` clauses and an optional `otherwise` clause.
1198/// Cannot call `.build()` until `.end_choice()` is called.
1199pub struct ChoiceBuilder {
1200    parent: RouteBuilder,
1201    whens: Vec<WhenStep>,
1202    _otherwise: Option<Vec<BuilderStep>>,
1203}
1204
1205impl ChoiceBuilder {
1206    /// Open a `when` clause. Only exchanges matching `predicate` will be
1207    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
1208    pub fn when<F>(self, predicate: F) -> WhenBuilder
1209    where
1210        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1211    {
1212        WhenBuilder {
1213            parent: self,
1214            predicate: std::sync::Arc::new(predicate),
1215            steps: vec![],
1216        }
1217    }
1218
1219    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1220    ///
1221    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1222    pub fn otherwise(self) -> OtherwiseBuilder {
1223        OtherwiseBuilder {
1224            parent: self,
1225            steps: vec![],
1226        }
1227    }
1228
1229    /// Close the choice scope. Packages all accumulated `when` clauses and
1230    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1231    /// parent `RouteBuilder`.
1232    pub fn end_choice(mut self) -> RouteBuilder {
1233        let step = BuilderStep::Choice {
1234            whens: self.whens,
1235            otherwise: self._otherwise,
1236        };
1237        self.parent.steps.push(step);
1238        self.parent
1239    }
1240}
1241
1242/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1243pub struct WhenBuilder {
1244    parent: ChoiceBuilder,
1245    predicate: camel_api::FilterPredicate,
1246    steps: Vec<BuilderStep>,
1247}
1248
1249impl WhenBuilder {
1250    /// Close the when scope. Packages the accumulated sub-steps into a
1251    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1252    pub fn end_when(mut self) -> ChoiceBuilder {
1253        self.parent.whens.push(WhenStep {
1254            predicate: self.predicate,
1255            steps: self.steps,
1256        });
1257        self.parent
1258    }
1259}
1260
1261impl StepAccumulator for WhenBuilder {
1262    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1263        &mut self.steps
1264    }
1265}
1266
1267/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1268pub struct OtherwiseBuilder {
1269    parent: ChoiceBuilder,
1270    steps: Vec<BuilderStep>,
1271}
1272
1273impl OtherwiseBuilder {
1274    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1275    pub fn end_otherwise(self) -> ChoiceBuilder {
1276        let OtherwiseBuilder { mut parent, steps } = self;
1277        parent._otherwise = Some(steps);
1278        parent
1279    }
1280}
1281
1282impl StepAccumulator for OtherwiseBuilder {
1283    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1284        &mut self.steps
1285    }
1286}
1287
1288/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1289///
1290/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1291/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1292///
1293/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1294/// and returns the parent `RouteBuilder`.
1295pub struct MulticastBuilder {
1296    parent: RouteBuilder,
1297    steps: Vec<BuilderStep>,
1298    config: MulticastConfig,
1299}
1300
1301impl MulticastBuilder {
1302    pub fn parallel(mut self, parallel: bool) -> Self {
1303        self.config = self.config.parallel(parallel);
1304        self
1305    }
1306
1307    pub fn parallel_limit(mut self, limit: usize) -> Self {
1308        self.config = self.config.parallel_limit(limit);
1309        self
1310    }
1311
1312    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1313        self.config = self.config.stop_on_exception(stop);
1314        self
1315    }
1316
1317    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1318        self.config = self.config.timeout(duration);
1319        self
1320    }
1321
1322    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1323        self.config = self.config.aggregation(strategy);
1324        self
1325    }
1326
1327    pub fn end_multicast(mut self) -> RouteBuilder {
1328        let step = BuilderStep::Multicast {
1329            steps: self.steps,
1330            config: self.config,
1331        };
1332        self.parent.steps.push(step);
1333        self.parent
1334    }
1335}
1336
1337impl StepAccumulator for MulticastBuilder {
1338    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1339        &mut self.steps
1340    }
1341}
1342
1343/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1344///
1345/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1346/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1347///
1348/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1349/// and returns the parent `RouteBuilder`.
1350pub struct ThrottleBuilder {
1351    parent: RouteBuilder,
1352    config: ThrottlerConfig,
1353    steps: Vec<BuilderStep>,
1354}
1355
1356impl ThrottleBuilder {
1357    /// Set the throttle strategy. Default is `Delay`.
1358    ///
1359    /// - `Delay`: Queue messages until capacity available
1360    /// - `Reject`: Return error immediately when throttled
1361    /// - `Drop`: Silently discard excess messages
1362    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1363        self.config = self.config.strategy(strategy);
1364        self
1365    }
1366
1367    /// Close the throttle scope. Packages the accumulated sub-steps into a
1368    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1369    pub fn end_throttle(mut self) -> RouteBuilder {
1370        let step = BuilderStep::Throttle {
1371            config: self.config,
1372            steps: self.steps,
1373        };
1374        self.parent.steps.push(step);
1375        self.parent
1376    }
1377}
1378
1379impl StepAccumulator for ThrottleBuilder {
1380    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1381        &mut self.steps
1382    }
1383}
1384
1385/// Builder for the sub-pipeline within a `.loop_count()` / `.loop_while()` ... `.end_loop()` block.
1386pub struct LoopBuilder {
1387    parent: RouteBuilder,
1388    config: LoopConfig,
1389    steps: Vec<BuilderStep>,
1390}
1391
1392impl LoopBuilder {
1393    pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1394        LoopInLoopBuilder {
1395            parent: self,
1396            config: LoopConfig {
1397                mode: LoopMode::Count(count),
1398            },
1399            steps: vec![],
1400        }
1401    }
1402
1403    pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1404    where
1405        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1406    {
1407        LoopInLoopBuilder {
1408            parent: self,
1409            config: LoopConfig {
1410                mode: LoopMode::While(std::sync::Arc::new(predicate)),
1411            },
1412            steps: vec![],
1413        }
1414    }
1415
1416    pub fn end_loop(mut self) -> RouteBuilder {
1417        let step = BuilderStep::Loop {
1418            config: self.config,
1419            steps: self.steps,
1420        };
1421        self.parent.steps.push(step);
1422        self.parent
1423    }
1424}
1425
1426impl StepAccumulator for LoopBuilder {
1427    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1428        &mut self.steps
1429    }
1430}
1431
1432pub struct LoopInLoopBuilder {
1433    parent: LoopBuilder,
1434    config: LoopConfig,
1435    steps: Vec<BuilderStep>,
1436}
1437
1438impl LoopInLoopBuilder {
1439    pub fn end_loop(mut self) -> LoopBuilder {
1440        let step = BuilderStep::Loop {
1441            config: self.config,
1442            steps: self.steps,
1443        };
1444        self.parent.steps.push(step);
1445        self.parent
1446    }
1447}
1448
1449impl StepAccumulator for LoopInLoopBuilder {
1450    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1451        &mut self.steps
1452    }
1453}
1454
1455/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1456///
1457/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1458/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1459///
1460/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1461/// and returns the parent `RouteBuilder`.
1462pub struct LoadBalancerBuilder {
1463    parent: RouteBuilder,
1464    config: LoadBalancerConfig,
1465    steps: Vec<BuilderStep>,
1466}
1467
1468impl LoadBalancerBuilder {
1469    /// Set the load balance strategy to round-robin (default).
1470    pub fn round_robin(mut self) -> Self {
1471        self.config = LoadBalancerConfig::round_robin();
1472        self
1473    }
1474
1475    /// Set the load balance strategy to random selection.
1476    pub fn random(mut self) -> Self {
1477        self.config = LoadBalancerConfig::random();
1478        self
1479    }
1480
1481    /// Set the load balance strategy to weighted selection.
1482    ///
1483    /// Each endpoint is assigned a weight that determines its probability
1484    /// of being selected.
1485    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1486        self.config = LoadBalancerConfig::weighted(weights);
1487        self
1488    }
1489
1490    /// Set the load balance strategy to failover.
1491    ///
1492    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1493    /// is tried.
1494    pub fn failover(mut self) -> Self {
1495        self.config = LoadBalancerConfig::failover();
1496        self
1497    }
1498
1499    /// Enable or disable parallel execution of endpoints.
1500    ///
1501    /// When enabled, all endpoints receive the exchange simultaneously.
1502    /// When disabled (default), only one endpoint is selected per exchange.
1503    pub fn parallel(mut self, parallel: bool) -> Self {
1504        self.config = self.config.parallel(parallel);
1505        self
1506    }
1507
1508    /// Close the load balance scope. Packages the accumulated sub-steps into a
1509    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1510    pub fn end_load_balance(mut self) -> RouteBuilder {
1511        let step = BuilderStep::LoadBalance {
1512            config: self.config,
1513            steps: self.steps,
1514        };
1515        self.parent.steps.push(step);
1516        self.parent
1517    }
1518}
1519
1520impl StepAccumulator for LoadBalancerBuilder {
1521    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1522        &mut self.steps
1523    }
1524}
1525
1526// ---------------------------------------------------------------------------
1527// Tests
1528// ---------------------------------------------------------------------------
1529
1530#[cfg(test)]
1531mod tests {
1532    use super::*;
1533    use camel_api::error_handler::ErrorHandlerConfig;
1534    use camel_api::load_balancer::LoadBalanceStrategy;
1535    use camel_api::{Exchange, Message};
1536    use camel_core::route::BuilderStep;
1537    use std::sync::Arc;
1538    use std::time::Duration;
1539    use tower::{Service, ServiceExt};
1540
1541    #[test]
1542    fn test_builder_from_creates_definition() {
1543        let definition = RouteBuilder::from("timer:tick")
1544            .route_id("test-route")
1545            .build()
1546            .unwrap();
1547        assert_eq!(definition.from_uri(), "timer:tick");
1548    }
1549
1550    #[test]
1551    fn test_builder_empty_from_uri_errors() {
1552        let result = RouteBuilder::from("").route_id("test-route").build();
1553        assert!(result.is_err());
1554    }
1555
1556    #[test]
1557    fn test_build_rejects_schemeless_uri() {
1558        let result = RouteBuilder::from("no-scheme-here")
1559            .route_id("test-route")
1560            .build();
1561        match result {
1562            Err(err) => {
1563                let err_msg = format!("{err}");
1564                assert!(
1565                    err_msg.contains("scheme"),
1566                    "expected scheme-related error, got: {err_msg}"
1567                );
1568            }
1569            Ok(_) => panic!("schemeless URI should fail"),
1570        }
1571    }
1572
1573    #[test]
1574    fn test_build_rejects_empty_scheme_uri() {
1575        let result = RouteBuilder::from(":missing-scheme")
1576            .route_id("test-route")
1577            .build();
1578        match result {
1579            Err(err) => {
1580                let err_msg = format!("{err}");
1581                assert!(
1582                    err_msg.contains("scheme"),
1583                    "expected scheme-related error, got: {err_msg}"
1584                );
1585            }
1586            Ok(_) => panic!("empty-scheme URI should fail"),
1587        }
1588    }
1589
1590    #[test]
1591    fn test_build_accepts_valid_uri() {
1592        let result = RouteBuilder::from("timer:tick")
1593            .route_id("test-route")
1594            .build();
1595        assert!(result.is_ok());
1596    }
1597
1598    #[test]
1599    fn test_build_canonical_rejects_schemeless_uri() {
1600        let result = RouteBuilder::from("no-scheme-here")
1601            .route_id("test-route")
1602            .build_canonical();
1603        assert!(result.is_err());
1604    }
1605
1606    #[test]
1607    fn test_builder_to_adds_step() {
1608        let definition = RouteBuilder::from("timer:tick")
1609            .route_id("test-route")
1610            .to("log:info")
1611            .build()
1612            .unwrap();
1613
1614        assert_eq!(definition.from_uri(), "timer:tick");
1615        // We can verify steps were added by checking the structure
1616        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1617    }
1618
1619    #[test]
1620    fn test_builder_filter_adds_filter_step() {
1621        let definition = RouteBuilder::from("timer:tick")
1622            .route_id("test-route")
1623            .filter(|_ex| true)
1624            .to("mock:result")
1625            .end_filter()
1626            .build()
1627            .unwrap();
1628
1629        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1630    }
1631
1632    #[test]
1633    fn test_builder_set_header_adds_processor_step() {
1634        let definition = RouteBuilder::from("timer:tick")
1635            .route_id("test-route")
1636            .set_header("key", Value::String("value".into()))
1637            .build()
1638            .unwrap();
1639
1640        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1641    }
1642
1643    #[test]
1644    fn test_builder_map_body_adds_processor_step() {
1645        let definition = RouteBuilder::from("timer:tick")
1646            .route_id("test-route")
1647            .map_body(|body| body)
1648            .build()
1649            .unwrap();
1650
1651        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1652    }
1653
1654    #[test]
1655    fn test_builder_process_adds_processor_step() {
1656        let definition = RouteBuilder::from("timer:tick")
1657            .route_id("test-route")
1658            .process(|ex| async move { Ok(ex) })
1659            .build()
1660            .unwrap();
1661
1662        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1663    }
1664
1665    #[test]
1666    fn test_builder_chain_multiple_steps() {
1667        let definition = RouteBuilder::from("timer:tick")
1668            .route_id("test-route")
1669            .set_header("source", Value::String("timer".into()))
1670            .filter(|ex| ex.input.header("source").is_some())
1671            .to("log:info")
1672            .end_filter()
1673            .to("mock:result")
1674            .build()
1675            .unwrap();
1676
1677        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1678        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1679        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1680        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1681    }
1682
1683    #[test]
1684    fn test_loop_count_builder() {
1685        use camel_api::loop_eip::LoopMode;
1686
1687        let def = RouteBuilder::from("direct:start")
1688            .route_id("loop-test")
1689            .loop_count(3)
1690            .to("mock:inside")
1691            .end_loop()
1692            .to("mock:after")
1693            .build()
1694            .unwrap();
1695
1696        assert_eq!(def.steps().len(), 2);
1697        match &def.steps()[0] {
1698            BuilderStep::Loop { config, steps } => {
1699                assert!(matches!(config.mode, LoopMode::Count(3)));
1700                assert_eq!(steps.len(), 1);
1701            }
1702            other => panic!("Expected Loop, got {:?}", other),
1703        }
1704        assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1705    }
1706
1707    #[test]
1708    fn test_loop_while_builder() {
1709        use camel_api::loop_eip::LoopMode;
1710
1711        let def = RouteBuilder::from("direct:start")
1712            .route_id("loop-while-test")
1713            .loop_while(|_ex| true)
1714            .to("mock:retry")
1715            .end_loop()
1716            .build()
1717            .unwrap();
1718
1719        assert_eq!(def.steps().len(), 1);
1720        match &def.steps()[0] {
1721            BuilderStep::Loop { config, steps } => {
1722                assert!(matches!(config.mode, LoopMode::While(_)));
1723                assert_eq!(steps.len(), 1);
1724            }
1725            other => panic!("Expected Loop, got {:?}", other),
1726        }
1727    }
1728
1729    #[test]
1730    fn test_nested_loop_builder() {
1731        use camel_api::loop_eip::LoopMode;
1732
1733        let def = RouteBuilder::from("direct:start")
1734            .route_id("nested-loop-test")
1735            .loop_count(2)
1736            .to("mock:outer")
1737            .loop_count(3)
1738            .to("mock:inner")
1739            .end_loop()
1740            .end_loop()
1741            .to("mock:after")
1742            .build()
1743            .unwrap();
1744
1745        assert_eq!(def.steps().len(), 2);
1746        match &def.steps()[0] {
1747            BuilderStep::Loop { steps, .. } => {
1748                assert_eq!(steps.len(), 2);
1749                match &steps[1] {
1750                    BuilderStep::Loop {
1751                        config,
1752                        steps: inner_steps,
1753                    } => {
1754                        assert!(matches!(config.mode, LoopMode::Count(3)));
1755                        assert_eq!(inner_steps.len(), 1);
1756                    }
1757                    other => panic!("Expected nested Loop, got {:?}", other),
1758                }
1759            }
1760            other => panic!("Expected outer Loop, got {:?}", other),
1761        }
1762    }
1763
1764    // -----------------------------------------------------------------------
1765    // Processor behavior tests — exercise the real Tower services directly
1766    // -----------------------------------------------------------------------
1767
1768    #[tokio::test]
1769    async fn test_set_header_processor_works() {
1770        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1771        let exchange = Exchange::new(Message::new("test"));
1772        let result = svc.call(exchange).await.unwrap();
1773        assert_eq!(
1774            result.input.header("greeting"),
1775            Some(&Value::String("hello".into()))
1776        );
1777    }
1778
1779    #[tokio::test]
1780    async fn test_filter_processor_passes() {
1781        use camel_api::BoxProcessorExt;
1782        use camel_processor::FilterService;
1783
1784        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1785        let mut svc =
1786            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1787        let exchange = Exchange::new(Message::new("pass"));
1788        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1789        assert_eq!(result.input.body.as_text(), Some("pass"));
1790    }
1791
1792    #[tokio::test]
1793    async fn test_filter_processor_blocks() {
1794        use camel_api::BoxProcessorExt;
1795        use camel_processor::FilterService;
1796
1797        let sub = BoxProcessor::from_fn(|_ex| {
1798            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1799        });
1800        let mut svc =
1801            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1802        let exchange = Exchange::new(Message::new("reject"));
1803        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1804        assert_eq!(result.input.body.as_text(), Some("reject"));
1805    }
1806
1807    #[tokio::test]
1808    async fn test_map_body_processor_works() {
1809        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1810            if let Some(text) = body.as_text() {
1811                Body::Text(text.to_uppercase())
1812            } else {
1813                body
1814            }
1815        });
1816        let exchange = Exchange::new(Message::new("hello"));
1817        let result = mapper.oneshot(exchange).await.unwrap();
1818        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1819    }
1820
1821    #[tokio::test]
1822    async fn test_process_custom_processor_works() {
1823        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1824            ex.set_property("custom", Value::Bool(true));
1825            Ok(ex)
1826        });
1827        let exchange = Exchange::new(Message::default());
1828        let result = processor.oneshot(exchange).await.unwrap();
1829        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1830    }
1831
1832    // -----------------------------------------------------------------------
1833    // Sequential pipeline test
1834    // -----------------------------------------------------------------------
1835
1836    #[tokio::test]
1837    async fn test_compose_pipeline_runs_steps_in_order() {
1838        use camel_core::route::compose_pipeline;
1839
1840        let processors = vec![
1841            BoxProcessor::new(SetHeader::new(
1842                IdentityProcessor,
1843                "step",
1844                Value::String("one".into()),
1845            )),
1846            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1847                if let Some(text) = body.as_text() {
1848                    Body::Text(format!("{}-processed", text))
1849                } else {
1850                    body
1851                }
1852            })),
1853        ];
1854
1855        let pipeline = compose_pipeline(processors);
1856        let exchange = Exchange::new(Message::new("hello"));
1857        let result = pipeline.oneshot(exchange).await.unwrap();
1858
1859        assert_eq!(
1860            result.input.header("step"),
1861            Some(&Value::String("one".into()))
1862        );
1863        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1864    }
1865
1866    #[tokio::test]
1867    async fn test_compose_pipeline_empty_is_identity() {
1868        use camel_core::route::compose_pipeline;
1869
1870        let pipeline = compose_pipeline(vec![]);
1871        let exchange = Exchange::new(Message::new("unchanged"));
1872        let result = pipeline.oneshot(exchange).await.unwrap();
1873        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1874    }
1875
1876    // -----------------------------------------------------------------------
1877    // Circuit breaker builder tests
1878    // -----------------------------------------------------------------------
1879
1880    #[test]
1881    fn test_builder_circuit_breaker_sets_config() {
1882        use camel_api::circuit_breaker::CircuitBreakerConfig;
1883
1884        let config = CircuitBreakerConfig::new().failure_threshold(5);
1885        let definition = RouteBuilder::from("timer:tick")
1886            .route_id("test-route")
1887            .circuit_breaker(config)
1888            .build()
1889            .unwrap();
1890
1891        let cb = definition
1892            .circuit_breaker_config()
1893            .expect("circuit breaker should be set");
1894        assert_eq!(cb.failure_threshold, 5);
1895    }
1896
1897    #[test]
1898    fn test_builder_circuit_breaker_with_error_handler() {
1899        use camel_api::circuit_breaker::CircuitBreakerConfig;
1900        use camel_api::error_handler::ErrorHandlerConfig;
1901
1902        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1903        let eh_config = ErrorHandlerConfig::log_only();
1904
1905        let definition = RouteBuilder::from("timer:tick")
1906            .route_id("test-route")
1907            .to("log:info")
1908            .circuit_breaker(cb_config)
1909            .error_handler(eh_config)
1910            .build()
1911            .unwrap();
1912
1913        assert!(
1914            definition.circuit_breaker_config().is_some(),
1915            "circuit breaker config should be set"
1916        );
1917        // Route definition was built successfully with both configs.
1918    }
1919
1920    #[test]
1921    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1922        let definition = RouteBuilder::from("direct:start")
1923            .route_id("test-route")
1924            .dead_letter_channel("log:dlc")
1925            .on_exception(|e| matches!(e, CamelError::Io(_)))
1926            .retry(3)
1927            .handled_by("log:io")
1928            .end_on_exception()
1929            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1930            .retry(1)
1931            .end_on_exception()
1932            .to("mock:out")
1933            .build()
1934            .expect("route should build");
1935
1936        let cfg = definition
1937            .error_handler_config()
1938            .expect("error handler should be set");
1939        assert_eq!(cfg.policies.len(), 2);
1940        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1941        assert_eq!(
1942            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1943            Some(3)
1944        );
1945        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1946        assert_eq!(
1947            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1948            Some(1)
1949        );
1950    }
1951
1952    #[test]
1953    fn test_builder_on_exception_mixed_mode_rejected() {
1954        let result = RouteBuilder::from("direct:start")
1955            .route_id("test-route")
1956            .error_handler(ErrorHandlerConfig::log_only())
1957            .on_exception(|_e| true)
1958            .end_on_exception()
1959            .to("mock:out")
1960            .build();
1961
1962        let err = result.err().expect("mixed mode should fail with an error");
1963
1964        assert!(
1965            format!("{err}").contains("mixed error handler modes"),
1966            "unexpected error: {err}"
1967        );
1968    }
1969
1970    #[test]
1971    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1972        let definition = RouteBuilder::from("direct:start")
1973            .route_id("test-route")
1974            .on_exception(|_e| true)
1975            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1976            .with_jitter(0.5)
1977            .end_on_exception()
1978            .to("mock:out")
1979            .build()
1980            .expect("route should build");
1981
1982        let cfg = definition
1983            .error_handler_config()
1984            .expect("error handler should be set");
1985        assert_eq!(cfg.policies.len(), 1);
1986        assert!(cfg.policies[0].retry.is_none());
1987    }
1988
1989    #[test]
1990    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1991        let definition = RouteBuilder::from("direct:start")
1992            .route_id("test-route")
1993            .dead_letter_channel("log:dlc")
1994            .to("mock:out")
1995            .build()
1996            .expect("route should build");
1997
1998        let cfg = definition
1999            .error_handler_config()
2000            .expect("error handler should be set");
2001        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
2002        assert!(cfg.policies.is_empty());
2003    }
2004
2005    #[test]
2006    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
2007        let definition = RouteBuilder::from("direct:start")
2008            .route_id("test-route")
2009            .dead_letter_channel("log:first")
2010            .on_exception(|e| matches!(e, CamelError::Io(_)))
2011            .retry(2)
2012            .end_on_exception()
2013            .dead_letter_channel("log:second")
2014            .to("mock:out")
2015            .build()
2016            .expect("route should build");
2017
2018        let cfg = definition
2019            .error_handler_config()
2020            .expect("error handler should be set");
2021        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2022        assert_eq!(cfg.policies.len(), 1);
2023        assert_eq!(
2024            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
2025            Some(2)
2026        );
2027    }
2028
2029    #[test]
2030    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
2031        let definition = RouteBuilder::from("direct:start")
2032            .route_id("test-route")
2033            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
2034            .retry(1)
2035            .end_on_exception()
2036            .to("mock:out")
2037            .build()
2038            .expect("route should build");
2039
2040        let cfg = definition
2041            .error_handler_config()
2042            .expect("error handler should be set");
2043        assert!(cfg.dlc_uri.is_none());
2044        assert_eq!(cfg.policies.len(), 1);
2045    }
2046
2047    #[test]
2048    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
2049        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
2050        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
2051
2052        let definition = RouteBuilder::from("direct:start")
2053            .route_id("test-route")
2054            .error_handler(first)
2055            .error_handler(second)
2056            .to("mock:out")
2057            .build()
2058            .expect("route should build");
2059
2060        let cfg = definition
2061            .error_handler_config()
2062            .expect("error handler should be set");
2063        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2064    }
2065
2066    // --- Splitter builder tests ---
2067
2068    #[test]
2069    fn test_split_builder_typestate() {
2070        use camel_api::splitter::{SplitterConfig, split_body_lines};
2071
2072        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
2073        let definition = RouteBuilder::from("timer:test?period=1000")
2074            .route_id("test-route")
2075            .split(SplitterConfig::new(split_body_lines()))
2076            .to("mock:per-fragment")
2077            .end_split()
2078            .to("mock:final")
2079            .build()
2080            .unwrap();
2081
2082        // Should have 2 top-level steps: Split + To("mock:final")
2083        assert_eq!(definition.steps().len(), 2);
2084    }
2085
2086    #[test]
2087    fn test_split_builder_steps_collected() {
2088        use camel_api::splitter::{SplitterConfig, split_body_lines};
2089
2090        let definition = RouteBuilder::from("timer:test?period=1000")
2091            .route_id("test-route")
2092            .split(SplitterConfig::new(split_body_lines()))
2093            .set_header("fragment", Value::String("yes".into()))
2094            .to("mock:per-fragment")
2095            .end_split()
2096            .build()
2097            .unwrap();
2098
2099        // Should have 1 top-level step: Split (containing 2 sub-steps)
2100        assert_eq!(definition.steps().len(), 1);
2101        match &definition.steps()[0] {
2102            BuilderStep::Split { steps, .. } => {
2103                assert_eq!(steps.len(), 2); // SetHeader + To
2104            }
2105            other => panic!("Expected Split, got {:?}", other),
2106        }
2107    }
2108
2109    #[test]
2110    fn test_split_builder_config_propagated() {
2111        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2112
2113        let definition = RouteBuilder::from("timer:test?period=1000")
2114            .route_id("test-route")
2115            .split(
2116                SplitterConfig::new(split_body_lines())
2117                    .parallel(true)
2118                    .parallel_limit(4)
2119                    .aggregation(AggregationStrategy::CollectAll),
2120            )
2121            .to("mock:per-fragment")
2122            .end_split()
2123            .build()
2124            .unwrap();
2125
2126        match &definition.steps()[0] {
2127            BuilderStep::Split { config, .. } => {
2128                assert!(config.parallel);
2129                assert_eq!(config.parallel_limit, Some(4));
2130                assert!(matches!(
2131                    config.aggregation,
2132                    AggregationStrategy::CollectAll
2133                ));
2134            }
2135            other => panic!("Expected Split, got {:?}", other),
2136        }
2137    }
2138
2139    #[test]
2140    fn test_aggregate_builder_adds_step() {
2141        use camel_api::aggregator::AggregatorConfig;
2142        use camel_core::route::BuilderStep;
2143
2144        let definition = RouteBuilder::from("timer:tick")
2145            .route_id("test-route")
2146            .aggregate(
2147                AggregatorConfig::correlate_by("key")
2148                    .complete_when_size(2)
2149                    .build()
2150                    .unwrap(),
2151            )
2152            .build()
2153            .unwrap();
2154
2155        assert_eq!(definition.steps().len(), 1);
2156        assert!(matches!(
2157            definition.steps()[0],
2158            BuilderStep::Aggregate { .. }
2159        ));
2160    }
2161
2162    #[test]
2163    fn test_aggregate_in_split_builder() {
2164        use camel_api::aggregator::AggregatorConfig;
2165        use camel_api::splitter::{SplitterConfig, split_body_lines};
2166        use camel_core::route::BuilderStep;
2167
2168        let definition = RouteBuilder::from("timer:tick")
2169            .route_id("test-route")
2170            .split(SplitterConfig::new(split_body_lines()))
2171            .aggregate(
2172                AggregatorConfig::correlate_by("key")
2173                    .complete_when_size(1)
2174                    .build()
2175                    .unwrap(),
2176            )
2177            .end_split()
2178            .build()
2179            .unwrap();
2180
2181        assert_eq!(definition.steps().len(), 1);
2182        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2183            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2184        } else {
2185            panic!("expected Split step");
2186        }
2187    }
2188
2189    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
2190
2191    #[test]
2192    fn test_builder_set_body_static_adds_processor() {
2193        let definition = RouteBuilder::from("timer:tick")
2194            .route_id("test-route")
2195            .set_body("fixed")
2196            .build()
2197            .unwrap();
2198        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2199    }
2200
2201    #[test]
2202    fn test_builder_set_body_fn_adds_processor() {
2203        let definition = RouteBuilder::from("timer:tick")
2204            .route_id("test-route")
2205            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2206            .build()
2207            .unwrap();
2208        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2209    }
2210
2211    #[test]
2212    fn transform_alias_produces_same_as_set_body() {
2213        let route_transform = RouteBuilder::from("timer:tick")
2214            .route_id("test-route")
2215            .transform("hello")
2216            .build()
2217            .unwrap();
2218
2219        let route_set_body = RouteBuilder::from("timer:tick")
2220            .route_id("test-route")
2221            .set_body("hello")
2222            .build()
2223            .unwrap();
2224
2225        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2226    }
2227
2228    #[test]
2229    fn test_builder_set_header_fn_adds_processor() {
2230        let definition = RouteBuilder::from("timer:tick")
2231            .route_id("test-route")
2232            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2233            .build()
2234            .unwrap();
2235        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2236    }
2237
2238    #[tokio::test]
2239    async fn test_set_body_static_processor_works() {
2240        use camel_core::route::compose_pipeline;
2241        let def = RouteBuilder::from("t:t")
2242            .route_id("test-route")
2243            .set_body("replaced")
2244            .build()
2245            .unwrap();
2246        let pipeline = compose_pipeline(
2247            def.steps()
2248                .iter()
2249                .filter_map(|s| {
2250                    if let BuilderStep::Processor(p) = s {
2251                        Some(p.clone())
2252                    } else {
2253                        None
2254                    }
2255                })
2256                .collect(),
2257        );
2258        let exchange = Exchange::new(Message::new("original"));
2259        let result = pipeline.oneshot(exchange).await.unwrap();
2260        assert_eq!(result.input.body.as_text(), Some("replaced"));
2261    }
2262
2263    #[tokio::test]
2264    async fn test_set_body_fn_processor_works() {
2265        use camel_core::route::compose_pipeline;
2266        let def = RouteBuilder::from("t:t")
2267            .route_id("test-route")
2268            .set_body_fn(|ex: &Exchange| {
2269                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2270            })
2271            .build()
2272            .unwrap();
2273        let pipeline = compose_pipeline(
2274            def.steps()
2275                .iter()
2276                .filter_map(|s| {
2277                    if let BuilderStep::Processor(p) = s {
2278                        Some(p.clone())
2279                    } else {
2280                        None
2281                    }
2282                })
2283                .collect(),
2284        );
2285        let exchange = Exchange::new(Message::new("hello"));
2286        let result = pipeline.oneshot(exchange).await.unwrap();
2287        assert_eq!(result.input.body.as_text(), Some("HELLO"));
2288    }
2289
2290    #[tokio::test]
2291    async fn test_set_header_fn_processor_works() {
2292        use camel_core::route::compose_pipeline;
2293        let def = RouteBuilder::from("t:t")
2294            .route_id("test-route")
2295            .set_header_fn("echo", |ex: &Exchange| {
2296                ex.input
2297                    .body
2298                    .as_text()
2299                    .map(|t| Value::String(t.into()))
2300                    .unwrap_or(Value::Null)
2301            })
2302            .build()
2303            .unwrap();
2304        let pipeline = compose_pipeline(
2305            def.steps()
2306                .iter()
2307                .filter_map(|s| {
2308                    if let BuilderStep::Processor(p) = s {
2309                        Some(p.clone())
2310                    } else {
2311                        None
2312                    }
2313                })
2314                .collect(),
2315        );
2316        let exchange = Exchange::new(Message::new("ping"));
2317        let result = pipeline.oneshot(exchange).await.unwrap();
2318        assert_eq!(
2319            result.input.header("echo"),
2320            Some(&Value::String("ping".into()))
2321        );
2322    }
2323
2324    // ── FilterBuilder typestate tests ─────────────────────────────────────
2325
2326    #[test]
2327    fn test_filter_builder_typestate() {
2328        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2329            .route_id("test-route")
2330            .filter(|_ex| true)
2331            .to("mock:inner")
2332            .end_filter()
2333            .to("mock:outer")
2334            .build();
2335        assert!(result.is_ok());
2336    }
2337
2338    #[test]
2339    fn test_filter_builder_steps_collected() {
2340        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2341            .route_id("test-route")
2342            .filter(|_ex| true)
2343            .to("mock:inner")
2344            .end_filter()
2345            .build()
2346            .unwrap();
2347
2348        assert_eq!(definition.steps().len(), 1);
2349        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2350    }
2351
2352    #[test]
2353    fn test_wire_tap_builder_adds_step() {
2354        let definition = RouteBuilder::from("timer:tick")
2355            .route_id("test-route")
2356            .wire_tap("mock:tap")
2357            .to("mock:result")
2358            .build()
2359            .unwrap();
2360
2361        assert_eq!(definition.steps().len(), 2);
2362        assert!(
2363            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2364        );
2365        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2366    }
2367
2368    // ── MulticastBuilder typestate tests ─────────────────────────────────────
2369
2370    #[test]
2371    fn test_multicast_builder_typestate() {
2372        let definition = RouteBuilder::from("timer:tick")
2373            .route_id("test-route")
2374            .multicast()
2375            .to("direct:a")
2376            .to("direct:b")
2377            .end_multicast()
2378            .to("mock:result")
2379            .build()
2380            .unwrap();
2381
2382        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
2383    }
2384
2385    #[test]
2386    fn test_multicast_builder_steps_collected() {
2387        let definition = RouteBuilder::from("timer:tick")
2388            .route_id("test-route")
2389            .multicast()
2390            .to("direct:a")
2391            .to("direct:b")
2392            .end_multicast()
2393            .build()
2394            .unwrap();
2395
2396        match &definition.steps()[0] {
2397            BuilderStep::Multicast { steps, .. } => {
2398                assert_eq!(steps.len(), 2);
2399            }
2400            other => panic!("Expected Multicast, got {:?}", other),
2401        }
2402    }
2403
2404    // ── Concurrency builder tests ─────────────────────────────────────
2405
2406    #[test]
2407    fn test_builder_concurrent_sets_concurrency() {
2408        use camel_component_api::ConcurrencyModel;
2409
2410        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2411            .route_id("test-route")
2412            .concurrent(16)
2413            .to("log:info")
2414            .build()
2415            .unwrap();
2416
2417        assert_eq!(
2418            definition.concurrency_override(),
2419            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2420        );
2421    }
2422
2423    #[test]
2424    fn test_builder_concurrent_zero_means_unbounded() {
2425        use camel_component_api::ConcurrencyModel;
2426
2427        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2428            .route_id("test-route")
2429            .concurrent(0)
2430            .to("log:info")
2431            .build()
2432            .unwrap();
2433
2434        assert_eq!(
2435            definition.concurrency_override(),
2436            Some(&ConcurrencyModel::Concurrent { max: None })
2437        );
2438    }
2439
2440    #[test]
2441    fn test_builder_sequential_sets_concurrency() {
2442        use camel_component_api::ConcurrencyModel;
2443
2444        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2445            .route_id("test-route")
2446            .sequential()
2447            .to("log:info")
2448            .build()
2449            .unwrap();
2450
2451        assert_eq!(
2452            definition.concurrency_override(),
2453            Some(&ConcurrencyModel::Sequential)
2454        );
2455    }
2456
2457    #[test]
2458    fn test_builder_default_concurrency_is_none() {
2459        let definition = RouteBuilder::from("timer:tick")
2460            .route_id("test-route")
2461            .to("log:info")
2462            .build()
2463            .unwrap();
2464
2465        assert_eq!(definition.concurrency_override(), None);
2466    }
2467
2468    // ── Route lifecycle builder tests ─────────────────────────────────────
2469
2470    #[test]
2471    fn test_builder_route_id_sets_id() {
2472        let definition = RouteBuilder::from("timer:tick")
2473            .route_id("my-route")
2474            .build()
2475            .unwrap();
2476
2477        assert_eq!(definition.route_id(), "my-route");
2478    }
2479
2480    #[test]
2481    fn test_build_without_route_id_fails() {
2482        let result = RouteBuilder::from("timer:tick?period=1000")
2483            .to("log:info")
2484            .build();
2485        let err = match result {
2486            Err(e) => e.to_string(),
2487            Ok(_) => panic!("build() should fail without route_id"),
2488        };
2489        assert!(
2490            err.contains("route_id"),
2491            "error should mention route_id, got: {}",
2492            err
2493        );
2494    }
2495
2496    #[test]
2497    fn test_builder_empty_route_id_rejected() {
2498        let result = RouteBuilder::from("timer:tick").route_id("").build();
2499        let err = result.err().expect("empty route_id should be rejected");
2500        assert!(matches!(err, CamelError::RouteError(_)));
2501    }
2502
2503    #[test]
2504    fn test_builder_whitespace_route_id_rejected() {
2505        let result = RouteBuilder::from("timer:tick").route_id("   ").build();
2506        assert!(result.is_err());
2507    }
2508
2509    #[test]
2510    fn test_builder_auto_startup_false() {
2511        let definition = RouteBuilder::from("timer:tick")
2512            .route_id("test-route")
2513            .auto_startup(false)
2514            .build()
2515            .unwrap();
2516
2517        assert!(!definition.auto_startup());
2518    }
2519
2520    #[test]
2521    fn test_builder_startup_order_custom() {
2522        let definition = RouteBuilder::from("timer:tick")
2523            .route_id("test-route")
2524            .startup_order(50)
2525            .build()
2526            .unwrap();
2527
2528        assert_eq!(definition.startup_order(), 50);
2529    }
2530
2531    #[test]
2532    fn test_builder_defaults() {
2533        let definition = RouteBuilder::from("timer:tick")
2534            .route_id("test-route")
2535            .build()
2536            .unwrap();
2537
2538        assert_eq!(definition.route_id(), "test-route");
2539        assert!(definition.auto_startup());
2540        assert_eq!(definition.startup_order(), 1000);
2541    }
2542
2543    // ── Choice typestate tests ──────────────────────────────────────────────────
2544
2545    #[test]
2546    fn test_choice_builder_single_when() {
2547        let definition = RouteBuilder::from("timer:tick")
2548            .route_id("test-route")
2549            .choice()
2550            .when(|ex: &Exchange| ex.input.header("type").is_some())
2551            .to("mock:typed")
2552            .end_when()
2553            .end_choice()
2554            .build()
2555            .unwrap();
2556        assert_eq!(definition.steps().len(), 1);
2557        assert!(
2558            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2559            if whens.len() == 1 && otherwise.is_none())
2560        );
2561    }
2562
2563    #[test]
2564    fn test_choice_builder_when_otherwise() {
2565        let definition = RouteBuilder::from("timer:tick")
2566            .route_id("test-route")
2567            .choice()
2568            .when(|ex: &Exchange| ex.input.header("a").is_some())
2569            .to("mock:a")
2570            .end_when()
2571            .otherwise()
2572            .to("mock:fallback")
2573            .end_otherwise()
2574            .end_choice()
2575            .build()
2576            .unwrap();
2577        assert!(
2578            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2579            if whens.len() == 1 && otherwise.is_some())
2580        );
2581    }
2582
2583    #[test]
2584    fn test_choice_builder_multiple_whens() {
2585        let definition = RouteBuilder::from("timer:tick")
2586            .route_id("test-route")
2587            .choice()
2588            .when(|ex: &Exchange| ex.input.header("a").is_some())
2589            .to("mock:a")
2590            .end_when()
2591            .when(|ex: &Exchange| ex.input.header("b").is_some())
2592            .to("mock:b")
2593            .end_when()
2594            .end_choice()
2595            .build()
2596            .unwrap();
2597        assert!(
2598            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2599            if whens.len() == 2)
2600        );
2601    }
2602
2603    #[test]
2604    fn test_choice_step_after_choice() {
2605        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2606        let definition = RouteBuilder::from("timer:tick")
2607            .route_id("test-route")
2608            .choice()
2609            .when(|_ex: &Exchange| true)
2610            .to("mock:inner")
2611            .end_when()
2612            .end_choice()
2613            .to("mock:outer") // must be step[1], not inside choice
2614            .build()
2615            .unwrap();
2616        assert_eq!(definition.steps().len(), 2);
2617        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2618    }
2619
2620    // ── Throttle typestate tests ──────────────────────────────────────────────────
2621
2622    #[test]
2623    fn test_throttle_builder_typestate() {
2624        let definition = RouteBuilder::from("timer:tick")
2625            .route_id("test-route")
2626            .throttle(10, std::time::Duration::from_secs(1))
2627            .to("mock:result")
2628            .end_throttle()
2629            .build()
2630            .unwrap();
2631
2632        assert_eq!(definition.steps().len(), 1);
2633        assert!(matches!(
2634            &definition.steps()[0],
2635            BuilderStep::Throttle { .. }
2636        ));
2637    }
2638
2639    #[test]
2640    fn test_throttle_builder_with_strategy() {
2641        let definition = RouteBuilder::from("timer:tick")
2642            .route_id("test-route")
2643            .throttle(10, std::time::Duration::from_secs(1))
2644            .strategy(ThrottleStrategy::Reject)
2645            .to("mock:result")
2646            .end_throttle()
2647            .build()
2648            .unwrap();
2649
2650        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2651            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2652        } else {
2653            panic!("Expected Throttle step");
2654        }
2655    }
2656
2657    #[test]
2658    fn test_throttle_builder_steps_collected() {
2659        let definition = RouteBuilder::from("timer:tick")
2660            .route_id("test-route")
2661            .throttle(5, std::time::Duration::from_secs(1))
2662            .set_header("throttled", Value::Bool(true))
2663            .to("mock:throttled")
2664            .end_throttle()
2665            .build()
2666            .unwrap();
2667
2668        match &definition.steps()[0] {
2669            BuilderStep::Throttle { steps, .. } => {
2670                assert_eq!(steps.len(), 2); // SetHeader + To
2671            }
2672            other => panic!("Expected Throttle, got {:?}", other),
2673        }
2674    }
2675
2676    #[test]
2677    fn test_throttle_step_after_throttle() {
2678        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2679        let definition = RouteBuilder::from("timer:tick")
2680            .route_id("test-route")
2681            .throttle(10, std::time::Duration::from_secs(1))
2682            .to("mock:inner")
2683            .end_throttle()
2684            .to("mock:outer")
2685            .build()
2686            .unwrap();
2687
2688        assert_eq!(definition.steps().len(), 2);
2689        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2690    }
2691
2692    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2693
2694    #[test]
2695    fn test_load_balance_builder_typestate() {
2696        let definition = RouteBuilder::from("timer:tick")
2697            .route_id("test-route")
2698            .load_balance()
2699            .round_robin()
2700            .to("mock:a")
2701            .to("mock:b")
2702            .end_load_balance()
2703            .build()
2704            .unwrap();
2705
2706        assert_eq!(definition.steps().len(), 1);
2707        assert!(matches!(
2708            &definition.steps()[0],
2709            BuilderStep::LoadBalance { .. }
2710        ));
2711    }
2712
2713    #[test]
2714    fn test_load_balance_builder_with_strategy() {
2715        let definition = RouteBuilder::from("timer:tick")
2716            .route_id("test-route")
2717            .load_balance()
2718            .random()
2719            .to("mock:result")
2720            .end_load_balance()
2721            .build()
2722            .unwrap();
2723
2724        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2725            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2726        } else {
2727            panic!("Expected LoadBalance step");
2728        }
2729    }
2730
2731    #[test]
2732    fn test_load_balance_builder_steps_collected() {
2733        let definition = RouteBuilder::from("timer:tick")
2734            .route_id("test-route")
2735            .load_balance()
2736            .set_header("lb", Value::Bool(true))
2737            .to("mock:a")
2738            .end_load_balance()
2739            .build()
2740            .unwrap();
2741
2742        match &definition.steps()[0] {
2743            BuilderStep::LoadBalance { steps, .. } => {
2744                assert_eq!(steps.len(), 2); // SetHeader + To
2745            }
2746            other => panic!("Expected LoadBalance, got {:?}", other),
2747        }
2748    }
2749
2750    #[test]
2751    fn test_load_balance_step_after_load_balance() {
2752        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
2753        let definition = RouteBuilder::from("timer:tick")
2754            .route_id("test-route")
2755            .load_balance()
2756            .to("mock:inner")
2757            .end_load_balance()
2758            .to("mock:outer")
2759            .build()
2760            .unwrap();
2761
2762        assert_eq!(definition.steps().len(), 2);
2763        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2764    }
2765
2766    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2767
2768    #[test]
2769    fn test_dynamic_router_builder() {
2770        let definition = RouteBuilder::from("timer:tick")
2771            .route_id("test-route")
2772            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2773            .build()
2774            .unwrap();
2775
2776        assert_eq!(definition.steps().len(), 1);
2777        assert!(matches!(
2778            &definition.steps()[0],
2779            BuilderStep::DynamicRouter { .. }
2780        ));
2781    }
2782
2783    #[test]
2784    fn test_dynamic_router_builder_with_config() {
2785        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2786            .max_iterations(100)
2787            .cache_size(500);
2788
2789        let definition = RouteBuilder::from("timer:tick")
2790            .route_id("test-route")
2791            .dynamic_router_with_config(config)
2792            .build()
2793            .unwrap();
2794
2795        assert_eq!(definition.steps().len(), 1);
2796        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2797            assert_eq!(config.max_iterations, 100);
2798            assert_eq!(config.cache_size, 500);
2799        } else {
2800            panic!("Expected DynamicRouter step");
2801        }
2802    }
2803
2804    #[test]
2805    fn test_dynamic_router_step_after_router() {
2806        // Steps after dynamic_router() are added to the outer pipeline.
2807        let definition = RouteBuilder::from("timer:tick")
2808            .route_id("test-route")
2809            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2810            .to("mock:outer")
2811            .build()
2812            .unwrap();
2813
2814        assert_eq!(definition.steps().len(), 2);
2815        assert!(matches!(
2816            &definition.steps()[0],
2817            BuilderStep::DynamicRouter { .. }
2818        ));
2819        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2820    }
2821
2822    #[test]
2823    fn routing_slip_builder_creates_step() {
2824        use camel_api::RoutingSlipExpression;
2825
2826        let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2827
2828        let route = RouteBuilder::from("direct:start")
2829            .route_id("routing-slip-test")
2830            .routing_slip(expression)
2831            .build()
2832            .unwrap();
2833
2834        assert!(
2835            matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2836            "Expected RoutingSlip step"
2837        );
2838    }
2839
2840    #[test]
2841    fn routing_slip_with_config_builder_creates_step() {
2842        use camel_api::RoutingSlipConfig;
2843
2844        let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2845            .uri_delimiter("|")
2846            .cache_size(50)
2847            .ignore_invalid_endpoints(true);
2848
2849        let route = RouteBuilder::from("direct:start")
2850            .route_id("routing-slip-config-test")
2851            .routing_slip_with_config(config)
2852            .build()
2853            .unwrap();
2854
2855        if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2856            assert_eq!(config.uri_delimiter, "|");
2857            assert_eq!(config.cache_size, 50);
2858            assert!(config.ignore_invalid_endpoints);
2859        } else {
2860            panic!("Expected RoutingSlip step");
2861        }
2862    }
2863
2864    #[test]
2865    fn test_builder_marshal_adds_processor_step() {
2866        let definition = RouteBuilder::from("timer:tick")
2867            .route_id("test-route")
2868            .marshal("json")
2869            .unwrap()
2870            .build()
2871            .unwrap();
2872        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2873    }
2874
2875    #[test]
2876    fn test_builder_unmarshal_adds_processor_step() {
2877        let definition = RouteBuilder::from("timer:tick")
2878            .route_id("test-route")
2879            .unmarshal("json")
2880            .unwrap()
2881            .build()
2882            .unwrap();
2883        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2884    }
2885
2886    #[test]
2887    fn test_builder_stream_cache_adds_processor_step() {
2888        let definition = RouteBuilder::from("timer:tick")
2889            .route_id("test-route")
2890            .stream_cache(1024)
2891            .build()
2892            .unwrap();
2893        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2894    }
2895
2896    #[test]
2897    fn validate_adds_to_step_with_validator_uri() {
2898        let def = RouteBuilder::from("direct:in")
2899            .route_id("test")
2900            .validate("schemas/order.xsd")
2901            .build()
2902            .unwrap();
2903        let steps = def.steps();
2904        assert_eq!(steps.len(), 1);
2905        assert!(
2906            matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2907            "got: {:?}",
2908            steps[0]
2909        );
2910    }
2911
2912    #[test]
2913    fn test_builder_marshal_returns_err_for_unknown_format() {
2914        let result = RouteBuilder::from("timer:tick")
2915            .route_id("test-route")
2916            .marshal("csv");
2917        let err = match result {
2918            Err(e) => e,
2919            Ok(_) => panic!("marshal with unknown format should return Err"),
2920        };
2921        let msg = err.to_string();
2922        assert!(
2923            msg.contains("unknown data format"),
2924            "error should mention unknown format, got: {msg}"
2925        );
2926        assert!(
2927            msg.contains("csv"),
2928            "error should mention format name, got: {msg}"
2929        );
2930    }
2931
2932    #[test]
2933    fn test_builder_unmarshal_returns_err_for_unknown_format() {
2934        let result = RouteBuilder::from("timer:tick")
2935            .route_id("test-route")
2936            .unmarshal("csv");
2937        let err = match result {
2938            Err(e) => e,
2939            Ok(_) => panic!("unmarshal with unknown format should return Err"),
2940        };
2941        let msg = err.to_string();
2942        assert!(
2943            msg.contains("unknown data format"),
2944            "error should mention unknown format, got: {msg}"
2945        );
2946        assert!(
2947            msg.contains("csv"),
2948            "error should mention format name, got: {msg}"
2949        );
2950    }
2951
2952    #[test]
2953    fn test_builder_recipient_list_creates_step() {
2954        let route = RouteBuilder::from("direct:start")
2955            .route_id("recipient-list-test")
2956            .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
2957            .build()
2958            .unwrap();
2959
2960        assert!(matches!(
2961            &route.steps()[0],
2962            BuilderStep::RecipientList { .. }
2963        ));
2964    }
2965
2966    #[test]
2967    fn test_builder_recipient_list_with_config_creates_step() {
2968        let config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));
2969
2970        let route = RouteBuilder::from("direct:start")
2971            .route_id("recipient-list-config-test")
2972            .recipient_list_with_config(config)
2973            .build()
2974            .unwrap();
2975
2976        assert!(matches!(
2977            &route.steps()[0],
2978            BuilderStep::RecipientList { .. }
2979        ));
2980    }
2981
2982    #[test]
2983    fn test_builder_script_adds_script_step() {
2984        let route = RouteBuilder::from("direct:start")
2985            .route_id("script-test")
2986            .script("rhai", "headers[\"x\"] = \"y\"")
2987            .build()
2988            .unwrap();
2989
2990        assert!(matches!(
2991            &route.steps()[0],
2992            BuilderStep::Script { language, script }
2993            if language == "rhai" && script == "headers[\"x\"] = \"y\""
2994        ));
2995    }
2996
2997    #[test]
2998    fn test_builder_delay_and_delay_with_header_add_steps() {
2999        let route = RouteBuilder::from("direct:start")
3000            .route_id("delay-test")
3001            .delay(Duration::from_millis(250))
3002            .delay_with_header(Duration::from_millis(500), "x-delay")
3003            .build()
3004            .unwrap();
3005
3006        assert_eq!(route.steps().len(), 2);
3007        assert!(matches!(&route.steps()[0], BuilderStep::Delay { .. }));
3008        assert!(matches!(&route.steps()[1], BuilderStep::Delay { .. }));
3009    }
3010
3011    #[test]
3012    fn test_builder_log_and_stop_add_steps_in_order() {
3013        let route = RouteBuilder::from("direct:start")
3014            .route_id("log-stop-test")
3015            .log("hello", LogLevel::Info)
3016            .stop()
3017            .to("mock:after")
3018            .build()
3019            .unwrap();
3020
3021        assert_eq!(route.steps().len(), 3);
3022        assert!(matches!(
3023            &route.steps()[0],
3024            BuilderStep::Log { message, .. } if message == "hello"
3025        ));
3026        assert!(matches!(&route.steps()[1], BuilderStep::Stop));
3027        assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
3028    }
3029
3030    #[test]
3031    fn test_builder_stream_cache_default_adds_processor_step() {
3032        let route = RouteBuilder::from("direct:start")
3033            .route_id("stream-cache-default-test")
3034            .stream_cache_default()
3035            .build()
3036            .unwrap();
3037
3038        assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
3039    }
3040
3041    #[test]
3042    fn test_validate_preserves_existing_validator_prefix() {
3043        let route = RouteBuilder::from("direct:in")
3044            .route_id("validate-prefix-test")
3045            .validate("validator:schemas/order.xsd")
3046            .build()
3047            .unwrap();
3048
3049        assert!(matches!(
3050            &route.steps()[0],
3051            BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
3052        ));
3053    }
3054
3055    #[test]
3056    fn test_load_balance_builder_weighted_failover_parallel_config() {
3057        let route = RouteBuilder::from("direct:start")
3058            .route_id("lb-weighted-failover-parallel")
3059            .load_balance()
3060            .weighted(vec![
3061                ("direct:a".to_string(), 3),
3062                ("direct:b".to_string(), 1),
3063            ])
3064            .failover()
3065            .parallel(true)
3066            .to("mock:result")
3067            .end_load_balance()
3068            .build()
3069            .unwrap();
3070
3071        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3072            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3073            assert!(config.parallel);
3074        } else {
3075            panic!("Expected LoadBalance step");
3076        }
3077    }
3078
3079    #[test]
3080    fn test_multicast_builder_all_config_setters() {
3081        let route = RouteBuilder::from("direct:start")
3082            .route_id("multicast-config-test")
3083            .multicast()
3084            .parallel(true)
3085            .parallel_limit(4)
3086            .stop_on_exception(true)
3087            .timeout(Duration::from_millis(300))
3088            .aggregation(MulticastStrategy::Original)
3089            .to("mock:a")
3090            .end_multicast()
3091            .build()
3092            .unwrap();
3093
3094        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3095            assert!(config.parallel);
3096            assert_eq!(config.parallel_limit, Some(4));
3097            assert!(config.stop_on_exception);
3098            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
3099            assert!(matches!(config.aggregation, MulticastStrategy::Original));
3100        } else {
3101            panic!("Expected Multicast step");
3102        }
3103    }
3104
3105    #[test]
3106    fn test_build_canonical_rejects_unsupported_processor_step() {
3107        let err = RouteBuilder::from("direct:start")
3108            .route_id("canonical-reject")
3109            .set_header("k", Value::String("v".into()))
3110            .build_canonical()
3111            .unwrap_err();
3112
3113        assert!(format!("{err}").contains("does not support step `processor`"));
3114    }
3115
3116    // ── LoadBalance strategy-specific tests ─────────────────────────────────────
3117
3118    #[test]
3119    fn test_load_balance_builder_weighted_strategy() {
3120        let route = RouteBuilder::from("direct:start")
3121            .route_id("lb-weighted")
3122            .load_balance()
3123            .weighted(vec![
3124                ("direct:a".to_string(), 5),
3125                ("direct:b".to_string(), 2),
3126                ("direct:c".to_string(), 1),
3127            ])
3128            .to("mock:result")
3129            .end_load_balance()
3130            .build()
3131            .unwrap();
3132
3133        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3134            assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
3135        } else {
3136            panic!("Expected LoadBalance step");
3137        }
3138    }
3139
3140    #[test]
3141    fn test_load_balance_builder_failover_strategy() {
3142        let route = RouteBuilder::from("direct:start")
3143            .route_id("lb-failover")
3144            .load_balance()
3145            .failover()
3146            .to("mock:primary")
3147            .end_load_balance()
3148            .build()
3149            .unwrap();
3150
3151        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3152            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3153            assert!(!config.parallel);
3154        } else {
3155            panic!("Expected LoadBalance step");
3156        }
3157    }
3158
3159    #[test]
3160    fn test_load_balance_builder_parallel_false_explicit() {
3161        let route = RouteBuilder::from("direct:start")
3162            .route_id("lb-parallel-false")
3163            .load_balance()
3164            .round_robin()
3165            .parallel(false)
3166            .to("mock:result")
3167            .end_load_balance()
3168            .build()
3169            .unwrap();
3170
3171        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3172            assert!(!config.parallel);
3173        } else {
3174            panic!("Expected LoadBalance step");
3175        }
3176    }
3177
3178    // ── FilterInSplitBuilder tests ──────────────────────────────────────────────
3179
3180    #[test]
3181    fn test_filter_in_split_builder_typestate() {
3182        use camel_api::splitter::{SplitterConfig, split_body_lines};
3183
3184        let definition = RouteBuilder::from("timer:test")
3185            .route_id("filter-in-split")
3186            .split(SplitterConfig::new(split_body_lines()))
3187            .filter(|_ex| true)
3188            .to("mock:filtered")
3189            .end_filter()
3190            .end_split()
3191            .build()
3192            .unwrap();
3193
3194        assert_eq!(definition.steps().len(), 1);
3195        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3196            assert_eq!(steps.len(), 1);
3197            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3198        } else {
3199            panic!("Expected Split step");
3200        }
3201    }
3202
3203    #[test]
3204    fn test_filter_in_split_builder_multiple_steps() {
3205        use camel_api::splitter::{SplitterConfig, split_body_lines};
3206
3207        let definition = RouteBuilder::from("timer:test")
3208            .route_id("filter-in-split-multi")
3209            .split(SplitterConfig::new(split_body_lines()))
3210            .to("mock:before-filter")
3211            .filter(|_ex| true)
3212            .to("mock:inside-filter")
3213            .end_filter()
3214            .to("mock:after-filter")
3215            .end_split()
3216            .build()
3217            .unwrap();
3218
3219        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3220            // To("before-filter") + Filter{...} + To("after-filter") = 3
3221            assert_eq!(steps.len(), 3);
3222        } else {
3223            panic!("Expected Split step");
3224        }
3225    }
3226
3227    // ── build_canonical tests ───────────────────────────────────────────────────
3228
3229    #[test]
3230    fn test_build_canonical_with_circuit_breaker() {
3231        use camel_api::circuit_breaker::CircuitBreakerConfig;
3232
3233        let spec = RouteBuilder::from("direct:start")
3234            .route_id("canonical-cb")
3235            .circuit_breaker(CircuitBreakerConfig::new().failure_threshold(10))
3236            .to("mock:result")
3237            .build_canonical()
3238            .unwrap();
3239
3240        let cb = spec.circuit_breaker.expect("circuit breaker should be set");
3241        assert_eq!(cb.failure_threshold, 10);
3242    }
3243
3244    #[test]
3245    fn test_build_canonical_rejects_custom_split_aggregation() {
3246        use camel_api::splitter::{SplitterConfig, split_body_lines};
3247
3248        let err = RouteBuilder::from("direct:start")
3249            .route_id("canonical-custom-split")
3250            .split(SplitterConfig::new(split_body_lines()).aggregation(
3251                camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
3252            ))
3253            .to("mock:frag")
3254            .end_split()
3255            .build_canonical()
3256            .unwrap_err();
3257
3258        // Split with closure-based expression is rejected in canonical v1.
3259        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3260    }
3261
3262    #[test]
3263    fn test_build_canonical_rejects_custom_aggregate_strategy() {
3264        let err = RouteBuilder::from("direct:start")
3265            .route_id("canonical-custom-agg")
3266            .aggregate(
3267                AggregatorConfig::correlate_by("key")
3268                    .complete_when_size(2)
3269                    .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
3270                    .build()
3271                    .unwrap(),
3272            )
3273            .build_canonical()
3274            .unwrap_err();
3275
3276        assert!(format!("{err}").contains("custom aggregate strategy"));
3277    }
3278
3279    #[test]
3280    fn test_build_canonical_rejects_fn_correlation_strategy() {
3281        let err = RouteBuilder::from("direct:start")
3282            .route_id("canonical-fn-corr")
3283            .aggregate(AggregatorConfig {
3284                header_name: "key".to_string(),
3285                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3286                correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
3287                strategy: AggregationStrategy::CollectAll,
3288                max_buckets: None,
3289                bucket_ttl: None,
3290                force_completion_on_stop: false,
3291                discard_on_timeout: false,
3292            })
3293            .build_canonical()
3294            .unwrap_err();
3295
3296        assert!(format!("{err}").contains("Fn correlation strategy"));
3297    }
3298
3299    #[test]
3300    fn test_build_canonical_rejects_predicate_completion() {
3301        let err = RouteBuilder::from("direct:start")
3302            .route_id("canonical-pred-completion")
3303            .aggregate(AggregatorConfig {
3304                header_name: "key".to_string(),
3305                completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
3306                    |_| false,
3307                ))),
3308                correlation: CorrelationStrategy::HeaderName("key".to_string()),
3309                strategy: AggregationStrategy::CollectAll,
3310                max_buckets: None,
3311                bucket_ttl: None,
3312                force_completion_on_stop: false,
3313                discard_on_timeout: false,
3314            })
3315            .build_canonical()
3316            .unwrap_err();
3317
3318        assert!(format!("{err}").contains("predicate completion"));
3319    }
3320
3321    #[test]
3322    fn test_build_canonical_with_expression_correlation() {
3323        let spec = RouteBuilder::from("direct:start")
3324            .route_id("canonical-expr-corr")
3325            .aggregate(AggregatorConfig {
3326                header_name: "key".to_string(),
3327                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3328                correlation: CorrelationStrategy::Expression {
3329                    expr: "header.key".to_string(),
3330                    language: "simple".to_string(),
3331                },
3332                strategy: AggregationStrategy::CollectAll,
3333                max_buckets: None,
3334                bucket_ttl: None,
3335                force_completion_on_stop: false,
3336                discard_on_timeout: false,
3337            })
3338            .build_canonical()
3339            .unwrap();
3340
3341        assert!(spec.steps.iter().any(|s| matches!(s, CanonicalStepSpec::Aggregate(a) if a.correlation_key == Some("header.key".to_string()))));
3342    }
3343
3344    #[test]
3345    fn test_build_canonical_split_rejected_with_closure_expression() {
3346        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
3347
3348        // Builder-based split uses closure expressions, which are not serializable.
3349        let err = RouteBuilder::from("direct:start")
3350            .route_id("canonical-split-last")
3351            .split(
3352                SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins),
3353            )
3354            .to("mock:frag")
3355            .end_split()
3356            .build_canonical()
3357            .unwrap_err();
3358
3359        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3360    }
3361
3362    // ── OnExceptionBuilder full chain tests ─────────────────────────────────────
3363
3364    #[test]
3365    fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
3366        let definition = RouteBuilder::from("direct:start")
3367            .route_id("on-exception-full")
3368            .dead_letter_channel("log:dlc")
3369            .on_exception(|e| matches!(e, CamelError::Io(_)))
3370            .retry(5)
3371            .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
3372            .with_jitter(0.3)
3373            .handled_by("log:io-handler")
3374            .end_on_exception()
3375            .to("mock:out")
3376            .build()
3377            .unwrap();
3378
3379        let cfg = definition
3380            .error_handler_config()
3381            .expect("error handler should be set");
3382        assert_eq!(cfg.policies.len(), 1);
3383        let policy = &cfg.policies[0];
3384        let retry = policy.retry.as_ref().expect("retry should be set");
3385        assert_eq!(retry.max_attempts, 5);
3386        assert_eq!(retry.initial_delay, Duration::from_millis(10));
3387        assert_eq!(retry.multiplier, 2.0);
3388        assert_eq!(retry.max_delay, Duration::from_millis(500));
3389        assert!((retry.jitter_factor - 0.3).abs() < f64::EPSILON);
3390        assert_eq!(policy.handled_by.as_deref(), Some("log:io-handler"));
3391    }
3392
3393    #[test]
3394    fn test_on_exception_jitter_clamped_to_valid_range() {
3395        let definition = RouteBuilder::from("direct:start")
3396            .route_id("jitter-clamp")
3397            .on_exception(|_e| true)
3398            .retry(1)
3399            .with_jitter(5.0)
3400            .end_on_exception()
3401            .to("mock:out")
3402            .build()
3403            .unwrap();
3404
3405        let cfg = definition.error_handler_config().unwrap();
3406        let retry = cfg.policies[0].retry.as_ref().unwrap();
3407        assert!((retry.jitter_factor - 1.0).abs() < f64::EPSILON);
3408    }
3409
3410    // ── StepAccumulator: process_fn, convert_body_to, bean ──────────────────────
3411
3412    #[test]
3413    fn test_builder_process_fn_adds_processor_step() {
3414        use camel_api::BoxProcessorExt;
3415        let processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
3416        let definition = RouteBuilder::from("timer:tick")
3417            .route_id("process-fn-test")
3418            .process_fn(processor)
3419            .build()
3420            .unwrap();
3421
3422        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3423    }
3424
3425    #[test]
3426    fn test_builder_convert_body_to_adds_processor_step() {
3427        let definition = RouteBuilder::from("timer:tick")
3428            .route_id("convert-body-test")
3429            .convert_body_to(BodyType::Json)
3430            .build()
3431            .unwrap();
3432
3433        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3434    }
3435
3436    #[test]
3437    fn test_builder_bean_adds_bean_step() {
3438        let definition = RouteBuilder::from("timer:tick")
3439            .route_id("bean-test")
3440            .bean("myBean", "process")
3441            .build()
3442            .unwrap();
3443
3444        assert!(
3445            matches!(&definition.steps()[0], BuilderStep::Bean { name, method }
3446            if name == "myBean" && method == "process")
3447        );
3448    }
3449
3450    // ── Throttle strategy-specific tests ────────────────────────────────────────
3451
3452    #[test]
3453    fn test_throttle_builder_delay_strategy() {
3454        let definition = RouteBuilder::from("timer:tick")
3455            .route_id("throttle-delay")
3456            .throttle(10, Duration::from_secs(1))
3457            .strategy(ThrottleStrategy::Delay)
3458            .to("mock:result")
3459            .end_throttle()
3460            .build()
3461            .unwrap();
3462
3463        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3464            assert_eq!(config.strategy, ThrottleStrategy::Delay);
3465        } else {
3466            panic!("Expected Throttle step");
3467        }
3468    }
3469
3470    #[test]
3471    fn test_throttle_builder_drop_strategy() {
3472        let definition = RouteBuilder::from("timer:tick")
3473            .route_id("throttle-drop")
3474            .throttle(10, Duration::from_secs(1))
3475            .strategy(ThrottleStrategy::Drop)
3476            .to("mock:result")
3477            .end_throttle()
3478            .build()
3479            .unwrap();
3480
3481        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3482            assert_eq!(config.strategy, ThrottleStrategy::Drop);
3483        } else {
3484            panic!("Expected Throttle step");
3485        }
3486    }
3487
3488    // ── LoopInLoopBuilder with loop_while ───────────────────────────────────────
3489
3490    #[test]
3491    fn test_nested_loop_while_builder() {
3492        use camel_api::loop_eip::LoopMode;
3493
3494        let def = RouteBuilder::from("direct:start")
3495            .route_id("nested-loop-while")
3496            .loop_count(2)
3497            .to("mock:outer")
3498            .loop_while(|_ex| true)
3499            .to("mock:inner")
3500            .end_loop()
3501            .end_loop()
3502            .build()
3503            .unwrap();
3504
3505        assert_eq!(def.steps().len(), 1);
3506        if let BuilderStep::Loop { steps, .. } = &def.steps()[0] {
3507            assert_eq!(steps.len(), 2);
3508            if let BuilderStep::Loop { config, .. } = &steps[1] {
3509                assert!(matches!(config.mode, LoopMode::While(_)));
3510            } else {
3511                panic!("Expected inner Loop step");
3512            }
3513        } else {
3514            panic!("Expected outer Loop step");
3515        }
3516    }
3517
3518    // ── Choice with multiple whens + otherwise ──────────────────────────────────
3519
3520    #[test]
3521    fn test_choice_builder_multiple_whens_with_otherwise() {
3522        let definition = RouteBuilder::from("timer:tick")
3523            .route_id("choice-multi-otherwise")
3524            .choice()
3525            .when(|ex: &Exchange| ex.input.header("a").is_some())
3526            .to("mock:a")
3527            .end_when()
3528            .when(|ex: &Exchange| ex.input.header("b").is_some())
3529            .to("mock:b")
3530            .end_when()
3531            .when(|ex: &Exchange| ex.input.header("c").is_some())
3532            .to("mock:c")
3533            .end_when()
3534            .otherwise()
3535            .to("mock:fallback")
3536            .end_otherwise()
3537            .end_choice()
3538            .build()
3539            .unwrap();
3540
3541        if let BuilderStep::Choice { whens, otherwise } = &definition.steps()[0] {
3542            assert_eq!(whens.len(), 3);
3543            assert!(otherwise.is_some());
3544            assert_eq!(otherwise.as_ref().unwrap().len(), 1);
3545        } else {
3546            panic!("Expected Choice step");
3547        }
3548    }
3549
3550    // ── Multicast individual config tests ───────────────────────────────────────
3551
3552    #[test]
3553    fn test_multicast_builder_parallel_only() {
3554        let route = RouteBuilder::from("direct:start")
3555            .route_id("multicast-parallel")
3556            .multicast()
3557            .parallel(true)
3558            .to("mock:a")
3559            .end_multicast()
3560            .build()
3561            .unwrap();
3562
3563        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3564            assert!(config.parallel);
3565            assert_eq!(config.parallel_limit, None);
3566        } else {
3567            panic!("Expected Multicast step");
3568        }
3569    }
3570
3571    #[test]
3572    fn test_multicast_builder_timeout_only() {
3573        let route = RouteBuilder::from("direct:start")
3574            .route_id("multicast-timeout")
3575            .multicast()
3576            .timeout(Duration::from_secs(5))
3577            .to("mock:a")
3578            .end_multicast()
3579            .build()
3580            .unwrap();
3581
3582        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3583            assert_eq!(config.timeout, Some(Duration::from_secs(5)));
3584        } else {
3585            panic!("Expected Multicast step");
3586        }
3587    }
3588
3589    #[test]
3590    fn test_multicast_builder_aggregation_collect_all() {
3591        let route = RouteBuilder::from("direct:start")
3592            .route_id("multicast-collect")
3593            .multicast()
3594            .aggregation(MulticastStrategy::CollectAll)
3595            .to("mock:a")
3596            .end_multicast()
3597            .build()
3598            .unwrap();
3599
3600        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3601            assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
3602        } else {
3603            panic!("Expected Multicast step");
3604        }
3605    }
3606
3607    // ── extract_completion_fields: Any mode with multiple conditions ────────────
3608
3609    #[test]
3610    fn test_build_canonical_aggregate_any_completion_mode() {
3611        let spec = RouteBuilder::from("direct:start")
3612            .route_id("canonical-any-completion")
3613            .aggregate(
3614                AggregatorConfig::correlate_by("key")
3615                    .complete_on_size_or_timeout(10, Duration::from_secs(30))
3616                    .build()
3617                    .unwrap(),
3618            )
3619            .build_canonical()
3620            .unwrap();
3621
3622        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3623            assert_eq!(agg.completion_size, Some(10));
3624            assert_eq!(agg.completion_timeout_ms, Some(30_000));
3625        } else {
3626            panic!("Expected Aggregate step");
3627        }
3628    }
3629
3630    #[test]
3631    fn test_build_canonical_aggregate_timeout_completion() {
3632        let spec = RouteBuilder::from("direct:start")
3633            .route_id("canonical-timeout-completion")
3634            .aggregate(
3635                AggregatorConfig::correlate_by("key")
3636                    .complete_on_timeout(Duration::from_millis(500))
3637                    .build()
3638                    .unwrap(),
3639            )
3640            .build_canonical()
3641            .unwrap();
3642
3643        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3644            assert_eq!(agg.completion_size, None);
3645            assert_eq!(agg.completion_timeout_ms, Some(500));
3646        } else {
3647            panic!("Expected Aggregate step");
3648        }
3649    }
3650
3651    // ── canonicalize_aggregate: discard_on_timeout and force_completion_on_stop ─
3652
3653    #[test]
3654    fn test_build_canonical_aggregate_discard_on_timeout() {
3655        use camel_api::aggregator::AggregatorConfig;
3656
3657        let spec = RouteBuilder::from("direct:start")
3658            .route_id("canonical-discard-timeout")
3659            .aggregate(
3660                AggregatorConfig::correlate_by("key")
3661                    .complete_when_size(1)
3662                    .discard_on_timeout(true)
3663                    .build()
3664                    .unwrap(),
3665            )
3666            .build_canonical()
3667            .unwrap();
3668
3669        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3670            assert_eq!(agg.discard_on_timeout, Some(true));
3671        } else {
3672            panic!("Expected Aggregate step");
3673        }
3674    }
3675
3676    #[test]
3677    fn test_build_canonical_aggregate_force_completion_on_stop() {
3678        use camel_api::aggregator::AggregatorConfig;
3679
3680        let spec = RouteBuilder::from("direct:start")
3681            .route_id("canonical-force-stop")
3682            .aggregate(
3683                AggregatorConfig::correlate_by("key")
3684                    .complete_when_size(1)
3685                    .force_completion_on_stop(true)
3686                    .build()
3687                    .unwrap(),
3688            )
3689            .build_canonical()
3690            .unwrap();
3691
3692        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3693            assert_eq!(agg.force_completion_on_stop, Some(true));
3694        } else {
3695            panic!("Expected Aggregate step");
3696        }
3697    }
3698
3699    // ── build_canonical: max_buckets and bucket_ttl ─────────────────────────────
3700
3701    #[test]
3702    fn test_build_canonical_aggregate_max_buckets_and_ttl() {
3703        use camel_api::aggregator::AggregatorConfig;
3704
3705        let spec = RouteBuilder::from("direct:start")
3706            .route_id("canonical-buckets-ttl")
3707            .aggregate(
3708                AggregatorConfig::correlate_by("key")
3709                    .complete_when_size(1)
3710                    .max_buckets(100)
3711                    .bucket_ttl(Duration::from_secs(60))
3712                    .build()
3713                    .unwrap(),
3714            )
3715            .build_canonical()
3716            .unwrap();
3717
3718        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3719            assert_eq!(agg.max_buckets, Some(100));
3720            assert_eq!(agg.bucket_ttl_ms, Some(60_000));
3721        } else {
3722            panic!("Expected Aggregate step");
3723        }
3724    }
3725
3726    // ── SplitBuilder with filter inside ─────────────────────────────────────────
3727
3728    #[test]
3729    fn test_split_builder_with_filter_inside() {
3730        use camel_api::splitter::{SplitterConfig, split_body_lines};
3731
3732        let definition = RouteBuilder::from("timer:test")
3733            .route_id("split-with-filter")
3734            .split(SplitterConfig::new(split_body_lines()))
3735            .filter(|_ex| true)
3736            .to("mock:filtered-frag")
3737            .end_filter()
3738            .end_split()
3739            .build()
3740            .unwrap();
3741
3742        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3743            assert_eq!(steps.len(), 1);
3744            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3745        } else {
3746            panic!("Expected Split step");
3747        }
3748    }
3749
3750    // ── WireTap additional tests ────────────────────────────────────────────────
3751
3752    #[test]
3753    fn test_wire_tap_multiple_taps() {
3754        let definition = RouteBuilder::from("timer:tick")
3755            .route_id("multi-wire-tap")
3756            .wire_tap("mock:tap1")
3757            .wire_tap("mock:tap2")
3758            .to("mock:result")
3759            .build()
3760            .unwrap();
3761
3762        assert_eq!(definition.steps().len(), 3);
3763        assert!(
3764            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap1")
3765        );
3766        assert!(
3767            matches!(&definition.steps()[1], BuilderStep::WireTap { uri } if uri == "mock:tap2")
3768        );
3769    }
3770
3771    // ── Error handler: explicit config after shorthand → Mixed mode ─────────────
3772
3773    #[test]
3774    fn test_builder_shorthand_then_explicit_mixed_mode() {
3775        let result = RouteBuilder::from("direct:start")
3776            .route_id("mixed-mode-2")
3777            .dead_letter_channel("log:dlc")
3778            .error_handler(ErrorHandlerConfig::log_only())
3779            .to("mock:out")
3780            .build();
3781
3782        let err = result.err().expect("mixed mode should fail");
3783        assert!(format!("{err}").contains("mixed error handler modes"));
3784    }
3785
3786    // ── build_canonical: empty from_uri error ───────────────────────────────────
3787
3788    #[test]
3789    fn test_build_canonical_empty_from_uri_errors() {
3790        let result = RouteBuilder::from("").route_id("test").build_canonical();
3791        assert!(result.is_err());
3792    }
3793
3794    #[test]
3795    fn test_build_canonical_missing_route_id_errors() {
3796        let result = RouteBuilder::from("direct:start").build_canonical();
3797        assert!(result.is_err());
3798        let err = result.unwrap_err().to_string();
3799        assert!(err.contains("route_id"));
3800    }
3801
3802    // ── SplitBuilder: aggregate inside split ────────────────────────────────────
3803
3804    #[test]
3805    fn test_split_builder_with_aggregate_inside() {
3806        use camel_api::aggregator::AggregatorConfig;
3807        use camel_api::splitter::{SplitterConfig, split_body_lines};
3808
3809        let definition = RouteBuilder::from("timer:test")
3810            .route_id("split-agg")
3811            .split(SplitterConfig::new(split_body_lines()))
3812            .aggregate(
3813                AggregatorConfig::correlate_by("frag-key")
3814                    .complete_when_size(3)
3815                    .build()
3816                    .unwrap(),
3817            )
3818            .end_split()
3819            .build()
3820            .unwrap();
3821
3822        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3823            assert_eq!(steps.len(), 1);
3824            assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
3825        } else {
3826            panic!("Expected Split step");
3827        }
3828    }
3829
3830    // ── Throttle: steps collected inside throttle scope ─────────────────────────
3831
3832    #[test]
3833    fn test_throttle_builder_with_steps_inside() {
3834        let definition = RouteBuilder::from("timer:tick")
3835            .route_id("throttle-steps")
3836            .throttle(10, Duration::from_secs(1))
3837            .set_header("throttled", Value::Bool(true))
3838            .to("mock:throttled")
3839            .end_throttle()
3840            .build()
3841            .unwrap();
3842
3843        if let BuilderStep::Throttle { steps, .. } = &definition.steps()[0] {
3844            assert_eq!(steps.len(), 2);
3845        } else {
3846            panic!("Expected Throttle step");
3847        }
3848    }
3849
3850    // ── LoadBalance: steps collected inside scope ───────────────────────────────
3851
3852    #[test]
3853    fn test_load_balance_builder_with_steps_inside() {
3854        let definition = RouteBuilder::from("timer:tick")
3855            .route_id("lb-steps")
3856            .load_balance()
3857            .round_robin()
3858            .set_header("lb", Value::Bool(true))
3859            .to("mock:lb")
3860            .end_load_balance()
3861            .build()
3862            .unwrap();
3863
3864        if let BuilderStep::LoadBalance { steps, .. } = &definition.steps()[0] {
3865            assert_eq!(steps.len(), 2);
3866        } else {
3867            panic!("Expected LoadBalance step");
3868        }
3869    }
3870
3871    // ── Multicast: steps collected inside scope ─────────────────────────────────
3872
3873    #[test]
3874    fn test_multicast_builder_with_steps_inside() {
3875        let definition = RouteBuilder::from("timer:tick")
3876            .route_id("multicast-steps")
3877            .multicast()
3878            .set_header("mc", Value::Bool(true))
3879            .to("mock:multicast")
3880            .end_multicast()
3881            .build()
3882            .unwrap();
3883
3884        if let BuilderStep::Multicast { steps, .. } = &definition.steps()[0] {
3885            assert_eq!(steps.len(), 2);
3886        } else {
3887            panic!("Expected Multicast step");
3888        }
3889    }
3890
3891    // ── LoopBuilder: steps collected inside loop scope ──────────────────────────
3892
3893    #[test]
3894    fn test_loop_builder_with_steps_inside() {
3895        let definition = RouteBuilder::from("timer:tick")
3896            .route_id("loop-steps")
3897            .loop_count(3)
3898            .set_header("loop", Value::Bool(true))
3899            .to("mock:loop")
3900            .end_loop()
3901            .build()
3902            .unwrap();
3903
3904        if let BuilderStep::Loop { steps, .. } = &definition.steps()[0] {
3905            assert_eq!(steps.len(), 2);
3906        } else {
3907            panic!("Expected Loop step");
3908        }
3909    }
3910
3911    // ── canonical_step_name coverage for remaining variants ─────────────────────
3912
3913    #[test]
3914    fn test_build_canonical_rejects_loop_step() {
3915        let err = RouteBuilder::from("direct:start")
3916            .route_id("canonical-loop")
3917            .loop_count(3)
3918            .to("mock:loop")
3919            .end_loop()
3920            .build_canonical()
3921            .unwrap_err();
3922
3923        assert!(format!("{err}").contains("does not support step `loop`"));
3924    }
3925
3926    #[test]
3927    fn test_build_canonical_rejects_multicast_step() {
3928        let err = RouteBuilder::from("direct:start")
3929            .route_id("canonical-multicast")
3930            .multicast()
3931            .to("mock:a")
3932            .end_multicast()
3933            .build_canonical()
3934            .unwrap_err();
3935
3936        assert!(format!("{err}").contains("does not support step `multicast`"));
3937    }
3938
3939    #[test]
3940    fn test_build_canonical_rejects_throttle_step() {
3941        let err = RouteBuilder::from("direct:start")
3942            .route_id("canonical-throttle")
3943            .throttle(10, Duration::from_secs(1))
3944            .to("mock:result")
3945            .end_throttle()
3946            .build_canonical()
3947            .unwrap_err();
3948
3949        assert!(format!("{err}").contains("does not support step `throttle`"));
3950    }
3951
3952    #[test]
3953    fn test_build_canonical_rejects_load_balancer_step() {
3954        let err = RouteBuilder::from("direct:start")
3955            .route_id("canonical-lb")
3956            .load_balance()
3957            .round_robin()
3958            .to("mock:result")
3959            .end_load_balance()
3960            .build_canonical()
3961            .unwrap_err();
3962
3963        assert!(format!("{err}").contains("does not support step `load_balancer`"));
3964    }
3965
3966    #[test]
3967    fn test_build_canonical_rejects_bean_step() {
3968        let err = RouteBuilder::from("direct:start")
3969            .route_id("canonical-bean")
3970            .bean("myBean", "process")
3971            .build_canonical()
3972            .unwrap_err();
3973
3974        assert!(format!("{err}").contains("does not support step `bean`"));
3975    }
3976
3977    #[test]
3978    fn test_build_canonical_rejects_script_step() {
3979        let err = RouteBuilder::from("direct:start")
3980            .route_id("canonical-script")
3981            .script("rhai", "x = 1")
3982            .build_canonical()
3983            .unwrap_err();
3984
3985        assert!(format!("{err}").contains("does not support step `script`"));
3986    }
3987
3988    #[test]
3989    fn test_build_canonical_accepts_delay_step() {
3990        let spec = RouteBuilder::from("direct:start")
3991            .route_id("canonical-delay")
3992            .delay(Duration::from_millis(100))
3993            .build_canonical()
3994            .unwrap();
3995
3996        assert!(
3997            spec.steps.iter().any(
3998                |s| matches!(s, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100)
3999            )
4000        );
4001    }
4002
4003    #[test]
4004    fn test_build_canonical_accepts_wire_tap_step() {
4005        let spec = RouteBuilder::from("direct:start")
4006            .route_id("canonical-wiretap")
4007            .wire_tap("mock:tap")
4008            .build_canonical()
4009            .unwrap();
4010
4011        assert!(
4012            spec.steps
4013                .iter()
4014                .any(|s| matches!(s, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"))
4015        );
4016    }
4017
4018    #[test]
4019    fn test_build_canonical_rejects_dynamic_router_step() {
4020        let err = RouteBuilder::from("direct:start")
4021            .route_id("canonical-dyn-router")
4022            .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
4023            .build_canonical()
4024            .unwrap_err();
4025
4026        assert!(format!("{err}").contains("does not support step `dynamic_router`"));
4027    }
4028
4029    #[test]
4030    fn test_build_canonical_rejects_routing_slip_step() {
4031        let err = RouteBuilder::from("direct:start")
4032            .route_id("canonical-routing-slip")
4033            .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
4034            .build_canonical()
4035            .unwrap_err();
4036
4037        assert!(format!("{err}").contains("does not support step `routing_slip`"));
4038    }
4039
4040    #[test]
4041    fn test_build_canonical_rejects_recipient_list_step() {
4042        let err = RouteBuilder::from("direct:start")
4043            .route_id("canonical-recipient")
4044            .recipient_list(Arc::new(|_| "mock:a".to_string()))
4045            .build_canonical()
4046            .unwrap_err();
4047
4048        assert!(format!("{err}").contains("does not support step `recipient_list`"));
4049    }
4050
4051    // ── extract_completion_fields: Any mode with predicate → error ──────────────
4052
4053    #[test]
4054    fn test_build_canonical_rejects_any_mode_with_predicate() {
4055        let err = RouteBuilder::from("direct:start")
4056            .route_id("canonical-any-pred")
4057            .aggregate(AggregatorConfig {
4058                header_name: "key".to_string(),
4059                completion: CompletionMode::Any(vec![
4060                    CompletionCondition::Size(5),
4061                    CompletionCondition::Predicate(Arc::new(|_| false)),
4062                ]),
4063                correlation: CorrelationStrategy::HeaderName("key".to_string()),
4064                strategy: AggregationStrategy::CollectAll,
4065                max_buckets: None,
4066                bucket_ttl: None,
4067                force_completion_on_stop: false,
4068                discard_on_timeout: false,
4069            })
4070            .build_canonical()
4071            .unwrap_err();
4072
4073        assert!(format!("{err}").contains("predicate completion"));
4074    }
4075
4076    // ── BUILDER-004: Validation errors for missing required fields ────────────
4077
4078    #[test]
4079    fn test_builder_validation_missing_from_uri() {
4080        let result = RouteBuilder::from("")
4081            .route_id("missing-uri-route")
4082            .to("log:info")
4083            .build();
4084        assert!(result.is_err(), "empty from URI should fail validation");
4085        let err = result.err().unwrap().to_string();
4086        assert!(
4087            err.contains("'from'") || err.contains("URI"),
4088            "error should mention from/URI, got: {err}"
4089        );
4090    }
4091
4092    #[test]
4093    fn test_builder_validation_invalid_step_uri_scheme() {
4094        let result = RouteBuilder::from("timer:tick")
4095            .route_id("bad-step-route")
4096            .to("not-a-valid-uri") // no scheme
4097            .build();
4098        // The builder itself accepts any URI string; validation happens at
4099        // resolution time. Verify the build succeeds (step URI is deferred).
4100        assert!(
4101            result.is_ok(),
4102            "builder should accept opaque step URIs; resolution happens later"
4103        );
4104    }
4105
4106    // ── BUILDER-006: Duplicate route IDs ──────────────────────────────────────
4107
4108    #[test]
4109    fn test_builder_duplicate_route_ids_produce_identical_definitions() {
4110        // The builder itself doesn't check for duplicates (that's context-level).
4111        // Verify both builds succeed with the same ID — detection is TODO(BUILDER-006).
4112        let route1 = RouteBuilder::from("direct:a")
4113            .route_id("dup-route")
4114            .to("mock:out")
4115            .build();
4116        let route2 = RouteBuilder::from("direct:b")
4117            .route_id("dup-route")
4118            .to("mock:out")
4119            .build();
4120
4121        assert!(route1.is_ok());
4122        assert!(route2.is_ok());
4123        assert_eq!(route1.unwrap().route_id(), route2.unwrap().route_id());
4124    }
4125}