Skip to main content

camel_builder/
lib.rs

1//! Fluent builder API for constructing Camel routes programmatically with EIP patterns.
2//!
3//! Main types: `RouteBuilder`, `StepAccumulator`, `SplitBuilder`, `ChoiceBuilder`, `MulticastBuilder`,
4//! `ThrottleBuilder`, `LoopBuilder`, `LoadBalancerBuilder`, `OnExceptionBuilder`.
5
6use camel_api::DelayConfig;
7use camel_api::aggregator::{
8    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24    ProcessorFn, Value,
25    runtime::{
26        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28        CanonicalWhenSpec,
29    },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35    StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38/// Shared step-accumulation methods for all builder types.
39///
40/// Implementors provide `steps_mut()` and get step-adding methods for free.
41/// `filter()` and other branching methods are NOT included — they return
42/// different types per builder and stay as per-builder methods.
43pub trait StepAccumulator: Sized {
44    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
45
46    fn to(mut self, endpoint: impl Into<String>) -> Self {
47        self.steps_mut().push(BuilderStep::To(endpoint.into()));
48        self
49    }
50
51    fn process<F, Fut>(mut self, f: F) -> Self
52    where
53        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
54        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
55    {
56        let svc = ProcessorFn::new(f);
57        self.steps_mut()
58            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
59        self
60    }
61
62    fn process_fn(mut self, processor: BoxProcessor) -> Self {
63        self.steps_mut().push(BuilderStep::Processor(processor));
64        self
65    }
66
67    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
68        let svc = SetHeader::new(IdentityProcessor, key, value);
69        self.steps_mut()
70            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71        self
72    }
73
74    fn map_body<F>(mut self, mapper: F) -> Self
75    where
76        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
77    {
78        let svc = MapBody::new(IdentityProcessor, mapper);
79        self.steps_mut()
80            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
81        self
82    }
83
84    fn set_body<B>(mut self, body: B) -> Self
85    where
86        B: Into<Body> + Clone + Send + Sync + 'static,
87    {
88        let body: Body = body.into();
89        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
90        self.steps_mut()
91            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
92        self
93    }
94
95    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
96    ///
97    /// Transforms the message body using the given value. Semantically identical
98    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
99    fn transform<B>(self, body: B) -> Self
100    where
101        B: Into<Body> + Clone + Send + Sync + 'static,
102    {
103        self.set_body(body)
104    }
105
106    fn set_body_fn<F>(mut self, expr: F) -> Self
107    where
108        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
109    {
110        let svc = SetBody::new(IdentityProcessor, expr);
111        self.steps_mut()
112            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
113        self
114    }
115
116    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
117    where
118        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
119    {
120        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
121        self.steps_mut()
122            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
123        self
124    }
125
126    fn aggregate(mut self, config: AggregatorConfig) -> Self {
127        self.steps_mut().push(BuilderStep::Aggregate { config });
128        self
129    }
130
131    /// Stop processing this exchange immediately. No further steps in the
132    /// current pipeline will run.
133    ///
134    /// Can be used at any point in the route: directly on RouteBuilder,
135    /// inside `.filter()`, inside `.split()`, etc.
136    fn stop(mut self) -> Self {
137        self.steps_mut().push(BuilderStep::Stop);
138        self
139    }
140
141    fn delay(mut self, duration: std::time::Duration) -> Self {
142        self.steps_mut().push(BuilderStep::Delay {
143            config: DelayConfig::from_duration(duration),
144        });
145        self
146    }
147
148    fn delay_with_header(
149        mut self,
150        duration: std::time::Duration,
151        header: impl Into<String>,
152    ) -> Self {
153        self.steps_mut().push(BuilderStep::Delay {
154            config: DelayConfig::from_duration_with_header(duration, header),
155        });
156        self
157    }
158
159    /// Log a message at the specified level.
160    ///
161    /// The message will be logged when an exchange passes through this step.
162    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
163        self.steps_mut().push(BuilderStep::Log {
164            level,
165            message: message.into(),
166        });
167        self
168    }
169
170    /// Convert the message body to the target type.
171    ///
172    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
173    /// Returns `TypeConversionFailed` if conversion is not possible.
174    ///
175    /// # Example
176    /// ```ignore
177    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
178    ///      .convert_body_to(BodyType::Json)
179    ///      .to("direct:next")
180    /// ```
181    fn convert_body_to(mut self, target: BodyType) -> Self {
182        let svc = ConvertBodyTo::new(IdentityProcessor, target);
183        self.steps_mut()
184            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
185        self
186    }
187
188    fn stream_cache(mut self, threshold: usize) -> Self {
189        let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
190        let svc = StreamCacheService::new(IdentityProcessor, config);
191        self.steps_mut()
192            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
193        self
194    }
195
196    /// Materialize `Body::Stream` into `Body::Bytes` using the default threshold (128 KB).
197    ///
198    /// Equivalent to `.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)`.
199    fn stream_cache_default(self) -> Self {
200        self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
201    }
202
203    /// Marshal the message body using the specified data format.
204    ///
205    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
206    /// the format name is unknown.
207    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
208    ///
209    /// # Example
210    /// ```ignore
211    /// route.marshal("json")?.to("direct:next")
212    /// ```
213    fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
214        let name = format.into();
215        let df = builtin_data_format(&name)
216            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
217        let svc = MarshalService::new(IdentityProcessor, df);
218        self.steps_mut()
219            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
220        Ok(self)
221    }
222
223    /// Unmarshal the message body using the specified data format.
224    ///
225    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
226    /// the format name is unknown.
227    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
228    ///
229    /// # Example
230    /// ```ignore
231    /// route.unmarshal("json")?.to("direct:next")
232    /// ```
233    fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
234        let name = format.into();
235        let df = builtin_data_format(&name)
236            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
237        let svc = UnmarshalService::new(IdentityProcessor, df);
238        self.steps_mut()
239            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
240        Ok(self)
241    }
242
243    /// Validate the exchange body against a schema file.
244    ///
245    /// Shorthand for `to("validator:path")`. Supports XSD, JSON Schema, and YAML.
246    ///
247    /// # Example
248    /// ```ignore
249    /// route.validate("schemas/order.xsd").to("direct:out")
250    /// ```
251    fn validate(mut self, schema_path: impl Into<String>) -> Self {
252        let path = schema_path.into();
253        let uri = if path.starts_with("validator:") {
254            path
255        } else {
256            format!("validator:{path}")
257        };
258        self.steps_mut().push(BuilderStep::To(uri));
259        self
260    }
261
262    /// Execute a script that can modify the exchange (headers, properties, body).
263    ///
264    /// The script has access to `headers`, `properties`, and `body` variables
265    /// and can modify them with assignment syntax: `headers["k"] = v`.
266    ///
267    /// # Example
268    /// ```ignore
269    /// // ignore: requires full CamelContext setup with registered language
270    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
271    /// ```
272    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
273        self.steps_mut().push(BuilderStep::Script {
274            language: language.into(),
275            script: script.into(),
276        });
277        self
278    }
279
280    /// EIP-7 enrich: synchronous content enrichment via a resolved producer.
281    ///
282    /// Calls the given endpoint URI as a producer, then merges the response
283    /// back into the original exchange body using the default `UseEnrichedBody`
284    /// strategy (original headers/properties are preserved).
285    fn enrich(mut self, uri: impl Into<String>) -> Self {
286        self.steps_mut().push(BuilderStep::Enrich {
287            uri: uri.into(),
288            strategy: None,
289            timeout_ms: None,
290        });
291        self
292    }
293
294    /// EIP-7 pollEnrich: blocking poll of a PollingConsumer with timeout.
295    ///
296    /// Reads from a polling endpoint (e.g., file) and merges the result
297    /// into the exchange body using the default `UseEnrichedBody` strategy.
298    /// `timeout_ms` controls how long to wait for data.
299    fn poll_enrich(mut self, uri: impl Into<String>, timeout_ms: u64) -> Self {
300        self.steps_mut().push(BuilderStep::PollEnrich {
301            uri: uri.into(),
302            strategy: None,
303            timeout_ms: Some(timeout_ms),
304        });
305        self
306    }
307
308    fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
309        self.steps_mut().push(BuilderStep::Bean {
310            name: name.into(),
311            method: method.into(),
312        });
313        self
314    }
315}
316
317/// A fluent builder for constructing routes.
318///
319/// # Example
320///
321/// ```ignore
322/// let definition = RouteBuilder::from("timer:tick?period=1000")
323///     .set_header("source", Value::String("timer".into()))
324///     .filter(|ex| ex.input.body.as_text().is_some())
325///     .to("log:info?showHeaders=true")
326///     .build()?;
327/// ```
328// TODO(BUILDER-003): RouteBuilder does not support clone-and-reuse semantics.
329// Once built, the builder is consumed. Add `Clone` or a `fork()` method to allow
330// reusing a partially-built route as a template for multiple routes.
331pub struct RouteBuilder {
332    from_uri: String,
333    steps: Vec<BuilderStep>,
334    error_handler: Option<ErrorHandlerConfig>,
335    error_handler_mode: ErrorHandlerMode,
336    circuit_breaker_config: Option<CircuitBreakerConfig>,
337    security_policy_config: Option<camel_api::security_policy::SecurityPolicyConfig>,
338    security_authenticator: Option<std::sync::Arc<dyn camel_auth::TokenAuthenticator>>,
339    concurrency: Option<ConcurrencyModel>,
340    route_id: Option<String>,
341    auto_startup: Option<bool>,
342    startup_order: Option<i32>,
343}
344
345#[derive(Default)]
346enum ErrorHandlerMode {
347    #[default]
348    None,
349    ExplicitConfig,
350    Shorthand {
351        dlc_uri: Option<String>,
352        specs: Vec<OnExceptionSpec>,
353    },
354    Mixed,
355}
356
357#[derive(Clone)]
358struct OnExceptionSpec {
359    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
360    retry: Option<RedeliveryPolicy>,
361    handled_by: Option<String>,
362}
363
364impl RouteBuilder {
365    /// Start building a route from the given source endpoint URI.
366    pub fn from(endpoint: &str) -> Self {
367        Self {
368            from_uri: endpoint.to_string(),
369            steps: Vec::new(),
370            error_handler: None,
371            error_handler_mode: ErrorHandlerMode::None,
372            circuit_breaker_config: None,
373            security_policy_config: None,
374            security_authenticator: None,
375            concurrency: None,
376            route_id: None,
377            auto_startup: None,
378            startup_order: None,
379        }
380    }
381
382    /// Open a filter scope. Only exchanges matching `predicate` will be processed
383    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
384    /// and continue to steps after `.end_filter()`.
385    pub fn filter<F>(self, predicate: F) -> FilterBuilder
386    where
387        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
388    {
389        FilterBuilder {
390            parent: self,
391            predicate: std::sync::Arc::new(predicate),
392            steps: vec![],
393        }
394    }
395
396    /// Open a choice scope for content-based routing.
397    ///
398    /// Within the choice, you can define multiple `.when()` clauses and an
399    /// optional `.otherwise()` clause. The first matching `when` predicate
400    /// determines which sub-pipeline executes.
401    pub fn choice(self) -> ChoiceBuilder {
402        ChoiceBuilder {
403            parent: self,
404            whens: vec![],
405            _otherwise: None,
406        }
407    }
408
409    /// Add a WireTap step that sends a clone of the exchange to the given
410    /// endpoint URI (fire-and-forget). The original exchange continues
411    /// downstream unchanged.
412    pub fn wire_tap(mut self, endpoint: &str) -> Self {
413        self.steps.push(BuilderStep::WireTap {
414            uri: endpoint.to_string(),
415        });
416        self
417    }
418
419    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
420    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
421        self.error_handler_mode = match self.error_handler_mode {
422            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
423                ErrorHandlerMode::ExplicitConfig
424            }
425            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
426        };
427        self.error_handler = Some(config);
428        self
429    }
430
431    /// Set a dead letter channel URI for shorthand error handler mode.
432    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
433        let uri = uri.into();
434        self.error_handler_mode = match self.error_handler_mode {
435            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
436                dlc_uri: Some(uri),
437                specs: Vec::new(),
438            },
439            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
440                dlc_uri: Some(uri),
441                specs,
442            },
443            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
444        };
445        self
446    }
447
448    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
449    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
450    where
451        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
452    {
453        self.error_handler_mode = match self.error_handler_mode {
454            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
455                dlc_uri: None,
456                specs: Vec::new(),
457            },
458            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
459            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
460        };
461
462        OnExceptionBuilder {
463            parent: self,
464            policy: OnExceptionSpec {
465                matches: std::sync::Arc::new(matches),
466                retry: None,
467                handled_by: None,
468            },
469        }
470    }
471
472    /// Set a circuit breaker for this route.
473    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
474        self.circuit_breaker_config = Some(config);
475        self
476    }
477
478    pub fn security_policy(
479        mut self,
480        config: camel_api::security_policy::SecurityPolicyConfig,
481    ) -> Self {
482        self.security_policy_config = Some(config);
483        self
484    }
485
486    pub fn security_authenticator(
487        mut self,
488        auth: std::sync::Arc<dyn camel_auth::TokenAuthenticator>,
489    ) -> Self {
490        self.security_authenticator = Some(auth);
491        self
492    }
493
494    /// Override the consumer's default concurrency model.
495    ///
496    /// When set, the pipeline spawns a task per exchange, processing them
497    /// concurrently. `max` limits the number of simultaneously active
498    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
499    ///
500    /// # Example
501    /// ```ignore
502    /// RouteBuilder::from("http://0.0.0.0:8080/api")
503    ///     .concurrent(16)  // max 16 in-flight pipeline executions
504    ///     .process(handle_request)
505    ///     .build()
506    /// ```
507    pub fn concurrent(mut self, max: usize) -> Self {
508        let max = if max == 0 { None } else { Some(max) };
509        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
510        self
511    }
512
513    /// Force sequential processing, overriding a concurrent-capable consumer.
514    ///
515    /// Useful for HTTP routes that mutate shared state and need ordering
516    /// guarantees.
517    pub fn sequential(mut self) -> Self {
518        self.concurrency = Some(ConcurrencyModel::Sequential);
519        self
520    }
521
522    /// Set the route ID for this route.
523    ///
524    /// If not set, the route will be assigned an auto-generated ID.
525    pub fn route_id(mut self, id: impl Into<String>) -> Self {
526        self.route_id = Some(id.into());
527        self
528    }
529
530    /// Set whether this route should automatically start when the context starts.
531    ///
532    /// Default is `true`.
533    pub fn auto_startup(mut self, auto: bool) -> Self {
534        self.auto_startup = Some(auto);
535        self
536    }
537
538    /// Set the startup order for this route.
539    ///
540    /// Routes with lower values start first. Default is 1000.
541    pub fn startup_order(mut self, order: i32) -> Self {
542        self.startup_order = Some(order);
543        self
544    }
545
546    /// Begin a Splitter sub-pipeline. Steps added after this call (until
547    /// `.end_split()`) will be executed per-fragment.
548    ///
549    /// Returns a `SplitBuilder` — you cannot call `.build()` until
550    /// `.end_split()` closes the split scope (enforced by the type system).
551    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
552        SplitBuilder {
553            parent: self,
554            config,
555            steps: Vec::new(),
556        }
557    }
558
559    /// Begin a Multicast sub-pipeline. Steps added after this call (until
560    /// `.end_multicast()`) will each receive a copy of the exchange.
561    ///
562    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
563    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
564    pub fn multicast(self) -> MulticastBuilder {
565        MulticastBuilder {
566            parent: self,
567            steps: Vec::new(),
568            config: MulticastConfig::new(),
569        }
570    }
571
572    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
573    /// `max_requests` per `period`. Steps inside the throttle scope are only
574    /// executed when the rate limit allows.
575    ///
576    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
577    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
578    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
579        ThrottleBuilder {
580            parent: self,
581            config: ThrottlerConfig::new(max_requests, period),
582            steps: Vec::new(),
583        }
584    }
585
586    /// Begin a Loop sub-pipeline that iterates a fixed number of times.
587    pub fn loop_count(self, count: usize) -> LoopBuilder {
588        LoopBuilder {
589            parent: self,
590            config: LoopConfig {
591                mode: LoopMode::Count(count),
592            },
593            steps: vec![],
594        }
595    }
596
597    /// Begin a Loop sub-pipeline that iterates while a predicate is true.
598    pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
599    where
600        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
601    {
602        LoopBuilder {
603            parent: self,
604            config: LoopConfig {
605                mode: LoopMode::While(std::sync::Arc::new(predicate)),
606            },
607            steps: vec![],
608        }
609    }
610
611    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
612    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
613    ///
614    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
615    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
616    pub fn load_balance(self) -> LoadBalancerBuilder {
617        LoadBalancerBuilder {
618            parent: self,
619            config: LoadBalancerConfig::round_robin(),
620            steps: Vec::new(),
621        }
622    }
623
624    /// Add a dynamic router step that routes exchanges dynamically based on
625    /// expression evaluation at runtime.
626    ///
627    /// The expression receives the exchange and returns `Some(uri)` to route to
628    /// the next endpoint, or `None` to stop routing.
629    ///
630    /// # Example
631    /// ```ignore
632    /// RouteBuilder::from("timer:tick")
633    ///     .route_id("test-route")
634    ///     .dynamic_router(|ex| {
635    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
636    ///     })
637    ///     .build()
638    /// ```
639    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
640        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
641    }
642
643    /// Add a dynamic router step with full configuration.
644    ///
645    /// Allows customization of URI delimiter, cache size, timeout, and other options.
646    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
647        self.steps.push(BuilderStep::DynamicRouter { config });
648        self
649    }
650
651    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
652        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
653    }
654
655    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
656        self.steps.push(BuilderStep::RoutingSlip { config });
657        self
658    }
659
660    pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
661        self.recipient_list_with_config(RecipientListConfig::new(expression))
662    }
663
664    pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
665        self.steps.push(BuilderStep::RecipientList { config });
666        self
667    }
668
669    /// Consume the builder and produce a [`RouteDefinition`].
670    // TODO(BUILDER-006): Validate duplicate route IDs. When a route with the same
671    // ID is already registered in the context, return `Err(CamelError::RouteError)`.
672    // Currently, duplicate IDs are silently accepted; detection should happen at
673    // `CamelContext::add_route_definition` time.
674    pub fn build(self) -> Result<RouteDefinition, CamelError> {
675        validate_uri(&self.from_uri)?;
676        let route_id = self
677            .route_id
678            .filter(|s| !s.trim().is_empty())
679            .ok_or_else(|| {
680                CamelError::RouteError(
681                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
682                        .to_string(),
683                )
684            })?;
685        let resolved_error_handler = match self.error_handler_mode {
686            ErrorHandlerMode::None => self.error_handler,
687            ErrorHandlerMode::ExplicitConfig => self.error_handler,
688            ErrorHandlerMode::Mixed => {
689                return Err(CamelError::RouteError(
690                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
691                ));
692            }
693            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
694                let mut config = if let Some(uri) = dlc_uri {
695                    ErrorHandlerConfig::dead_letter_channel(uri)
696                } else {
697                    ErrorHandlerConfig::log_only()
698                };
699
700                for spec in specs {
701                    let matcher = spec.matches.clone();
702                    let mut builder = config.on_exception(move |e| matcher(e));
703
704                    if let Some(retry) = spec.retry {
705                        builder = builder.retry(retry.max_attempts).with_backoff(
706                            retry.initial_delay,
707                            retry.multiplier,
708                            retry.max_delay,
709                        );
710                        if retry.jitter_factor > 0.0 {
711                            builder = builder.with_jitter(retry.jitter_factor);
712                        }
713                    }
714
715                    if let Some(uri) = spec.handled_by {
716                        builder = builder.handled_by(uri);
717                    }
718
719                    config = builder.build();
720                }
721
722                Some(config)
723            }
724        };
725
726        let definition = RouteDefinition::new(self.from_uri, self.steps);
727        let definition = if let Some(eh) = resolved_error_handler {
728            definition.with_error_handler(eh)
729        } else {
730            definition
731        };
732        let definition = if let Some(cb) = self.circuit_breaker_config {
733            definition.with_circuit_breaker(cb)
734        } else {
735            definition
736        };
737        let definition = if let Some(sp) = self.security_policy_config {
738            definition.with_security_policy(sp)
739        } else {
740            definition
741        };
742        let definition = if let Some(auth) = self.security_authenticator {
743            definition.with_security_authenticator(auth)
744        } else {
745            definition
746        };
747        let definition = if let Some(concurrency) = self.concurrency {
748            definition.with_concurrency(concurrency)
749        } else {
750            definition
751        };
752        let definition = definition.with_route_id(route_id);
753        let definition = if let Some(auto) = self.auto_startup {
754            definition.with_auto_startup(auto)
755        } else {
756            definition
757        };
758        let definition = if let Some(order) = self.startup_order {
759            definition.with_startup_order(order)
760        } else {
761            definition
762        };
763        Ok(definition)
764    }
765
766    /// Compile this builder route into canonical spec.
767    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
768        validate_uri(&self.from_uri)?;
769        let route_id = self
770            .route_id
771            .filter(|s| !s.trim().is_empty())
772            .ok_or_else(|| {
773                CamelError::RouteError(
774                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
775                        .to_string(),
776                )
777            })?;
778
779        let steps = canonicalize_steps(self.steps)?;
780        let circuit_breaker = self
781            .circuit_breaker_config
782            .map(canonicalize_circuit_breaker);
783
784        if self.security_policy_config.is_some() {
785            return Err(CamelError::RouteError(
786                "routes with security_policy cannot use the canonical/hot-reload path (not yet supported)"
787                    .into(),
788            ));
789        }
790
791        let spec = CanonicalRouteSpec {
792            route_id,
793            from: self.from_uri,
794            steps,
795            circuit_breaker,
796            auto_startup: None,
797            startup_order: None,
798            concurrency: None,
799            version: camel_api::CANONICAL_CONTRACT_VERSION,
800        };
801        spec.validate_contract()?;
802        Ok(spec)
803    }
804}
805
806pub struct OnExceptionBuilder {
807    parent: RouteBuilder,
808    policy: OnExceptionSpec,
809}
810
811impl OnExceptionBuilder {
812    pub fn retry(mut self, max_attempts: u32) -> Self {
813        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
814        self
815    }
816
817    pub fn with_backoff(
818        mut self,
819        initial: std::time::Duration,
820        multiplier: f64,
821        max: std::time::Duration,
822    ) -> Self {
823        if let Some(ref mut retry) = self.policy.retry {
824            retry.initial_delay = initial;
825            retry.multiplier = multiplier;
826            retry.max_delay = max;
827        } else {
828            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
829        }
830        self
831    }
832
833    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
834        if let Some(ref mut retry) = self.policy.retry {
835            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
836        } else {
837            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
838        }
839        self
840    }
841
842    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
843        self.policy.handled_by = Some(uri.into());
844        self
845    }
846
847    pub fn end_on_exception(mut self) -> RouteBuilder {
848        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
849            specs.push(self.policy);
850        }
851        self.parent
852    }
853}
854
855/// Validate that a URI is non-empty and contains a scheme component.
856fn validate_uri(uri: &str) -> Result<(), CamelError> {
857    let trimmed = uri.trim();
858    if trimmed.is_empty() {
859        return Err(CamelError::RouteError(
860            "route must have a 'from' URI".to_string(),
861        ));
862    }
863    if !trimmed.contains(':') {
864        return Err(CamelError::RouteError(
865            "URI must have a scheme (e.g. 'timer:tick')".to_string(),
866        ));
867    }
868    let scheme = trimmed.split(':').next().unwrap_or("");
869    if scheme.trim().is_empty() {
870        return Err(CamelError::RouteError(
871            "URI scheme must not be empty".to_string(),
872        ));
873    }
874    Ok(())
875}
876
877fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
878    let mut canonical = Vec::with_capacity(steps.len());
879    for step in steps {
880        canonical.push(canonicalize_step(step)?);
881    }
882    Ok(canonical)
883}
884
885fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
886    match step {
887        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
888        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
889        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
890        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
891        BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
892            delay_ms: config.delay_ms,
893            dynamic_header: config.dynamic_header,
894        }),
895        BuilderStep::DeclarativeScript { expression } => {
896            Ok(CanonicalStepSpec::Script { expression })
897        }
898        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
899            predicate,
900            steps: canonicalize_steps(steps)?,
901        }),
902        BuilderStep::DeclarativeChoice { whens, otherwise } => {
903            let mut canonical_whens = Vec::with_capacity(whens.len());
904            for DeclarativeWhenStep { predicate, steps } in whens {
905                canonical_whens.push(CanonicalWhenSpec {
906                    predicate,
907                    steps: canonicalize_steps(steps)?,
908                });
909            }
910            let otherwise = match otherwise {
911                Some(steps) => Some(canonicalize_steps(steps)?),
912                None => None,
913            };
914            Ok(CanonicalStepSpec::Choice {
915                whens: canonical_whens,
916                otherwise,
917            })
918        }
919        BuilderStep::DeclarativeSplit {
920            expression,
921            aggregation,
922            parallel,
923            parallel_limit,
924            stop_on_exception,
925            steps,
926        } => Ok(CanonicalStepSpec::Split {
927            expression: CanonicalSplitExpressionSpec::Language(expression),
928            aggregation: canonicalize_split_aggregation(aggregation)?,
929            parallel,
930            parallel_limit,
931            stop_on_exception,
932            steps: canonicalize_steps(steps)?,
933        }),
934        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
935            canonicalize_aggregate(config)?,
936        )),
937        other => {
938            let step_name = canonical_step_name(&other);
939            let detail = camel_api::canonical_contract_rejection_reason(step_name)
940                .unwrap_or("not included in canonical v1");
941            Err(CamelError::RouteError(format!(
942                "canonical v1 does not support step `{step_name}`: {detail}"
943            )))
944        }
945    }
946}
947
948fn canonicalize_split_aggregation(
949    strategy: camel_api::splitter::AggregationStrategy,
950) -> Result<CanonicalSplitAggregationSpec, CamelError> {
951    match strategy {
952        camel_api::splitter::AggregationStrategy::LastWins => {
953            Ok(CanonicalSplitAggregationSpec::LastWins)
954        }
955        camel_api::splitter::AggregationStrategy::CollectAll => {
956            Ok(CanonicalSplitAggregationSpec::CollectAll)
957        }
958        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
959            "canonical v1 does not support custom split aggregation".to_string(),
960        )),
961        camel_api::splitter::AggregationStrategy::Original => {
962            Ok(CanonicalSplitAggregationSpec::Original)
963        }
964    }
965}
966
967fn extract_completion_fields(
968    mode: &CompletionMode,
969) -> Result<(Option<usize>, Option<u64>), CamelError> {
970    match mode {
971        CompletionMode::Single(cond) => match cond {
972            CompletionCondition::Size(n) => Ok((Some(*n), None)),
973            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
974            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
975                "canonical v1 does not support aggregate predicate completion".to_string(),
976            )),
977        },
978        CompletionMode::Any(conds) => {
979            let mut size = None;
980            let mut timeout_ms = None;
981            for cond in conds {
982                match cond {
983                    CompletionCondition::Size(n) => size = Some(*n),
984                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
985                    CompletionCondition::Predicate(_) => {
986                        return Err(CamelError::RouteError(
987                            "canonical v1 does not support aggregate predicate completion"
988                                .to_string(),
989                        ));
990                    }
991                }
992            }
993            Ok((size, timeout_ms))
994        }
995    }
996}
997
998fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
999    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
1000
1001    let header = match &config.correlation {
1002        CorrelationStrategy::HeaderName(h) => h.clone(),
1003        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
1004        CorrelationStrategy::Fn(_) => {
1005            return Err(CamelError::RouteError(
1006                "canonical v1 does not support Fn correlation strategy".to_string(),
1007            ));
1008        }
1009    };
1010
1011    let correlation_key = match &config.correlation {
1012        CorrelationStrategy::HeaderName(_) => None,
1013        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
1014        CorrelationStrategy::Fn(_) => unreachable!(),
1015    };
1016
1017    let strategy = match config.strategy {
1018        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
1019        AggregationStrategy::Custom(_) => {
1020            return Err(CamelError::RouteError(
1021                "canonical v1 does not support custom aggregate strategy".to_string(),
1022            ));
1023        }
1024    };
1025    let bucket_ttl_ms = config
1026        .bucket_ttl
1027        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
1028
1029    Ok(CanonicalAggregateSpec {
1030        header,
1031        completion_size,
1032        completion_timeout_ms,
1033        correlation_key,
1034        force_completion_on_stop: if config.force_completion_on_stop {
1035            Some(true)
1036        } else {
1037            None
1038        },
1039        discard_on_timeout: if config.discard_on_timeout {
1040            Some(true)
1041        } else {
1042            None
1043        },
1044        strategy,
1045        max_buckets: config.max_buckets,
1046        bucket_ttl_ms,
1047    })
1048}
1049
1050fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
1051    CanonicalCircuitBreakerSpec {
1052        failure_threshold: config.failure_threshold,
1053        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
1054    }
1055}
1056
1057fn canonical_step_name(step: &BuilderStep) -> &'static str {
1058    match step {
1059        BuilderStep::Processor(_) => "processor",
1060        BuilderStep::To(_) => "to",
1061        BuilderStep::Stop => "stop",
1062        BuilderStep::Log { .. } => "log",
1063        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
1064        BuilderStep::DeclarativeSetBody { .. } => "set_body",
1065        BuilderStep::DeclarativeFilter { .. } => "filter",
1066        BuilderStep::DeclarativeChoice { .. } => "choice",
1067        BuilderStep::DeclarativeScript { .. } => "script",
1068        BuilderStep::DeclarativeFunction { .. } => "function",
1069        BuilderStep::DeclarativeSplit { .. } => "split",
1070        BuilderStep::Split { .. } => "split",
1071        BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1072        BuilderStep::Aggregate { .. } => "aggregate",
1073        BuilderStep::Filter { .. } => "filter",
1074        BuilderStep::Choice { .. } => "choice",
1075        BuilderStep::WireTap { .. } => "wire_tap",
1076        BuilderStep::Delay { .. } => "delay",
1077        BuilderStep::Multicast { .. } => "multicast",
1078        BuilderStep::DeclarativeLog { .. } => "log",
1079        BuilderStep::Bean { .. } => "bean",
1080        BuilderStep::Script { .. } => "script",
1081        BuilderStep::Throttle { .. } => "throttle",
1082        BuilderStep::LoadBalance { .. } => "load_balancer",
1083        BuilderStep::DynamicRouter { .. } => "dynamic_router",
1084        BuilderStep::RoutingSlip { .. } => "routing_slip",
1085        BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1086        BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1087        BuilderStep::RecipientList { .. } => "recipient_list",
1088        BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1089        BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1090        BuilderStep::Enrich { .. } => "enrich",
1091        BuilderStep::PollEnrich { .. } => "poll_enrich",
1092    }
1093}
1094
1095impl StepAccumulator for RouteBuilder {
1096    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1097        &mut self.steps
1098    }
1099}
1100
1101/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
1102///
1103/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1104/// but NOT `.build()` and NOT `.split()` (no nested splits).
1105///
1106/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
1107/// and returns the parent `RouteBuilder`.
1108pub struct SplitBuilder {
1109    parent: RouteBuilder,
1110    config: SplitterConfig,
1111    steps: Vec<BuilderStep>,
1112}
1113
1114impl SplitBuilder {
1115    /// Open a filter scope within the split sub-pipeline.
1116    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1117    where
1118        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1119    {
1120        FilterInSplitBuilder {
1121            parent: self,
1122            predicate: std::sync::Arc::new(predicate),
1123            steps: vec![],
1124        }
1125    }
1126
1127    /// Close the split scope. Packages the accumulated sub-steps into a
1128    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
1129    pub fn end_split(mut self) -> RouteBuilder {
1130        let split_step = BuilderStep::Split {
1131            config: self.config,
1132            steps: self.steps,
1133        };
1134        self.parent.steps.push(split_step);
1135        self.parent
1136    }
1137}
1138
1139impl StepAccumulator for SplitBuilder {
1140    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1141        &mut self.steps
1142    }
1143}
1144
1145/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
1146pub struct FilterBuilder {
1147    parent: RouteBuilder,
1148    predicate: FilterPredicate,
1149    steps: Vec<BuilderStep>,
1150}
1151
1152impl FilterBuilder {
1153    /// Close the filter scope. Packages the accumulated sub-steps into a
1154    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
1155    pub fn end_filter(mut self) -> RouteBuilder {
1156        let step = BuilderStep::Filter {
1157            predicate: self.predicate,
1158            steps: self.steps,
1159        };
1160        self.parent.steps.push(step);
1161        self.parent
1162    }
1163}
1164
1165impl StepAccumulator for FilterBuilder {
1166    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1167        &mut self.steps
1168    }
1169}
1170
1171/// Builder for a filter scope nested inside a `.split()` block.
1172pub struct FilterInSplitBuilder {
1173    parent: SplitBuilder,
1174    predicate: FilterPredicate,
1175    steps: Vec<BuilderStep>,
1176}
1177
1178impl FilterInSplitBuilder {
1179    /// Close the filter scope and return the parent `SplitBuilder`.
1180    pub fn end_filter(mut self) -> SplitBuilder {
1181        let step = BuilderStep::Filter {
1182            predicate: self.predicate,
1183            steps: self.steps,
1184        };
1185        self.parent.steps.push(step);
1186        self.parent
1187    }
1188}
1189
1190impl StepAccumulator for FilterInSplitBuilder {
1191    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1192        &mut self.steps
1193    }
1194}
1195
1196// ── Choice/When/Otherwise builders ─────────────────────────────────────────
1197
1198/// Builder for a `.choice()` ... `.end_choice()` block.
1199///
1200/// Accumulates `when` clauses and an optional `otherwise` clause.
1201/// Cannot call `.build()` until `.end_choice()` is called.
1202pub struct ChoiceBuilder {
1203    parent: RouteBuilder,
1204    whens: Vec<WhenStep>,
1205    _otherwise: Option<Vec<BuilderStep>>,
1206}
1207
1208impl ChoiceBuilder {
1209    /// Open a `when` clause. Only exchanges matching `predicate` will be
1210    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
1211    pub fn when<F>(self, predicate: F) -> WhenBuilder
1212    where
1213        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1214    {
1215        WhenBuilder {
1216            parent: self,
1217            predicate: std::sync::Arc::new(predicate),
1218            steps: vec![],
1219        }
1220    }
1221
1222    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1223    ///
1224    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1225    pub fn otherwise(self) -> OtherwiseBuilder {
1226        OtherwiseBuilder {
1227            parent: self,
1228            steps: vec![],
1229        }
1230    }
1231
1232    /// Close the choice scope. Packages all accumulated `when` clauses and
1233    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1234    /// parent `RouteBuilder`.
1235    pub fn end_choice(mut self) -> RouteBuilder {
1236        let step = BuilderStep::Choice {
1237            whens: self.whens,
1238            otherwise: self._otherwise,
1239        };
1240        self.parent.steps.push(step);
1241        self.parent
1242    }
1243}
1244
1245/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1246pub struct WhenBuilder {
1247    parent: ChoiceBuilder,
1248    predicate: camel_api::FilterPredicate,
1249    steps: Vec<BuilderStep>,
1250}
1251
1252impl WhenBuilder {
1253    /// Close the when scope. Packages the accumulated sub-steps into a
1254    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1255    pub fn end_when(mut self) -> ChoiceBuilder {
1256        self.parent.whens.push(WhenStep {
1257            predicate: self.predicate,
1258            steps: self.steps,
1259        });
1260        self.parent
1261    }
1262}
1263
1264impl StepAccumulator for WhenBuilder {
1265    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1266        &mut self.steps
1267    }
1268}
1269
1270/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1271pub struct OtherwiseBuilder {
1272    parent: ChoiceBuilder,
1273    steps: Vec<BuilderStep>,
1274}
1275
1276impl OtherwiseBuilder {
1277    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1278    pub fn end_otherwise(self) -> ChoiceBuilder {
1279        let OtherwiseBuilder { mut parent, steps } = self;
1280        parent._otherwise = Some(steps);
1281        parent
1282    }
1283}
1284
1285impl StepAccumulator for OtherwiseBuilder {
1286    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1287        &mut self.steps
1288    }
1289}
1290
1291/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1292///
1293/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1294/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1295///
1296/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1297/// and returns the parent `RouteBuilder`.
1298pub struct MulticastBuilder {
1299    parent: RouteBuilder,
1300    steps: Vec<BuilderStep>,
1301    config: MulticastConfig,
1302}
1303
1304impl MulticastBuilder {
1305    pub fn parallel(mut self, parallel: bool) -> Self {
1306        self.config = self.config.parallel(parallel);
1307        self
1308    }
1309
1310    pub fn parallel_limit(mut self, limit: usize) -> Self {
1311        self.config = self.config.parallel_limit(limit);
1312        self
1313    }
1314
1315    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1316        self.config = self.config.stop_on_exception(stop);
1317        self
1318    }
1319
1320    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1321        self.config = self.config.timeout(duration);
1322        self
1323    }
1324
1325    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1326        self.config = self.config.aggregation(strategy);
1327        self
1328    }
1329
1330    pub fn end_multicast(mut self) -> RouteBuilder {
1331        let step = BuilderStep::Multicast {
1332            steps: self.steps,
1333            config: self.config,
1334        };
1335        self.parent.steps.push(step);
1336        self.parent
1337    }
1338}
1339
1340impl StepAccumulator for MulticastBuilder {
1341    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1342        &mut self.steps
1343    }
1344}
1345
1346/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1347///
1348/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1349/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1350///
1351/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1352/// and returns the parent `RouteBuilder`.
1353pub struct ThrottleBuilder {
1354    parent: RouteBuilder,
1355    config: ThrottlerConfig,
1356    steps: Vec<BuilderStep>,
1357}
1358
1359impl ThrottleBuilder {
1360    /// Set the throttle strategy. Default is `Delay`.
1361    ///
1362    /// - `Delay`: Queue messages until capacity available
1363    /// - `Reject`: Return error immediately when throttled
1364    /// - `Drop`: Silently discard excess messages
1365    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1366        self.config = self.config.strategy(strategy);
1367        self
1368    }
1369
1370    /// Close the throttle scope. Packages the accumulated sub-steps into a
1371    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1372    pub fn end_throttle(mut self) -> RouteBuilder {
1373        let step = BuilderStep::Throttle {
1374            config: self.config,
1375            steps: self.steps,
1376        };
1377        self.parent.steps.push(step);
1378        self.parent
1379    }
1380}
1381
1382impl StepAccumulator for ThrottleBuilder {
1383    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1384        &mut self.steps
1385    }
1386}
1387
1388/// Builder for the sub-pipeline within a `.loop_count()` / `.loop_while()` ... `.end_loop()` block.
1389pub struct LoopBuilder {
1390    parent: RouteBuilder,
1391    config: LoopConfig,
1392    steps: Vec<BuilderStep>,
1393}
1394
1395impl LoopBuilder {
1396    pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1397        LoopInLoopBuilder {
1398            parent: self,
1399            config: LoopConfig {
1400                mode: LoopMode::Count(count),
1401            },
1402            steps: vec![],
1403        }
1404    }
1405
1406    pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1407    where
1408        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1409    {
1410        LoopInLoopBuilder {
1411            parent: self,
1412            config: LoopConfig {
1413                mode: LoopMode::While(std::sync::Arc::new(predicate)),
1414            },
1415            steps: vec![],
1416        }
1417    }
1418
1419    pub fn end_loop(mut self) -> RouteBuilder {
1420        let step = BuilderStep::Loop {
1421            config: self.config,
1422            steps: self.steps,
1423        };
1424        self.parent.steps.push(step);
1425        self.parent
1426    }
1427}
1428
1429impl StepAccumulator for LoopBuilder {
1430    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1431        &mut self.steps
1432    }
1433}
1434
1435pub struct LoopInLoopBuilder {
1436    parent: LoopBuilder,
1437    config: LoopConfig,
1438    steps: Vec<BuilderStep>,
1439}
1440
1441impl LoopInLoopBuilder {
1442    pub fn end_loop(mut self) -> LoopBuilder {
1443        let step = BuilderStep::Loop {
1444            config: self.config,
1445            steps: self.steps,
1446        };
1447        self.parent.steps.push(step);
1448        self.parent
1449    }
1450}
1451
1452impl StepAccumulator for LoopInLoopBuilder {
1453    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1454        &mut self.steps
1455    }
1456}
1457
1458/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1459///
1460/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1461/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1462///
1463/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1464/// and returns the parent `RouteBuilder`.
1465pub struct LoadBalancerBuilder {
1466    parent: RouteBuilder,
1467    config: LoadBalancerConfig,
1468    steps: Vec<BuilderStep>,
1469}
1470
1471impl LoadBalancerBuilder {
1472    /// Set the load balance strategy to round-robin (default).
1473    pub fn round_robin(mut self) -> Self {
1474        self.config = LoadBalancerConfig::round_robin();
1475        self
1476    }
1477
1478    /// Set the load balance strategy to random selection.
1479    pub fn random(mut self) -> Self {
1480        self.config = LoadBalancerConfig::random();
1481        self
1482    }
1483
1484    /// Set the load balance strategy to weighted selection.
1485    ///
1486    /// Each endpoint is assigned a weight that determines its probability
1487    /// of being selected.
1488    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1489        self.config = LoadBalancerConfig::weighted(weights);
1490        self
1491    }
1492
1493    /// Set the load balance strategy to failover.
1494    ///
1495    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1496    /// is tried.
1497    pub fn failover(mut self) -> Self {
1498        self.config = LoadBalancerConfig::failover();
1499        self
1500    }
1501
1502    /// Enable or disable parallel execution of endpoints.
1503    ///
1504    /// When enabled, all endpoints receive the exchange simultaneously.
1505    /// When disabled (default), only one endpoint is selected per exchange.
1506    pub fn parallel(mut self, parallel: bool) -> Self {
1507        self.config = self.config.parallel(parallel);
1508        self
1509    }
1510
1511    /// Close the load balance scope. Packages the accumulated sub-steps into a
1512    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1513    pub fn end_load_balance(mut self) -> RouteBuilder {
1514        let step = BuilderStep::LoadBalance {
1515            config: self.config,
1516            steps: self.steps,
1517        };
1518        self.parent.steps.push(step);
1519        self.parent
1520    }
1521}
1522
1523impl StepAccumulator for LoadBalancerBuilder {
1524    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1525        &mut self.steps
1526    }
1527}
1528
1529// ---------------------------------------------------------------------------
1530// Tests
1531// ---------------------------------------------------------------------------
1532
1533#[cfg(test)]
1534mod tests {
1535    use super::*;
1536    use camel_api::error_handler::ErrorHandlerConfig;
1537    use camel_api::load_balancer::LoadBalanceStrategy;
1538    use camel_api::{Exchange, Message};
1539    use camel_core::route::BuilderStep;
1540    use std::sync::Arc;
1541    use std::time::Duration;
1542    use tower::{Service, ServiceExt};
1543
1544    #[test]
1545    fn test_builder_from_creates_definition() {
1546        let definition = RouteBuilder::from("timer:tick")
1547            .route_id("test-route")
1548            .build()
1549            .unwrap();
1550        assert_eq!(definition.from_uri(), "timer:tick");
1551    }
1552
1553    #[test]
1554    fn test_builder_empty_from_uri_errors() {
1555        let result = RouteBuilder::from("").route_id("test-route").build();
1556        assert!(result.is_err());
1557    }
1558
1559    #[test]
1560    fn test_build_rejects_schemeless_uri() {
1561        let result = RouteBuilder::from("no-scheme-here")
1562            .route_id("test-route")
1563            .build();
1564        match result {
1565            Err(err) => {
1566                let err_msg = format!("{err}");
1567                assert!(
1568                    err_msg.contains("scheme"),
1569                    "expected scheme-related error, got: {err_msg}"
1570                );
1571            }
1572            Ok(_) => panic!("schemeless URI should fail"),
1573        }
1574    }
1575
1576    #[test]
1577    fn test_build_rejects_empty_scheme_uri() {
1578        let result = RouteBuilder::from(":missing-scheme")
1579            .route_id("test-route")
1580            .build();
1581        match result {
1582            Err(err) => {
1583                let err_msg = format!("{err}");
1584                assert!(
1585                    err_msg.contains("scheme"),
1586                    "expected scheme-related error, got: {err_msg}"
1587                );
1588            }
1589            Ok(_) => panic!("empty-scheme URI should fail"),
1590        }
1591    }
1592
1593    #[test]
1594    fn test_build_accepts_valid_uri() {
1595        let result = RouteBuilder::from("timer:tick")
1596            .route_id("test-route")
1597            .build();
1598        assert!(result.is_ok());
1599    }
1600
1601    #[test]
1602    fn test_build_canonical_rejects_schemeless_uri() {
1603        let result = RouteBuilder::from("no-scheme-here")
1604            .route_id("test-route")
1605            .build_canonical();
1606        assert!(result.is_err());
1607    }
1608
1609    #[test]
1610    fn test_builder_to_adds_step() {
1611        let definition = RouteBuilder::from("timer:tick")
1612            .route_id("test-route")
1613            .to("log:info")
1614            .build()
1615            .unwrap();
1616
1617        assert_eq!(definition.from_uri(), "timer:tick");
1618        // We can verify steps were added by checking the structure
1619        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1620    }
1621
1622    #[test]
1623    fn test_builder_filter_adds_filter_step() {
1624        let definition = RouteBuilder::from("timer:tick")
1625            .route_id("test-route")
1626            .filter(|_ex| true)
1627            .to("mock:result")
1628            .end_filter()
1629            .build()
1630            .unwrap();
1631
1632        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1633    }
1634
1635    #[test]
1636    fn test_builder_set_header_adds_processor_step() {
1637        let definition = RouteBuilder::from("timer:tick")
1638            .route_id("test-route")
1639            .set_header("key", Value::String("value".into()))
1640            .build()
1641            .unwrap();
1642
1643        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1644    }
1645
1646    #[test]
1647    fn test_builder_map_body_adds_processor_step() {
1648        let definition = RouteBuilder::from("timer:tick")
1649            .route_id("test-route")
1650            .map_body(|body| body)
1651            .build()
1652            .unwrap();
1653
1654        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1655    }
1656
1657    #[test]
1658    fn test_builder_process_adds_processor_step() {
1659        let definition = RouteBuilder::from("timer:tick")
1660            .route_id("test-route")
1661            .process(|ex| async move { Ok(ex) })
1662            .build()
1663            .unwrap();
1664
1665        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1666    }
1667
1668    #[test]
1669    fn test_builder_chain_multiple_steps() {
1670        let definition = RouteBuilder::from("timer:tick")
1671            .route_id("test-route")
1672            .set_header("source", Value::String("timer".into()))
1673            .filter(|ex| ex.input.header("source").is_some())
1674            .to("log:info")
1675            .end_filter()
1676            .to("mock:result")
1677            .build()
1678            .unwrap();
1679
1680        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1681        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1682        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1683        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1684    }
1685
1686    #[test]
1687    fn test_loop_count_builder() {
1688        use camel_api::loop_eip::LoopMode;
1689
1690        let def = RouteBuilder::from("direct:start")
1691            .route_id("loop-test")
1692            .loop_count(3)
1693            .to("mock:inside")
1694            .end_loop()
1695            .to("mock:after")
1696            .build()
1697            .unwrap();
1698
1699        assert_eq!(def.steps().len(), 2);
1700        match &def.steps()[0] {
1701            BuilderStep::Loop { config, steps } => {
1702                assert!(matches!(config.mode, LoopMode::Count(3)));
1703                assert_eq!(steps.len(), 1);
1704            }
1705            other => panic!("Expected Loop, got {:?}", other),
1706        }
1707        assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1708    }
1709
1710    #[test]
1711    fn test_loop_while_builder() {
1712        use camel_api::loop_eip::LoopMode;
1713
1714        let def = RouteBuilder::from("direct:start")
1715            .route_id("loop-while-test")
1716            .loop_while(|_ex| true)
1717            .to("mock:retry")
1718            .end_loop()
1719            .build()
1720            .unwrap();
1721
1722        assert_eq!(def.steps().len(), 1);
1723        match &def.steps()[0] {
1724            BuilderStep::Loop { config, steps } => {
1725                assert!(matches!(config.mode, LoopMode::While(_)));
1726                assert_eq!(steps.len(), 1);
1727            }
1728            other => panic!("Expected Loop, got {:?}", other),
1729        }
1730    }
1731
1732    #[test]
1733    fn test_nested_loop_builder() {
1734        use camel_api::loop_eip::LoopMode;
1735
1736        let def = RouteBuilder::from("direct:start")
1737            .route_id("nested-loop-test")
1738            .loop_count(2)
1739            .to("mock:outer")
1740            .loop_count(3)
1741            .to("mock:inner")
1742            .end_loop()
1743            .end_loop()
1744            .to("mock:after")
1745            .build()
1746            .unwrap();
1747
1748        assert_eq!(def.steps().len(), 2);
1749        match &def.steps()[0] {
1750            BuilderStep::Loop { steps, .. } => {
1751                assert_eq!(steps.len(), 2);
1752                match &steps[1] {
1753                    BuilderStep::Loop {
1754                        config,
1755                        steps: inner_steps,
1756                    } => {
1757                        assert!(matches!(config.mode, LoopMode::Count(3)));
1758                        assert_eq!(inner_steps.len(), 1);
1759                    }
1760                    other => panic!("Expected nested Loop, got {:?}", other),
1761                }
1762            }
1763            other => panic!("Expected outer Loop, got {:?}", other),
1764        }
1765    }
1766
1767    // -----------------------------------------------------------------------
1768    // Processor behavior tests — exercise the real Tower services directly
1769    // -----------------------------------------------------------------------
1770
1771    #[tokio::test]
1772    async fn test_set_header_processor_works() {
1773        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1774        let exchange = Exchange::new(Message::new("test"));
1775        let result = svc.call(exchange).await.unwrap();
1776        assert_eq!(
1777            result.input.header("greeting"),
1778            Some(&Value::String("hello".into()))
1779        );
1780    }
1781
1782    #[tokio::test]
1783    async fn test_filter_processor_passes() {
1784        use camel_api::BoxProcessorExt;
1785        use camel_processor::FilterService;
1786
1787        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1788        let mut svc =
1789            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1790        let exchange = Exchange::new(Message::new("pass"));
1791        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1792        assert_eq!(result.input.body.as_text(), Some("pass"));
1793    }
1794
1795    #[tokio::test]
1796    async fn test_filter_processor_blocks() {
1797        use camel_api::BoxProcessorExt;
1798        use camel_processor::FilterService;
1799
1800        let sub = BoxProcessor::from_fn(|_ex| {
1801            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1802        });
1803        let mut svc =
1804            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1805        let exchange = Exchange::new(Message::new("reject"));
1806        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1807        assert_eq!(result.input.body.as_text(), Some("reject"));
1808    }
1809
1810    #[tokio::test]
1811    async fn test_map_body_processor_works() {
1812        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1813            if let Some(text) = body.as_text() {
1814                Body::Text(text.to_uppercase())
1815            } else {
1816                body
1817            }
1818        });
1819        let exchange = Exchange::new(Message::new("hello"));
1820        let result = mapper.oneshot(exchange).await.unwrap();
1821        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1822    }
1823
1824    #[tokio::test]
1825    async fn test_process_custom_processor_works() {
1826        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1827            ex.set_property("custom", Value::Bool(true));
1828            Ok(ex)
1829        });
1830        let exchange = Exchange::new(Message::default());
1831        let result = processor.oneshot(exchange).await.unwrap();
1832        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1833    }
1834
1835    // -----------------------------------------------------------------------
1836    // Sequential pipeline test
1837    // -----------------------------------------------------------------------
1838
1839    #[tokio::test]
1840    async fn test_compose_pipeline_runs_steps_in_order() {
1841        use camel_core::route::compose_pipeline;
1842
1843        let processors = vec![
1844            BoxProcessor::new(SetHeader::new(
1845                IdentityProcessor,
1846                "step",
1847                Value::String("one".into()),
1848            )),
1849            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1850                if let Some(text) = body.as_text() {
1851                    Body::Text(format!("{}-processed", text))
1852                } else {
1853                    body
1854                }
1855            })),
1856        ];
1857
1858        let pipeline = compose_pipeline(processors);
1859        let exchange = Exchange::new(Message::new("hello"));
1860        let result = pipeline.oneshot(exchange).await.unwrap();
1861
1862        assert_eq!(
1863            result.input.header("step"),
1864            Some(&Value::String("one".into()))
1865        );
1866        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1867    }
1868
1869    #[tokio::test]
1870    async fn test_compose_pipeline_empty_is_identity() {
1871        use camel_core::route::compose_pipeline;
1872
1873        let pipeline = compose_pipeline(vec![]);
1874        let exchange = Exchange::new(Message::new("unchanged"));
1875        let result = pipeline.oneshot(exchange).await.unwrap();
1876        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1877    }
1878
1879    // -----------------------------------------------------------------------
1880    // Circuit breaker builder tests
1881    // -----------------------------------------------------------------------
1882
1883    #[test]
1884    fn test_builder_circuit_breaker_sets_config() {
1885        use camel_api::circuit_breaker::CircuitBreakerConfig;
1886
1887        let config = CircuitBreakerConfig::new().failure_threshold(5);
1888        let definition = RouteBuilder::from("timer:tick")
1889            .route_id("test-route")
1890            .circuit_breaker(config)
1891            .build()
1892            .unwrap();
1893
1894        let cb = definition
1895            .circuit_breaker_config()
1896            .expect("circuit breaker should be set");
1897        assert_eq!(cb.failure_threshold, 5);
1898    }
1899
1900    #[test]
1901    fn test_builder_circuit_breaker_with_error_handler() {
1902        use camel_api::circuit_breaker::CircuitBreakerConfig;
1903        use camel_api::error_handler::ErrorHandlerConfig;
1904
1905        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1906        let eh_config = ErrorHandlerConfig::log_only();
1907
1908        let definition = RouteBuilder::from("timer:tick")
1909            .route_id("test-route")
1910            .to("log:info")
1911            .circuit_breaker(cb_config)
1912            .error_handler(eh_config)
1913            .build()
1914            .unwrap();
1915
1916        assert!(
1917            definition.circuit_breaker_config().is_some(),
1918            "circuit breaker config should be set"
1919        );
1920        // Route definition was built successfully with both configs.
1921    }
1922
1923    #[test]
1924    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1925        let definition = RouteBuilder::from("direct:start")
1926            .route_id("test-route")
1927            .dead_letter_channel("log:dlc")
1928            .on_exception(|e| matches!(e, CamelError::Io(_)))
1929            .retry(3)
1930            .handled_by("log:io")
1931            .end_on_exception()
1932            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1933            .retry(1)
1934            .end_on_exception()
1935            .to("mock:out")
1936            .build()
1937            .expect("route should build");
1938
1939        let cfg = definition
1940            .error_handler_config()
1941            .expect("error handler should be set");
1942        assert_eq!(cfg.policies.len(), 2);
1943        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1944        assert_eq!(
1945            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1946            Some(3)
1947        );
1948        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1949        assert_eq!(
1950            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1951            Some(1)
1952        );
1953    }
1954
1955    #[test]
1956    fn test_builder_on_exception_mixed_mode_rejected() {
1957        let result = RouteBuilder::from("direct:start")
1958            .route_id("test-route")
1959            .error_handler(ErrorHandlerConfig::log_only())
1960            .on_exception(|_e| true)
1961            .end_on_exception()
1962            .to("mock:out")
1963            .build();
1964
1965        let err = result.err().expect("mixed mode should fail with an error");
1966
1967        assert!(
1968            format!("{err}").contains("mixed error handler modes"),
1969            "unexpected error: {err}"
1970        );
1971    }
1972
1973    #[test]
1974    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1975        let definition = RouteBuilder::from("direct:start")
1976            .route_id("test-route")
1977            .on_exception(|_e| true)
1978            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1979            .with_jitter(0.5)
1980            .end_on_exception()
1981            .to("mock:out")
1982            .build()
1983            .expect("route should build");
1984
1985        let cfg = definition
1986            .error_handler_config()
1987            .expect("error handler should be set");
1988        assert_eq!(cfg.policies.len(), 1);
1989        assert!(cfg.policies[0].retry.is_none());
1990    }
1991
1992    #[test]
1993    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1994        let definition = RouteBuilder::from("direct:start")
1995            .route_id("test-route")
1996            .dead_letter_channel("log:dlc")
1997            .to("mock:out")
1998            .build()
1999            .expect("route should build");
2000
2001        let cfg = definition
2002            .error_handler_config()
2003            .expect("error handler should be set");
2004        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
2005        assert!(cfg.policies.is_empty());
2006    }
2007
2008    #[test]
2009    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
2010        let definition = RouteBuilder::from("direct:start")
2011            .route_id("test-route")
2012            .dead_letter_channel("log:first")
2013            .on_exception(|e| matches!(e, CamelError::Io(_)))
2014            .retry(2)
2015            .end_on_exception()
2016            .dead_letter_channel("log:second")
2017            .to("mock:out")
2018            .build()
2019            .expect("route should build");
2020
2021        let cfg = definition
2022            .error_handler_config()
2023            .expect("error handler should be set");
2024        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2025        assert_eq!(cfg.policies.len(), 1);
2026        assert_eq!(
2027            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
2028            Some(2)
2029        );
2030    }
2031
2032    #[test]
2033    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
2034        let definition = RouteBuilder::from("direct:start")
2035            .route_id("test-route")
2036            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
2037            .retry(1)
2038            .end_on_exception()
2039            .to("mock:out")
2040            .build()
2041            .expect("route should build");
2042
2043        let cfg = definition
2044            .error_handler_config()
2045            .expect("error handler should be set");
2046        assert!(cfg.dlc_uri.is_none());
2047        assert_eq!(cfg.policies.len(), 1);
2048    }
2049
2050    #[test]
2051    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
2052        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
2053        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
2054
2055        let definition = RouteBuilder::from("direct:start")
2056            .route_id("test-route")
2057            .error_handler(first)
2058            .error_handler(second)
2059            .to("mock:out")
2060            .build()
2061            .expect("route should build");
2062
2063        let cfg = definition
2064            .error_handler_config()
2065            .expect("error handler should be set");
2066        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2067    }
2068
2069    // --- Splitter builder tests ---
2070
2071    #[test]
2072    fn test_split_builder_typestate() {
2073        use camel_api::splitter::{SplitterConfig, split_body_lines};
2074
2075        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
2076        let definition = RouteBuilder::from("timer:test?period=1000")
2077            .route_id("test-route")
2078            .split(SplitterConfig::new(split_body_lines()))
2079            .to("mock:per-fragment")
2080            .end_split()
2081            .to("mock:final")
2082            .build()
2083            .unwrap();
2084
2085        // Should have 2 top-level steps: Split + To("mock:final")
2086        assert_eq!(definition.steps().len(), 2);
2087    }
2088
2089    #[test]
2090    fn test_split_builder_steps_collected() {
2091        use camel_api::splitter::{SplitterConfig, split_body_lines};
2092
2093        let definition = RouteBuilder::from("timer:test?period=1000")
2094            .route_id("test-route")
2095            .split(SplitterConfig::new(split_body_lines()))
2096            .set_header("fragment", Value::String("yes".into()))
2097            .to("mock:per-fragment")
2098            .end_split()
2099            .build()
2100            .unwrap();
2101
2102        // Should have 1 top-level step: Split (containing 2 sub-steps)
2103        assert_eq!(definition.steps().len(), 1);
2104        match &definition.steps()[0] {
2105            BuilderStep::Split { steps, .. } => {
2106                assert_eq!(steps.len(), 2); // SetHeader + To
2107            }
2108            other => panic!("Expected Split, got {:?}", other),
2109        }
2110    }
2111
2112    #[test]
2113    fn test_split_builder_config_propagated() {
2114        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2115
2116        let definition = RouteBuilder::from("timer:test?period=1000")
2117            .route_id("test-route")
2118            .split(
2119                SplitterConfig::new(split_body_lines())
2120                    .parallel(true)
2121                    .parallel_limit(4)
2122                    .aggregation(AggregationStrategy::CollectAll),
2123            )
2124            .to("mock:per-fragment")
2125            .end_split()
2126            .build()
2127            .unwrap();
2128
2129        match &definition.steps()[0] {
2130            BuilderStep::Split { config, .. } => {
2131                assert!(config.parallel);
2132                assert_eq!(config.parallel_limit, Some(4));
2133                assert!(matches!(
2134                    config.aggregation,
2135                    AggregationStrategy::CollectAll
2136                ));
2137            }
2138            other => panic!("Expected Split, got {:?}", other),
2139        }
2140    }
2141
2142    #[test]
2143    fn test_aggregate_builder_adds_step() {
2144        use camel_api::aggregator::AggregatorConfig;
2145        use camel_core::route::BuilderStep;
2146
2147        let definition = RouteBuilder::from("timer:tick")
2148            .route_id("test-route")
2149            .aggregate(
2150                AggregatorConfig::correlate_by("key")
2151                    .complete_when_size(2)
2152                    .build()
2153                    .unwrap(),
2154            )
2155            .build()
2156            .unwrap();
2157
2158        assert_eq!(definition.steps().len(), 1);
2159        assert!(matches!(
2160            definition.steps()[0],
2161            BuilderStep::Aggregate { .. }
2162        ));
2163    }
2164
2165    #[test]
2166    fn test_aggregate_in_split_builder() {
2167        use camel_api::aggregator::AggregatorConfig;
2168        use camel_api::splitter::{SplitterConfig, split_body_lines};
2169        use camel_core::route::BuilderStep;
2170
2171        let definition = RouteBuilder::from("timer:tick")
2172            .route_id("test-route")
2173            .split(SplitterConfig::new(split_body_lines()))
2174            .aggregate(
2175                AggregatorConfig::correlate_by("key")
2176                    .complete_when_size(1)
2177                    .build()
2178                    .unwrap(),
2179            )
2180            .end_split()
2181            .build()
2182            .unwrap();
2183
2184        assert_eq!(definition.steps().len(), 1);
2185        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2186            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2187        } else {
2188            panic!("expected Split step");
2189        }
2190    }
2191
2192    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
2193
2194    #[test]
2195    fn test_builder_set_body_static_adds_processor() {
2196        let definition = RouteBuilder::from("timer:tick")
2197            .route_id("test-route")
2198            .set_body("fixed")
2199            .build()
2200            .unwrap();
2201        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2202    }
2203
2204    #[test]
2205    fn test_builder_set_body_fn_adds_processor() {
2206        let definition = RouteBuilder::from("timer:tick")
2207            .route_id("test-route")
2208            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2209            .build()
2210            .unwrap();
2211        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2212    }
2213
2214    #[test]
2215    fn transform_alias_produces_same_as_set_body() {
2216        let route_transform = RouteBuilder::from("timer:tick")
2217            .route_id("test-route")
2218            .transform("hello")
2219            .build()
2220            .unwrap();
2221
2222        let route_set_body = RouteBuilder::from("timer:tick")
2223            .route_id("test-route")
2224            .set_body("hello")
2225            .build()
2226            .unwrap();
2227
2228        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2229    }
2230
2231    #[test]
2232    fn test_builder_set_header_fn_adds_processor() {
2233        let definition = RouteBuilder::from("timer:tick")
2234            .route_id("test-route")
2235            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2236            .build()
2237            .unwrap();
2238        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2239    }
2240
2241    #[tokio::test]
2242    async fn test_set_body_static_processor_works() {
2243        use camel_core::route::compose_pipeline;
2244        let def = RouteBuilder::from("t:t")
2245            .route_id("test-route")
2246            .set_body("replaced")
2247            .build()
2248            .unwrap();
2249        let pipeline = compose_pipeline(
2250            def.steps()
2251                .iter()
2252                .filter_map(|s| {
2253                    if let BuilderStep::Processor(p) = s {
2254                        Some(p.clone())
2255                    } else {
2256                        None
2257                    }
2258                })
2259                .collect(),
2260        );
2261        let exchange = Exchange::new(Message::new("original"));
2262        let result = pipeline.oneshot(exchange).await.unwrap();
2263        assert_eq!(result.input.body.as_text(), Some("replaced"));
2264    }
2265
2266    #[tokio::test]
2267    async fn test_set_body_fn_processor_works() {
2268        use camel_core::route::compose_pipeline;
2269        let def = RouteBuilder::from("t:t")
2270            .route_id("test-route")
2271            .set_body_fn(|ex: &Exchange| {
2272                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2273            })
2274            .build()
2275            .unwrap();
2276        let pipeline = compose_pipeline(
2277            def.steps()
2278                .iter()
2279                .filter_map(|s| {
2280                    if let BuilderStep::Processor(p) = s {
2281                        Some(p.clone())
2282                    } else {
2283                        None
2284                    }
2285                })
2286                .collect(),
2287        );
2288        let exchange = Exchange::new(Message::new("hello"));
2289        let result = pipeline.oneshot(exchange).await.unwrap();
2290        assert_eq!(result.input.body.as_text(), Some("HELLO"));
2291    }
2292
2293    #[tokio::test]
2294    async fn test_set_header_fn_processor_works() {
2295        use camel_core::route::compose_pipeline;
2296        let def = RouteBuilder::from("t:t")
2297            .route_id("test-route")
2298            .set_header_fn("echo", |ex: &Exchange| {
2299                ex.input
2300                    .body
2301                    .as_text()
2302                    .map(|t| Value::String(t.into()))
2303                    .unwrap_or(Value::Null)
2304            })
2305            .build()
2306            .unwrap();
2307        let pipeline = compose_pipeline(
2308            def.steps()
2309                .iter()
2310                .filter_map(|s| {
2311                    if let BuilderStep::Processor(p) = s {
2312                        Some(p.clone())
2313                    } else {
2314                        None
2315                    }
2316                })
2317                .collect(),
2318        );
2319        let exchange = Exchange::new(Message::new("ping"));
2320        let result = pipeline.oneshot(exchange).await.unwrap();
2321        assert_eq!(
2322            result.input.header("echo"),
2323            Some(&Value::String("ping".into()))
2324        );
2325    }
2326
2327    // ── FilterBuilder typestate tests ─────────────────────────────────────
2328
2329    #[test]
2330    fn test_filter_builder_typestate() {
2331        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2332            .route_id("test-route")
2333            .filter(|_ex| true)
2334            .to("mock:inner")
2335            .end_filter()
2336            .to("mock:outer")
2337            .build();
2338        assert!(result.is_ok());
2339    }
2340
2341    #[test]
2342    fn test_filter_builder_steps_collected() {
2343        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2344            .route_id("test-route")
2345            .filter(|_ex| true)
2346            .to("mock:inner")
2347            .end_filter()
2348            .build()
2349            .unwrap();
2350
2351        assert_eq!(definition.steps().len(), 1);
2352        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2353    }
2354
2355    #[test]
2356    fn test_wire_tap_builder_adds_step() {
2357        let definition = RouteBuilder::from("timer:tick")
2358            .route_id("test-route")
2359            .wire_tap("mock:tap")
2360            .to("mock:result")
2361            .build()
2362            .unwrap();
2363
2364        assert_eq!(definition.steps().len(), 2);
2365        assert!(
2366            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2367        );
2368        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2369    }
2370
2371    // ── MulticastBuilder typestate tests ─────────────────────────────────────
2372
2373    #[test]
2374    fn test_multicast_builder_typestate() {
2375        let definition = RouteBuilder::from("timer:tick")
2376            .route_id("test-route")
2377            .multicast()
2378            .to("direct:a")
2379            .to("direct:b")
2380            .end_multicast()
2381            .to("mock:result")
2382            .build()
2383            .unwrap();
2384
2385        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
2386    }
2387
2388    #[test]
2389    fn test_multicast_builder_steps_collected() {
2390        let definition = RouteBuilder::from("timer:tick")
2391            .route_id("test-route")
2392            .multicast()
2393            .to("direct:a")
2394            .to("direct:b")
2395            .end_multicast()
2396            .build()
2397            .unwrap();
2398
2399        match &definition.steps()[0] {
2400            BuilderStep::Multicast { steps, .. } => {
2401                assert_eq!(steps.len(), 2);
2402            }
2403            other => panic!("Expected Multicast, got {:?}", other),
2404        }
2405    }
2406
2407    // ── Concurrency builder tests ─────────────────────────────────────
2408
2409    #[test]
2410    fn test_builder_concurrent_sets_concurrency() {
2411        use camel_component_api::ConcurrencyModel;
2412
2413        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2414            .route_id("test-route")
2415            .concurrent(16)
2416            .to("log:info")
2417            .build()
2418            .unwrap();
2419
2420        assert_eq!(
2421            definition.concurrency_override(),
2422            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2423        );
2424    }
2425
2426    #[test]
2427    fn test_builder_concurrent_zero_means_unbounded() {
2428        use camel_component_api::ConcurrencyModel;
2429
2430        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2431            .route_id("test-route")
2432            .concurrent(0)
2433            .to("log:info")
2434            .build()
2435            .unwrap();
2436
2437        assert_eq!(
2438            definition.concurrency_override(),
2439            Some(&ConcurrencyModel::Concurrent { max: None })
2440        );
2441    }
2442
2443    #[test]
2444    fn test_builder_sequential_sets_concurrency() {
2445        use camel_component_api::ConcurrencyModel;
2446
2447        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2448            .route_id("test-route")
2449            .sequential()
2450            .to("log:info")
2451            .build()
2452            .unwrap();
2453
2454        assert_eq!(
2455            definition.concurrency_override(),
2456            Some(&ConcurrencyModel::Sequential)
2457        );
2458    }
2459
2460    #[test]
2461    fn test_builder_default_concurrency_is_none() {
2462        let definition = RouteBuilder::from("timer:tick")
2463            .route_id("test-route")
2464            .to("log:info")
2465            .build()
2466            .unwrap();
2467
2468        assert_eq!(definition.concurrency_override(), None);
2469    }
2470
2471    // ── Route lifecycle builder tests ─────────────────────────────────────
2472
2473    #[test]
2474    fn test_builder_route_id_sets_id() {
2475        let definition = RouteBuilder::from("timer:tick")
2476            .route_id("my-route")
2477            .build()
2478            .unwrap();
2479
2480        assert_eq!(definition.route_id(), "my-route");
2481    }
2482
2483    #[test]
2484    fn test_build_without_route_id_fails() {
2485        let result = RouteBuilder::from("timer:tick?period=1000")
2486            .to("log:info")
2487            .build();
2488        let err = match result {
2489            Err(e) => e.to_string(),
2490            Ok(_) => panic!("build() should fail without route_id"),
2491        };
2492        assert!(
2493            err.contains("route_id"),
2494            "error should mention route_id, got: {}",
2495            err
2496        );
2497    }
2498
2499    #[test]
2500    fn test_builder_empty_route_id_rejected() {
2501        let result = RouteBuilder::from("timer:tick").route_id("").build();
2502        let err = result.err().expect("empty route_id should be rejected");
2503        assert!(matches!(err, CamelError::RouteError(_)));
2504    }
2505
2506    #[test]
2507    fn test_builder_whitespace_route_id_rejected() {
2508        let result = RouteBuilder::from("timer:tick").route_id("   ").build();
2509        assert!(result.is_err());
2510    }
2511
2512    #[test]
2513    fn test_builder_auto_startup_false() {
2514        let definition = RouteBuilder::from("timer:tick")
2515            .route_id("test-route")
2516            .auto_startup(false)
2517            .build()
2518            .unwrap();
2519
2520        assert!(!definition.auto_startup());
2521    }
2522
2523    #[test]
2524    fn test_builder_startup_order_custom() {
2525        let definition = RouteBuilder::from("timer:tick")
2526            .route_id("test-route")
2527            .startup_order(50)
2528            .build()
2529            .unwrap();
2530
2531        assert_eq!(definition.startup_order(), 50);
2532    }
2533
2534    #[test]
2535    fn test_builder_defaults() {
2536        let definition = RouteBuilder::from("timer:tick")
2537            .route_id("test-route")
2538            .build()
2539            .unwrap();
2540
2541        assert_eq!(definition.route_id(), "test-route");
2542        assert!(definition.auto_startup());
2543        assert_eq!(definition.startup_order(), 1000);
2544    }
2545
2546    // ── Choice typestate tests ──────────────────────────────────────────────────
2547
2548    #[test]
2549    fn test_choice_builder_single_when() {
2550        let definition = RouteBuilder::from("timer:tick")
2551            .route_id("test-route")
2552            .choice()
2553            .when(|ex: &Exchange| ex.input.header("type").is_some())
2554            .to("mock:typed")
2555            .end_when()
2556            .end_choice()
2557            .build()
2558            .unwrap();
2559        assert_eq!(definition.steps().len(), 1);
2560        assert!(
2561            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2562            if whens.len() == 1 && otherwise.is_none())
2563        );
2564    }
2565
2566    #[test]
2567    fn test_choice_builder_when_otherwise() {
2568        let definition = RouteBuilder::from("timer:tick")
2569            .route_id("test-route")
2570            .choice()
2571            .when(|ex: &Exchange| ex.input.header("a").is_some())
2572            .to("mock:a")
2573            .end_when()
2574            .otherwise()
2575            .to("mock:fallback")
2576            .end_otherwise()
2577            .end_choice()
2578            .build()
2579            .unwrap();
2580        assert!(
2581            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2582            if whens.len() == 1 && otherwise.is_some())
2583        );
2584    }
2585
2586    #[test]
2587    fn test_choice_builder_multiple_whens() {
2588        let definition = RouteBuilder::from("timer:tick")
2589            .route_id("test-route")
2590            .choice()
2591            .when(|ex: &Exchange| ex.input.header("a").is_some())
2592            .to("mock:a")
2593            .end_when()
2594            .when(|ex: &Exchange| ex.input.header("b").is_some())
2595            .to("mock:b")
2596            .end_when()
2597            .end_choice()
2598            .build()
2599            .unwrap();
2600        assert!(
2601            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2602            if whens.len() == 2)
2603        );
2604    }
2605
2606    #[test]
2607    fn test_choice_step_after_choice() {
2608        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2609        let definition = RouteBuilder::from("timer:tick")
2610            .route_id("test-route")
2611            .choice()
2612            .when(|_ex: &Exchange| true)
2613            .to("mock:inner")
2614            .end_when()
2615            .end_choice()
2616            .to("mock:outer") // must be step[1], not inside choice
2617            .build()
2618            .unwrap();
2619        assert_eq!(definition.steps().len(), 2);
2620        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2621    }
2622
2623    // ── Throttle typestate tests ──────────────────────────────────────────────────
2624
2625    #[test]
2626    fn test_throttle_builder_typestate() {
2627        let definition = RouteBuilder::from("timer:tick")
2628            .route_id("test-route")
2629            .throttle(10, std::time::Duration::from_secs(1))
2630            .to("mock:result")
2631            .end_throttle()
2632            .build()
2633            .unwrap();
2634
2635        assert_eq!(definition.steps().len(), 1);
2636        assert!(matches!(
2637            &definition.steps()[0],
2638            BuilderStep::Throttle { .. }
2639        ));
2640    }
2641
2642    #[test]
2643    fn test_throttle_builder_with_strategy() {
2644        let definition = RouteBuilder::from("timer:tick")
2645            .route_id("test-route")
2646            .throttle(10, std::time::Duration::from_secs(1))
2647            .strategy(ThrottleStrategy::Reject)
2648            .to("mock:result")
2649            .end_throttle()
2650            .build()
2651            .unwrap();
2652
2653        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2654            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2655        } else {
2656            panic!("Expected Throttle step");
2657        }
2658    }
2659
2660    #[test]
2661    fn test_throttle_builder_steps_collected() {
2662        let definition = RouteBuilder::from("timer:tick")
2663            .route_id("test-route")
2664            .throttle(5, std::time::Duration::from_secs(1))
2665            .set_header("throttled", Value::Bool(true))
2666            .to("mock:throttled")
2667            .end_throttle()
2668            .build()
2669            .unwrap();
2670
2671        match &definition.steps()[0] {
2672            BuilderStep::Throttle { steps, .. } => {
2673                assert_eq!(steps.len(), 2); // SetHeader + To
2674            }
2675            other => panic!("Expected Throttle, got {:?}", other),
2676        }
2677    }
2678
2679    #[test]
2680    fn test_throttle_step_after_throttle() {
2681        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2682        let definition = RouteBuilder::from("timer:tick")
2683            .route_id("test-route")
2684            .throttle(10, std::time::Duration::from_secs(1))
2685            .to("mock:inner")
2686            .end_throttle()
2687            .to("mock:outer")
2688            .build()
2689            .unwrap();
2690
2691        assert_eq!(definition.steps().len(), 2);
2692        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2693    }
2694
2695    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2696
2697    #[test]
2698    fn test_load_balance_builder_typestate() {
2699        let definition = RouteBuilder::from("timer:tick")
2700            .route_id("test-route")
2701            .load_balance()
2702            .round_robin()
2703            .to("mock:a")
2704            .to("mock:b")
2705            .end_load_balance()
2706            .build()
2707            .unwrap();
2708
2709        assert_eq!(definition.steps().len(), 1);
2710        assert!(matches!(
2711            &definition.steps()[0],
2712            BuilderStep::LoadBalance { .. }
2713        ));
2714    }
2715
2716    #[test]
2717    fn test_load_balance_builder_with_strategy() {
2718        let definition = RouteBuilder::from("timer:tick")
2719            .route_id("test-route")
2720            .load_balance()
2721            .random()
2722            .to("mock:result")
2723            .end_load_balance()
2724            .build()
2725            .unwrap();
2726
2727        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2728            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2729        } else {
2730            panic!("Expected LoadBalance step");
2731        }
2732    }
2733
2734    #[test]
2735    fn test_load_balance_builder_steps_collected() {
2736        let definition = RouteBuilder::from("timer:tick")
2737            .route_id("test-route")
2738            .load_balance()
2739            .set_header("lb", Value::Bool(true))
2740            .to("mock:a")
2741            .end_load_balance()
2742            .build()
2743            .unwrap();
2744
2745        match &definition.steps()[0] {
2746            BuilderStep::LoadBalance { steps, .. } => {
2747                assert_eq!(steps.len(), 2); // SetHeader + To
2748            }
2749            other => panic!("Expected LoadBalance, got {:?}", other),
2750        }
2751    }
2752
2753    #[test]
2754    fn test_load_balance_step_after_load_balance() {
2755        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
2756        let definition = RouteBuilder::from("timer:tick")
2757            .route_id("test-route")
2758            .load_balance()
2759            .to("mock:inner")
2760            .end_load_balance()
2761            .to("mock:outer")
2762            .build()
2763            .unwrap();
2764
2765        assert_eq!(definition.steps().len(), 2);
2766        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2767    }
2768
2769    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2770
2771    #[test]
2772    fn test_dynamic_router_builder() {
2773        let definition = RouteBuilder::from("timer:tick")
2774            .route_id("test-route")
2775            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2776            .build()
2777            .unwrap();
2778
2779        assert_eq!(definition.steps().len(), 1);
2780        assert!(matches!(
2781            &definition.steps()[0],
2782            BuilderStep::DynamicRouter { .. }
2783        ));
2784    }
2785
2786    #[test]
2787    fn test_dynamic_router_builder_with_config() {
2788        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2789            .max_iterations(100)
2790            .cache_size(500);
2791
2792        let definition = RouteBuilder::from("timer:tick")
2793            .route_id("test-route")
2794            .dynamic_router_with_config(config)
2795            .build()
2796            .unwrap();
2797
2798        assert_eq!(definition.steps().len(), 1);
2799        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2800            assert_eq!(config.max_iterations, 100);
2801            assert_eq!(config.cache_size, 500);
2802        } else {
2803            panic!("Expected DynamicRouter step");
2804        }
2805    }
2806
2807    #[test]
2808    fn test_dynamic_router_step_after_router() {
2809        // Steps after dynamic_router() are added to the outer pipeline.
2810        let definition = RouteBuilder::from("timer:tick")
2811            .route_id("test-route")
2812            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2813            .to("mock:outer")
2814            .build()
2815            .unwrap();
2816
2817        assert_eq!(definition.steps().len(), 2);
2818        assert!(matches!(
2819            &definition.steps()[0],
2820            BuilderStep::DynamicRouter { .. }
2821        ));
2822        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2823    }
2824
2825    #[test]
2826    fn routing_slip_builder_creates_step() {
2827        use camel_api::RoutingSlipExpression;
2828
2829        let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2830
2831        let route = RouteBuilder::from("direct:start")
2832            .route_id("routing-slip-test")
2833            .routing_slip(expression)
2834            .build()
2835            .unwrap();
2836
2837        assert!(
2838            matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2839            "Expected RoutingSlip step"
2840        );
2841    }
2842
2843    #[test]
2844    fn routing_slip_with_config_builder_creates_step() {
2845        use camel_api::RoutingSlipConfig;
2846
2847        let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2848            .uri_delimiter("|")
2849            .cache_size(50)
2850            .ignore_invalid_endpoints(true);
2851
2852        let route = RouteBuilder::from("direct:start")
2853            .route_id("routing-slip-config-test")
2854            .routing_slip_with_config(config)
2855            .build()
2856            .unwrap();
2857
2858        if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2859            assert_eq!(config.uri_delimiter, "|");
2860            assert_eq!(config.cache_size, 50);
2861            assert!(config.ignore_invalid_endpoints);
2862        } else {
2863            panic!("Expected RoutingSlip step");
2864        }
2865    }
2866
2867    #[test]
2868    fn test_builder_marshal_adds_processor_step() {
2869        let definition = RouteBuilder::from("timer:tick")
2870            .route_id("test-route")
2871            .marshal("json")
2872            .unwrap()
2873            .build()
2874            .unwrap();
2875        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2876    }
2877
2878    #[test]
2879    fn test_builder_unmarshal_adds_processor_step() {
2880        let definition = RouteBuilder::from("timer:tick")
2881            .route_id("test-route")
2882            .unmarshal("json")
2883            .unwrap()
2884            .build()
2885            .unwrap();
2886        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2887    }
2888
2889    #[test]
2890    fn test_builder_stream_cache_adds_processor_step() {
2891        let definition = RouteBuilder::from("timer:tick")
2892            .route_id("test-route")
2893            .stream_cache(1024)
2894            .build()
2895            .unwrap();
2896        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2897    }
2898
2899    #[test]
2900    fn validate_adds_to_step_with_validator_uri() {
2901        let def = RouteBuilder::from("direct:in")
2902            .route_id("test")
2903            .validate("schemas/order.xsd")
2904            .build()
2905            .unwrap();
2906        let steps = def.steps();
2907        assert_eq!(steps.len(), 1);
2908        assert!(
2909            matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2910            "got: {:?}",
2911            steps[0]
2912        );
2913    }
2914
2915    #[test]
2916    fn test_builder_marshal_returns_err_for_unknown_format() {
2917        let result = RouteBuilder::from("timer:tick")
2918            .route_id("test-route")
2919            .marshal("csv");
2920        let err = match result {
2921            Err(e) => e,
2922            Ok(_) => panic!("marshal with unknown format should return Err"),
2923        };
2924        let msg = err.to_string();
2925        assert!(
2926            msg.contains("unknown data format"),
2927            "error should mention unknown format, got: {msg}"
2928        );
2929        assert!(
2930            msg.contains("csv"),
2931            "error should mention format name, got: {msg}"
2932        );
2933    }
2934
2935    #[test]
2936    fn test_builder_unmarshal_returns_err_for_unknown_format() {
2937        let result = RouteBuilder::from("timer:tick")
2938            .route_id("test-route")
2939            .unmarshal("csv");
2940        let err = match result {
2941            Err(e) => e,
2942            Ok(_) => panic!("unmarshal with unknown format should return Err"),
2943        };
2944        let msg = err.to_string();
2945        assert!(
2946            msg.contains("unknown data format"),
2947            "error should mention unknown format, got: {msg}"
2948        );
2949        assert!(
2950            msg.contains("csv"),
2951            "error should mention format name, got: {msg}"
2952        );
2953    }
2954
2955    #[test]
2956    fn test_builder_recipient_list_creates_step() {
2957        let route = RouteBuilder::from("direct:start")
2958            .route_id("recipient-list-test")
2959            .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
2960            .build()
2961            .unwrap();
2962
2963        assert!(matches!(
2964            &route.steps()[0],
2965            BuilderStep::RecipientList { .. }
2966        ));
2967    }
2968
2969    #[test]
2970    fn test_builder_recipient_list_with_config_creates_step() {
2971        let config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));
2972
2973        let route = RouteBuilder::from("direct:start")
2974            .route_id("recipient-list-config-test")
2975            .recipient_list_with_config(config)
2976            .build()
2977            .unwrap();
2978
2979        assert!(matches!(
2980            &route.steps()[0],
2981            BuilderStep::RecipientList { .. }
2982        ));
2983    }
2984
2985    #[test]
2986    fn test_builder_script_adds_script_step() {
2987        let route = RouteBuilder::from("direct:start")
2988            .route_id("script-test")
2989            .script("rhai", "headers[\"x\"] = \"y\"")
2990            .build()
2991            .unwrap();
2992
2993        assert!(matches!(
2994            &route.steps()[0],
2995            BuilderStep::Script { language, script }
2996            if language == "rhai" && script == "headers[\"x\"] = \"y\""
2997        ));
2998    }
2999
3000    #[test]
3001    fn test_builder_delay_and_delay_with_header_add_steps() {
3002        let route = RouteBuilder::from("direct:start")
3003            .route_id("delay-test")
3004            .delay(Duration::from_millis(250))
3005            .delay_with_header(Duration::from_millis(500), "x-delay")
3006            .build()
3007            .unwrap();
3008
3009        assert_eq!(route.steps().len(), 2);
3010        assert!(matches!(&route.steps()[0], BuilderStep::Delay { .. }));
3011        assert!(matches!(&route.steps()[1], BuilderStep::Delay { .. }));
3012    }
3013
3014    #[test]
3015    fn test_builder_log_and_stop_add_steps_in_order() {
3016        let route = RouteBuilder::from("direct:start")
3017            .route_id("log-stop-test")
3018            .log("hello", LogLevel::Info)
3019            .stop()
3020            .to("mock:after")
3021            .build()
3022            .unwrap();
3023
3024        assert_eq!(route.steps().len(), 3);
3025        assert!(matches!(
3026            &route.steps()[0],
3027            BuilderStep::Log { message, .. } if message == "hello"
3028        ));
3029        assert!(matches!(&route.steps()[1], BuilderStep::Stop));
3030        assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
3031    }
3032
3033    #[test]
3034    fn test_builder_stream_cache_default_adds_processor_step() {
3035        let route = RouteBuilder::from("direct:start")
3036            .route_id("stream-cache-default-test")
3037            .stream_cache_default()
3038            .build()
3039            .unwrap();
3040
3041        assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
3042    }
3043
3044    #[test]
3045    fn test_validate_preserves_existing_validator_prefix() {
3046        let route = RouteBuilder::from("direct:in")
3047            .route_id("validate-prefix-test")
3048            .validate("validator:schemas/order.xsd")
3049            .build()
3050            .unwrap();
3051
3052        assert!(matches!(
3053            &route.steps()[0],
3054            BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
3055        ));
3056    }
3057
3058    #[test]
3059    fn test_load_balance_builder_weighted_failover_parallel_config() {
3060        let route = RouteBuilder::from("direct:start")
3061            .route_id("lb-weighted-failover-parallel")
3062            .load_balance()
3063            .weighted(vec![
3064                ("direct:a".to_string(), 3),
3065                ("direct:b".to_string(), 1),
3066            ])
3067            .failover()
3068            .parallel(true)
3069            .to("mock:result")
3070            .end_load_balance()
3071            .build()
3072            .unwrap();
3073
3074        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3075            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3076            assert!(config.parallel);
3077        } else {
3078            panic!("Expected LoadBalance step");
3079        }
3080    }
3081
3082    #[test]
3083    fn test_multicast_builder_all_config_setters() {
3084        let route = RouteBuilder::from("direct:start")
3085            .route_id("multicast-config-test")
3086            .multicast()
3087            .parallel(true)
3088            .parallel_limit(4)
3089            .stop_on_exception(true)
3090            .timeout(Duration::from_millis(300))
3091            .aggregation(MulticastStrategy::Original)
3092            .to("mock:a")
3093            .end_multicast()
3094            .build()
3095            .unwrap();
3096
3097        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3098            assert!(config.parallel);
3099            assert_eq!(config.parallel_limit, Some(4));
3100            assert!(config.stop_on_exception);
3101            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
3102            assert!(matches!(config.aggregation, MulticastStrategy::Original));
3103        } else {
3104            panic!("Expected Multicast step");
3105        }
3106    }
3107
3108    #[test]
3109    fn test_build_canonical_rejects_unsupported_processor_step() {
3110        let err = RouteBuilder::from("direct:start")
3111            .route_id("canonical-reject")
3112            .set_header("k", Value::String("v".into()))
3113            .build_canonical()
3114            .unwrap_err();
3115
3116        assert!(format!("{err}").contains("does not support step `processor`"));
3117    }
3118
3119    // ── LoadBalance strategy-specific tests ─────────────────────────────────────
3120
3121    #[test]
3122    fn test_load_balance_builder_weighted_strategy() {
3123        let route = RouteBuilder::from("direct:start")
3124            .route_id("lb-weighted")
3125            .load_balance()
3126            .weighted(vec![
3127                ("direct:a".to_string(), 5),
3128                ("direct:b".to_string(), 2),
3129                ("direct:c".to_string(), 1),
3130            ])
3131            .to("mock:result")
3132            .end_load_balance()
3133            .build()
3134            .unwrap();
3135
3136        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3137            assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
3138        } else {
3139            panic!("Expected LoadBalance step");
3140        }
3141    }
3142
3143    #[test]
3144    fn test_load_balance_builder_failover_strategy() {
3145        let route = RouteBuilder::from("direct:start")
3146            .route_id("lb-failover")
3147            .load_balance()
3148            .failover()
3149            .to("mock:primary")
3150            .end_load_balance()
3151            .build()
3152            .unwrap();
3153
3154        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3155            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3156            assert!(!config.parallel);
3157        } else {
3158            panic!("Expected LoadBalance step");
3159        }
3160    }
3161
3162    #[test]
3163    fn test_load_balance_builder_parallel_false_explicit() {
3164        let route = RouteBuilder::from("direct:start")
3165            .route_id("lb-parallel-false")
3166            .load_balance()
3167            .round_robin()
3168            .parallel(false)
3169            .to("mock:result")
3170            .end_load_balance()
3171            .build()
3172            .unwrap();
3173
3174        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3175            assert!(!config.parallel);
3176        } else {
3177            panic!("Expected LoadBalance step");
3178        }
3179    }
3180
3181    // ── FilterInSplitBuilder tests ──────────────────────────────────────────────
3182
3183    #[test]
3184    fn test_filter_in_split_builder_typestate() {
3185        use camel_api::splitter::{SplitterConfig, split_body_lines};
3186
3187        let definition = RouteBuilder::from("timer:test")
3188            .route_id("filter-in-split")
3189            .split(SplitterConfig::new(split_body_lines()))
3190            .filter(|_ex| true)
3191            .to("mock:filtered")
3192            .end_filter()
3193            .end_split()
3194            .build()
3195            .unwrap();
3196
3197        assert_eq!(definition.steps().len(), 1);
3198        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3199            assert_eq!(steps.len(), 1);
3200            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3201        } else {
3202            panic!("Expected Split step");
3203        }
3204    }
3205
3206    #[test]
3207    fn test_filter_in_split_builder_multiple_steps() {
3208        use camel_api::splitter::{SplitterConfig, split_body_lines};
3209
3210        let definition = RouteBuilder::from("timer:test")
3211            .route_id("filter-in-split-multi")
3212            .split(SplitterConfig::new(split_body_lines()))
3213            .to("mock:before-filter")
3214            .filter(|_ex| true)
3215            .to("mock:inside-filter")
3216            .end_filter()
3217            .to("mock:after-filter")
3218            .end_split()
3219            .build()
3220            .unwrap();
3221
3222        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3223            // To("before-filter") + Filter{...} + To("after-filter") = 3
3224            assert_eq!(steps.len(), 3);
3225        } else {
3226            panic!("Expected Split step");
3227        }
3228    }
3229
3230    // ── build_canonical tests ───────────────────────────────────────────────────
3231
3232    #[test]
3233    fn test_build_canonical_with_circuit_breaker() {
3234        use camel_api::circuit_breaker::CircuitBreakerConfig;
3235
3236        let spec = RouteBuilder::from("direct:start")
3237            .route_id("canonical-cb")
3238            .circuit_breaker(CircuitBreakerConfig::new().failure_threshold(10))
3239            .to("mock:result")
3240            .build_canonical()
3241            .unwrap();
3242
3243        let cb = spec.circuit_breaker.expect("circuit breaker should be set");
3244        assert_eq!(cb.failure_threshold, 10);
3245    }
3246
3247    #[test]
3248    fn test_build_canonical_rejects_custom_split_aggregation() {
3249        use camel_api::splitter::{SplitterConfig, split_body_lines};
3250
3251        let err = RouteBuilder::from("direct:start")
3252            .route_id("canonical-custom-split")
3253            .split(SplitterConfig::new(split_body_lines()).aggregation(
3254                camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
3255            ))
3256            .to("mock:frag")
3257            .end_split()
3258            .build_canonical()
3259            .unwrap_err();
3260
3261        // Split with closure-based expression is rejected in canonical v1.
3262        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3263    }
3264
3265    #[test]
3266    fn test_build_canonical_rejects_custom_aggregate_strategy() {
3267        let err = RouteBuilder::from("direct:start")
3268            .route_id("canonical-custom-agg")
3269            .aggregate(
3270                AggregatorConfig::correlate_by("key")
3271                    .complete_when_size(2)
3272                    .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
3273                    .build()
3274                    .unwrap(),
3275            )
3276            .build_canonical()
3277            .unwrap_err();
3278
3279        assert!(format!("{err}").contains("custom aggregate strategy"));
3280    }
3281
3282    #[test]
3283    fn test_build_canonical_rejects_fn_correlation_strategy() {
3284        let err = RouteBuilder::from("direct:start")
3285            .route_id("canonical-fn-corr")
3286            .aggregate(AggregatorConfig {
3287                header_name: "key".to_string(),
3288                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3289                correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
3290                strategy: AggregationStrategy::CollectAll,
3291                max_buckets: None,
3292                bucket_ttl: None,
3293                force_completion_on_stop: false,
3294                discard_on_timeout: false,
3295            })
3296            .build_canonical()
3297            .unwrap_err();
3298
3299        assert!(format!("{err}").contains("Fn correlation strategy"));
3300    }
3301
3302    #[test]
3303    fn test_build_canonical_rejects_predicate_completion() {
3304        let err = RouteBuilder::from("direct:start")
3305            .route_id("canonical-pred-completion")
3306            .aggregate(AggregatorConfig {
3307                header_name: "key".to_string(),
3308                completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
3309                    |_| false,
3310                ))),
3311                correlation: CorrelationStrategy::HeaderName("key".to_string()),
3312                strategy: AggregationStrategy::CollectAll,
3313                max_buckets: None,
3314                bucket_ttl: None,
3315                force_completion_on_stop: false,
3316                discard_on_timeout: false,
3317            })
3318            .build_canonical()
3319            .unwrap_err();
3320
3321        assert!(format!("{err}").contains("predicate completion"));
3322    }
3323
3324    #[test]
3325    fn test_build_canonical_with_expression_correlation() {
3326        let spec = RouteBuilder::from("direct:start")
3327            .route_id("canonical-expr-corr")
3328            .aggregate(AggregatorConfig {
3329                header_name: "key".to_string(),
3330                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3331                correlation: CorrelationStrategy::Expression {
3332                    expr: "header.key".to_string(),
3333                    language: "simple".to_string(),
3334                },
3335                strategy: AggregationStrategy::CollectAll,
3336                max_buckets: None,
3337                bucket_ttl: None,
3338                force_completion_on_stop: false,
3339                discard_on_timeout: false,
3340            })
3341            .build_canonical()
3342            .unwrap();
3343
3344        assert!(spec.steps.iter().any(|s| matches!(s, CanonicalStepSpec::Aggregate(a) if a.correlation_key == Some("header.key".to_string()))));
3345    }
3346
3347    #[test]
3348    fn test_build_canonical_split_rejected_with_closure_expression() {
3349        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
3350
3351        // Builder-based split uses closure expressions, which are not serializable.
3352        let err = RouteBuilder::from("direct:start")
3353            .route_id("canonical-split-last")
3354            .split(
3355                SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins),
3356            )
3357            .to("mock:frag")
3358            .end_split()
3359            .build_canonical()
3360            .unwrap_err();
3361
3362        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3363    }
3364
3365    // ── OnExceptionBuilder full chain tests ─────────────────────────────────────
3366
3367    #[test]
3368    fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
3369        let definition = RouteBuilder::from("direct:start")
3370            .route_id("on-exception-full")
3371            .dead_letter_channel("log:dlc")
3372            .on_exception(|e| matches!(e, CamelError::Io(_)))
3373            .retry(5)
3374            .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
3375            .with_jitter(0.3)
3376            .handled_by("log:io-handler")
3377            .end_on_exception()
3378            .to("mock:out")
3379            .build()
3380            .unwrap();
3381
3382        let cfg = definition
3383            .error_handler_config()
3384            .expect("error handler should be set");
3385        assert_eq!(cfg.policies.len(), 1);
3386        let policy = &cfg.policies[0];
3387        let retry = policy.retry.as_ref().expect("retry should be set");
3388        assert_eq!(retry.max_attempts, 5);
3389        assert_eq!(retry.initial_delay, Duration::from_millis(10));
3390        assert_eq!(retry.multiplier, 2.0);
3391        assert_eq!(retry.max_delay, Duration::from_millis(500));
3392        assert!((retry.jitter_factor - 0.3).abs() < f64::EPSILON);
3393        assert_eq!(policy.handled_by.as_deref(), Some("log:io-handler"));
3394    }
3395
3396    #[test]
3397    fn test_on_exception_jitter_clamped_to_valid_range() {
3398        let definition = RouteBuilder::from("direct:start")
3399            .route_id("jitter-clamp")
3400            .on_exception(|_e| true)
3401            .retry(1)
3402            .with_jitter(5.0)
3403            .end_on_exception()
3404            .to("mock:out")
3405            .build()
3406            .unwrap();
3407
3408        let cfg = definition.error_handler_config().unwrap();
3409        let retry = cfg.policies[0].retry.as_ref().unwrap();
3410        assert!((retry.jitter_factor - 1.0).abs() < f64::EPSILON);
3411    }
3412
3413    // ── StepAccumulator: process_fn, convert_body_to, bean ──────────────────────
3414
3415    #[test]
3416    fn test_builder_process_fn_adds_processor_step() {
3417        use camel_api::BoxProcessorExt;
3418        let processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
3419        let definition = RouteBuilder::from("timer:tick")
3420            .route_id("process-fn-test")
3421            .process_fn(processor)
3422            .build()
3423            .unwrap();
3424
3425        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3426    }
3427
3428    #[test]
3429    fn test_builder_convert_body_to_adds_processor_step() {
3430        let definition = RouteBuilder::from("timer:tick")
3431            .route_id("convert-body-test")
3432            .convert_body_to(BodyType::Json)
3433            .build()
3434            .unwrap();
3435
3436        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3437    }
3438
3439    #[test]
3440    fn test_builder_bean_adds_bean_step() {
3441        let definition = RouteBuilder::from("timer:tick")
3442            .route_id("bean-test")
3443            .bean("myBean", "process")
3444            .build()
3445            .unwrap();
3446
3447        assert!(
3448            matches!(&definition.steps()[0], BuilderStep::Bean { name, method }
3449            if name == "myBean" && method == "process")
3450        );
3451    }
3452
3453    // ── Throttle strategy-specific tests ────────────────────────────────────────
3454
3455    #[test]
3456    fn test_throttle_builder_delay_strategy() {
3457        let definition = RouteBuilder::from("timer:tick")
3458            .route_id("throttle-delay")
3459            .throttle(10, Duration::from_secs(1))
3460            .strategy(ThrottleStrategy::Delay)
3461            .to("mock:result")
3462            .end_throttle()
3463            .build()
3464            .unwrap();
3465
3466        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3467            assert_eq!(config.strategy, ThrottleStrategy::Delay);
3468        } else {
3469            panic!("Expected Throttle step");
3470        }
3471    }
3472
3473    #[test]
3474    fn test_throttle_builder_drop_strategy() {
3475        let definition = RouteBuilder::from("timer:tick")
3476            .route_id("throttle-drop")
3477            .throttle(10, Duration::from_secs(1))
3478            .strategy(ThrottleStrategy::Drop)
3479            .to("mock:result")
3480            .end_throttle()
3481            .build()
3482            .unwrap();
3483
3484        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3485            assert_eq!(config.strategy, ThrottleStrategy::Drop);
3486        } else {
3487            panic!("Expected Throttle step");
3488        }
3489    }
3490
3491    // ── LoopInLoopBuilder with loop_while ───────────────────────────────────────
3492
3493    #[test]
3494    fn test_nested_loop_while_builder() {
3495        use camel_api::loop_eip::LoopMode;
3496
3497        let def = RouteBuilder::from("direct:start")
3498            .route_id("nested-loop-while")
3499            .loop_count(2)
3500            .to("mock:outer")
3501            .loop_while(|_ex| true)
3502            .to("mock:inner")
3503            .end_loop()
3504            .end_loop()
3505            .build()
3506            .unwrap();
3507
3508        assert_eq!(def.steps().len(), 1);
3509        if let BuilderStep::Loop { steps, .. } = &def.steps()[0] {
3510            assert_eq!(steps.len(), 2);
3511            if let BuilderStep::Loop { config, .. } = &steps[1] {
3512                assert!(matches!(config.mode, LoopMode::While(_)));
3513            } else {
3514                panic!("Expected inner Loop step");
3515            }
3516        } else {
3517            panic!("Expected outer Loop step");
3518        }
3519    }
3520
3521    // ── Choice with multiple whens + otherwise ──────────────────────────────────
3522
3523    #[test]
3524    fn test_choice_builder_multiple_whens_with_otherwise() {
3525        let definition = RouteBuilder::from("timer:tick")
3526            .route_id("choice-multi-otherwise")
3527            .choice()
3528            .when(|ex: &Exchange| ex.input.header("a").is_some())
3529            .to("mock:a")
3530            .end_when()
3531            .when(|ex: &Exchange| ex.input.header("b").is_some())
3532            .to("mock:b")
3533            .end_when()
3534            .when(|ex: &Exchange| ex.input.header("c").is_some())
3535            .to("mock:c")
3536            .end_when()
3537            .otherwise()
3538            .to("mock:fallback")
3539            .end_otherwise()
3540            .end_choice()
3541            .build()
3542            .unwrap();
3543
3544        if let BuilderStep::Choice { whens, otherwise } = &definition.steps()[0] {
3545            assert_eq!(whens.len(), 3);
3546            assert!(otherwise.is_some());
3547            assert_eq!(otherwise.as_ref().unwrap().len(), 1);
3548        } else {
3549            panic!("Expected Choice step");
3550        }
3551    }
3552
3553    // ── Multicast individual config tests ───────────────────────────────────────
3554
3555    #[test]
3556    fn test_multicast_builder_parallel_only() {
3557        let route = RouteBuilder::from("direct:start")
3558            .route_id("multicast-parallel")
3559            .multicast()
3560            .parallel(true)
3561            .to("mock:a")
3562            .end_multicast()
3563            .build()
3564            .unwrap();
3565
3566        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3567            assert!(config.parallel);
3568            assert_eq!(config.parallel_limit, None);
3569        } else {
3570            panic!("Expected Multicast step");
3571        }
3572    }
3573
3574    #[test]
3575    fn test_multicast_builder_timeout_only() {
3576        let route = RouteBuilder::from("direct:start")
3577            .route_id("multicast-timeout")
3578            .multicast()
3579            .timeout(Duration::from_secs(5))
3580            .to("mock:a")
3581            .end_multicast()
3582            .build()
3583            .unwrap();
3584
3585        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3586            assert_eq!(config.timeout, Some(Duration::from_secs(5)));
3587        } else {
3588            panic!("Expected Multicast step");
3589        }
3590    }
3591
3592    #[test]
3593    fn test_multicast_builder_aggregation_collect_all() {
3594        let route = RouteBuilder::from("direct:start")
3595            .route_id("multicast-collect")
3596            .multicast()
3597            .aggregation(MulticastStrategy::CollectAll)
3598            .to("mock:a")
3599            .end_multicast()
3600            .build()
3601            .unwrap();
3602
3603        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3604            assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
3605        } else {
3606            panic!("Expected Multicast step");
3607        }
3608    }
3609
3610    // ── extract_completion_fields: Any mode with multiple conditions ────────────
3611
3612    #[test]
3613    fn test_build_canonical_aggregate_any_completion_mode() {
3614        let spec = RouteBuilder::from("direct:start")
3615            .route_id("canonical-any-completion")
3616            .aggregate(
3617                AggregatorConfig::correlate_by("key")
3618                    .complete_on_size_or_timeout(10, Duration::from_secs(30))
3619                    .build()
3620                    .unwrap(),
3621            )
3622            .build_canonical()
3623            .unwrap();
3624
3625        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3626            assert_eq!(agg.completion_size, Some(10));
3627            assert_eq!(agg.completion_timeout_ms, Some(30_000));
3628        } else {
3629            panic!("Expected Aggregate step");
3630        }
3631    }
3632
3633    #[test]
3634    fn test_build_canonical_aggregate_timeout_completion() {
3635        let spec = RouteBuilder::from("direct:start")
3636            .route_id("canonical-timeout-completion")
3637            .aggregate(
3638                AggregatorConfig::correlate_by("key")
3639                    .complete_on_timeout(Duration::from_millis(500))
3640                    .build()
3641                    .unwrap(),
3642            )
3643            .build_canonical()
3644            .unwrap();
3645
3646        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3647            assert_eq!(agg.completion_size, None);
3648            assert_eq!(agg.completion_timeout_ms, Some(500));
3649        } else {
3650            panic!("Expected Aggregate step");
3651        }
3652    }
3653
3654    // ── canonicalize_aggregate: discard_on_timeout and force_completion_on_stop ─
3655
3656    #[test]
3657    fn test_build_canonical_aggregate_discard_on_timeout() {
3658        use camel_api::aggregator::AggregatorConfig;
3659
3660        let spec = RouteBuilder::from("direct:start")
3661            .route_id("canonical-discard-timeout")
3662            .aggregate(
3663                AggregatorConfig::correlate_by("key")
3664                    .complete_when_size(1)
3665                    .discard_on_timeout(true)
3666                    .build()
3667                    .unwrap(),
3668            )
3669            .build_canonical()
3670            .unwrap();
3671
3672        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3673            assert_eq!(agg.discard_on_timeout, Some(true));
3674        } else {
3675            panic!("Expected Aggregate step");
3676        }
3677    }
3678
3679    #[test]
3680    fn test_build_canonical_aggregate_force_completion_on_stop() {
3681        use camel_api::aggregator::AggregatorConfig;
3682
3683        let spec = RouteBuilder::from("direct:start")
3684            .route_id("canonical-force-stop")
3685            .aggregate(
3686                AggregatorConfig::correlate_by("key")
3687                    .complete_when_size(1)
3688                    .force_completion_on_stop(true)
3689                    .build()
3690                    .unwrap(),
3691            )
3692            .build_canonical()
3693            .unwrap();
3694
3695        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3696            assert_eq!(agg.force_completion_on_stop, Some(true));
3697        } else {
3698            panic!("Expected Aggregate step");
3699        }
3700    }
3701
3702    // ── build_canonical: max_buckets and bucket_ttl ─────────────────────────────
3703
3704    #[test]
3705    fn test_build_canonical_aggregate_max_buckets_and_ttl() {
3706        use camel_api::aggregator::AggregatorConfig;
3707
3708        let spec = RouteBuilder::from("direct:start")
3709            .route_id("canonical-buckets-ttl")
3710            .aggregate(
3711                AggregatorConfig::correlate_by("key")
3712                    .complete_when_size(1)
3713                    .max_buckets(100)
3714                    .bucket_ttl(Duration::from_secs(60))
3715                    .build()
3716                    .unwrap(),
3717            )
3718            .build_canonical()
3719            .unwrap();
3720
3721        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3722            assert_eq!(agg.max_buckets, Some(100));
3723            assert_eq!(agg.bucket_ttl_ms, Some(60_000));
3724        } else {
3725            panic!("Expected Aggregate step");
3726        }
3727    }
3728
3729    // ── SplitBuilder with filter inside ─────────────────────────────────────────
3730
3731    #[test]
3732    fn test_split_builder_with_filter_inside() {
3733        use camel_api::splitter::{SplitterConfig, split_body_lines};
3734
3735        let definition = RouteBuilder::from("timer:test")
3736            .route_id("split-with-filter")
3737            .split(SplitterConfig::new(split_body_lines()))
3738            .filter(|_ex| true)
3739            .to("mock:filtered-frag")
3740            .end_filter()
3741            .end_split()
3742            .build()
3743            .unwrap();
3744
3745        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3746            assert_eq!(steps.len(), 1);
3747            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3748        } else {
3749            panic!("Expected Split step");
3750        }
3751    }
3752
3753    // ── WireTap additional tests ────────────────────────────────────────────────
3754
3755    #[test]
3756    fn test_wire_tap_multiple_taps() {
3757        let definition = RouteBuilder::from("timer:tick")
3758            .route_id("multi-wire-tap")
3759            .wire_tap("mock:tap1")
3760            .wire_tap("mock:tap2")
3761            .to("mock:result")
3762            .build()
3763            .unwrap();
3764
3765        assert_eq!(definition.steps().len(), 3);
3766        assert!(
3767            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap1")
3768        );
3769        assert!(
3770            matches!(&definition.steps()[1], BuilderStep::WireTap { uri } if uri == "mock:tap2")
3771        );
3772    }
3773
3774    // ── Error handler: explicit config after shorthand → Mixed mode ─────────────
3775
3776    #[test]
3777    fn test_builder_shorthand_then_explicit_mixed_mode() {
3778        let result = RouteBuilder::from("direct:start")
3779            .route_id("mixed-mode-2")
3780            .dead_letter_channel("log:dlc")
3781            .error_handler(ErrorHandlerConfig::log_only())
3782            .to("mock:out")
3783            .build();
3784
3785        let err = result.err().expect("mixed mode should fail");
3786        assert!(format!("{err}").contains("mixed error handler modes"));
3787    }
3788
3789    // ── build_canonical: empty from_uri error ───────────────────────────────────
3790
3791    #[test]
3792    fn test_build_canonical_empty_from_uri_errors() {
3793        let result = RouteBuilder::from("").route_id("test").build_canonical();
3794        assert!(result.is_err());
3795    }
3796
3797    #[test]
3798    fn test_build_canonical_missing_route_id_errors() {
3799        let result = RouteBuilder::from("direct:start").build_canonical();
3800        assert!(result.is_err());
3801        let err = result.unwrap_err().to_string();
3802        assert!(err.contains("route_id"));
3803    }
3804
3805    // ── SplitBuilder: aggregate inside split ────────────────────────────────────
3806
3807    #[test]
3808    fn test_split_builder_with_aggregate_inside() {
3809        use camel_api::aggregator::AggregatorConfig;
3810        use camel_api::splitter::{SplitterConfig, split_body_lines};
3811
3812        let definition = RouteBuilder::from("timer:test")
3813            .route_id("split-agg")
3814            .split(SplitterConfig::new(split_body_lines()))
3815            .aggregate(
3816                AggregatorConfig::correlate_by("frag-key")
3817                    .complete_when_size(3)
3818                    .build()
3819                    .unwrap(),
3820            )
3821            .end_split()
3822            .build()
3823            .unwrap();
3824
3825        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3826            assert_eq!(steps.len(), 1);
3827            assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
3828        } else {
3829            panic!("Expected Split step");
3830        }
3831    }
3832
3833    // ── Throttle: steps collected inside throttle scope ─────────────────────────
3834
3835    #[test]
3836    fn test_throttle_builder_with_steps_inside() {
3837        let definition = RouteBuilder::from("timer:tick")
3838            .route_id("throttle-steps")
3839            .throttle(10, Duration::from_secs(1))
3840            .set_header("throttled", Value::Bool(true))
3841            .to("mock:throttled")
3842            .end_throttle()
3843            .build()
3844            .unwrap();
3845
3846        if let BuilderStep::Throttle { steps, .. } = &definition.steps()[0] {
3847            assert_eq!(steps.len(), 2);
3848        } else {
3849            panic!("Expected Throttle step");
3850        }
3851    }
3852
3853    // ── LoadBalance: steps collected inside scope ───────────────────────────────
3854
3855    #[test]
3856    fn test_load_balance_builder_with_steps_inside() {
3857        let definition = RouteBuilder::from("timer:tick")
3858            .route_id("lb-steps")
3859            .load_balance()
3860            .round_robin()
3861            .set_header("lb", Value::Bool(true))
3862            .to("mock:lb")
3863            .end_load_balance()
3864            .build()
3865            .unwrap();
3866
3867        if let BuilderStep::LoadBalance { steps, .. } = &definition.steps()[0] {
3868            assert_eq!(steps.len(), 2);
3869        } else {
3870            panic!("Expected LoadBalance step");
3871        }
3872    }
3873
3874    // ── Multicast: steps collected inside scope ─────────────────────────────────
3875
3876    #[test]
3877    fn test_multicast_builder_with_steps_inside() {
3878        let definition = RouteBuilder::from("timer:tick")
3879            .route_id("multicast-steps")
3880            .multicast()
3881            .set_header("mc", Value::Bool(true))
3882            .to("mock:multicast")
3883            .end_multicast()
3884            .build()
3885            .unwrap();
3886
3887        if let BuilderStep::Multicast { steps, .. } = &definition.steps()[0] {
3888            assert_eq!(steps.len(), 2);
3889        } else {
3890            panic!("Expected Multicast step");
3891        }
3892    }
3893
3894    // ── LoopBuilder: steps collected inside loop scope ──────────────────────────
3895
3896    #[test]
3897    fn test_loop_builder_with_steps_inside() {
3898        let definition = RouteBuilder::from("timer:tick")
3899            .route_id("loop-steps")
3900            .loop_count(3)
3901            .set_header("loop", Value::Bool(true))
3902            .to("mock:loop")
3903            .end_loop()
3904            .build()
3905            .unwrap();
3906
3907        if let BuilderStep::Loop { steps, .. } = &definition.steps()[0] {
3908            assert_eq!(steps.len(), 2);
3909        } else {
3910            panic!("Expected Loop step");
3911        }
3912    }
3913
3914    // ── canonical_step_name coverage for remaining variants ─────────────────────
3915
3916    #[test]
3917    fn test_build_canonical_rejects_loop_step() {
3918        let err = RouteBuilder::from("direct:start")
3919            .route_id("canonical-loop")
3920            .loop_count(3)
3921            .to("mock:loop")
3922            .end_loop()
3923            .build_canonical()
3924            .unwrap_err();
3925
3926        assert!(format!("{err}").contains("does not support step `loop`"));
3927    }
3928
3929    #[test]
3930    fn test_build_canonical_rejects_multicast_step() {
3931        let err = RouteBuilder::from("direct:start")
3932            .route_id("canonical-multicast")
3933            .multicast()
3934            .to("mock:a")
3935            .end_multicast()
3936            .build_canonical()
3937            .unwrap_err();
3938
3939        assert!(format!("{err}").contains("does not support step `multicast`"));
3940    }
3941
3942    #[test]
3943    fn test_build_canonical_rejects_throttle_step() {
3944        let err = RouteBuilder::from("direct:start")
3945            .route_id("canonical-throttle")
3946            .throttle(10, Duration::from_secs(1))
3947            .to("mock:result")
3948            .end_throttle()
3949            .build_canonical()
3950            .unwrap_err();
3951
3952        assert!(format!("{err}").contains("does not support step `throttle`"));
3953    }
3954
3955    #[test]
3956    fn test_build_canonical_rejects_load_balancer_step() {
3957        let err = RouteBuilder::from("direct:start")
3958            .route_id("canonical-lb")
3959            .load_balance()
3960            .round_robin()
3961            .to("mock:result")
3962            .end_load_balance()
3963            .build_canonical()
3964            .unwrap_err();
3965
3966        assert!(format!("{err}").contains("does not support step `load_balancer`"));
3967    }
3968
3969    #[test]
3970    fn test_build_canonical_rejects_bean_step() {
3971        let err = RouteBuilder::from("direct:start")
3972            .route_id("canonical-bean")
3973            .bean("myBean", "process")
3974            .build_canonical()
3975            .unwrap_err();
3976
3977        assert!(format!("{err}").contains("does not support step `bean`"));
3978    }
3979
3980    #[test]
3981    fn test_build_canonical_rejects_script_step() {
3982        let err = RouteBuilder::from("direct:start")
3983            .route_id("canonical-script")
3984            .script("rhai", "x = 1")
3985            .build_canonical()
3986            .unwrap_err();
3987
3988        assert!(format!("{err}").contains("does not support step `script`"));
3989    }
3990
3991    #[test]
3992    fn test_build_canonical_accepts_delay_step() {
3993        let spec = RouteBuilder::from("direct:start")
3994            .route_id("canonical-delay")
3995            .delay(Duration::from_millis(100))
3996            .build_canonical()
3997            .unwrap();
3998
3999        assert!(
4000            spec.steps.iter().any(
4001                |s| matches!(s, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100)
4002            )
4003        );
4004    }
4005
4006    #[test]
4007    fn test_build_canonical_accepts_wire_tap_step() {
4008        let spec = RouteBuilder::from("direct:start")
4009            .route_id("canonical-wiretap")
4010            .wire_tap("mock:tap")
4011            .build_canonical()
4012            .unwrap();
4013
4014        assert!(
4015            spec.steps
4016                .iter()
4017                .any(|s| matches!(s, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"))
4018        );
4019    }
4020
4021    #[test]
4022    fn test_build_canonical_rejects_dynamic_router_step() {
4023        let err = RouteBuilder::from("direct:start")
4024            .route_id("canonical-dyn-router")
4025            .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
4026            .build_canonical()
4027            .unwrap_err();
4028
4029        assert!(format!("{err}").contains("does not support step `dynamic_router`"));
4030    }
4031
4032    #[test]
4033    fn test_build_canonical_rejects_routing_slip_step() {
4034        let err = RouteBuilder::from("direct:start")
4035            .route_id("canonical-routing-slip")
4036            .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
4037            .build_canonical()
4038            .unwrap_err();
4039
4040        assert!(format!("{err}").contains("does not support step `routing_slip`"));
4041    }
4042
4043    #[test]
4044    fn test_build_canonical_rejects_recipient_list_step() {
4045        let err = RouteBuilder::from("direct:start")
4046            .route_id("canonical-recipient")
4047            .recipient_list(Arc::new(|_| "mock:a".to_string()))
4048            .build_canonical()
4049            .unwrap_err();
4050
4051        assert!(format!("{err}").contains("does not support step `recipient_list`"));
4052    }
4053
4054    // ── extract_completion_fields: Any mode with predicate → error ──────────────
4055
4056    #[test]
4057    fn test_build_canonical_rejects_any_mode_with_predicate() {
4058        let err = RouteBuilder::from("direct:start")
4059            .route_id("canonical-any-pred")
4060            .aggregate(AggregatorConfig {
4061                header_name: "key".to_string(),
4062                completion: CompletionMode::Any(vec![
4063                    CompletionCondition::Size(5),
4064                    CompletionCondition::Predicate(Arc::new(|_| false)),
4065                ]),
4066                correlation: CorrelationStrategy::HeaderName("key".to_string()),
4067                strategy: AggregationStrategy::CollectAll,
4068                max_buckets: None,
4069                bucket_ttl: None,
4070                force_completion_on_stop: false,
4071                discard_on_timeout: false,
4072            })
4073            .build_canonical()
4074            .unwrap_err();
4075
4076        assert!(format!("{err}").contains("predicate completion"));
4077    }
4078
4079    // ── BUILDER-004: Validation errors for missing required fields ────────────
4080
4081    #[test]
4082    fn test_builder_validation_missing_from_uri() {
4083        let result = RouteBuilder::from("")
4084            .route_id("missing-uri-route")
4085            .to("log:info")
4086            .build();
4087        assert!(result.is_err(), "empty from URI should fail validation");
4088        let err = result.err().unwrap().to_string();
4089        assert!(
4090            err.contains("'from'") || err.contains("URI"),
4091            "error should mention from/URI, got: {err}"
4092        );
4093    }
4094
4095    #[test]
4096    fn test_builder_validation_invalid_step_uri_scheme() {
4097        let result = RouteBuilder::from("timer:tick")
4098            .route_id("bad-step-route")
4099            .to("not-a-valid-uri") // no scheme
4100            .build();
4101        // The builder itself accepts any URI string; validation happens at
4102        // resolution time. Verify the build succeeds (step URI is deferred).
4103        assert!(
4104            result.is_ok(),
4105            "builder should accept opaque step URIs; resolution happens later"
4106        );
4107    }
4108
4109    // ── BUILDER-006: Duplicate route IDs ──────────────────────────────────────
4110
4111    #[test]
4112    fn test_builder_duplicate_route_ids_produce_identical_definitions() {
4113        // The builder itself doesn't check for duplicates (that's context-level).
4114        // Verify both builds succeed with the same ID — detection is TODO(BUILDER-006).
4115        let route1 = RouteBuilder::from("direct:a")
4116            .route_id("dup-route")
4117            .to("mock:out")
4118            .build();
4119        let route2 = RouteBuilder::from("direct:b")
4120            .route_id("dup-route")
4121            .to("mock:out")
4122            .build();
4123
4124        assert!(route1.is_ok());
4125        assert!(route2.is_ok());
4126        assert_eq!(route1.unwrap().route_id(), route2.unwrap().route_id());
4127    }
4128}