Skip to main content

camel_builder/
lib.rs

1//! Fluent builder API for constructing Camel routes programmatically with EIP patterns.
2//!
3//! Main types: `RouteBuilder`, `StepAccumulator`, `SplitBuilder`, `ChoiceBuilder`, `MulticastBuilder`,
4//! `ThrottleBuilder`, `LoopBuilder`, `LoadBalancerBuilder`, `OnExceptionBuilder`.
5
6use camel_api::DelayConfig;
7use camel_api::aggregator::{
8    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24    ProcessorFn, Value,
25    runtime::{
26        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28        CanonicalWhenSpec,
29    },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35    StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38/// Shared step-accumulation methods for all builder types.
39///
40/// Implementors provide `steps_mut()` and get step-adding methods for free.
41/// `filter()` and other branching methods are NOT included — they return
42/// different types per builder and stay as per-builder methods.
43pub trait StepAccumulator: Sized {
44    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
45
46    fn to(mut self, endpoint: impl Into<String>) -> Self {
47        self.steps_mut().push(BuilderStep::To(endpoint.into()));
48        self
49    }
50
51    fn process<F, Fut>(mut self, f: F) -> Self
52    where
53        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
54        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
55    {
56        let svc = ProcessorFn::new(f);
57        self.steps_mut()
58            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
59        self
60    }
61
62    fn process_fn(mut self, processor: BoxProcessor) -> Self {
63        self.steps_mut().push(BuilderStep::Processor(processor));
64        self
65    }
66
67    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
68        let svc = SetHeader::new(IdentityProcessor, key, value);
69        self.steps_mut()
70            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71        self
72    }
73
74    fn map_body<F>(mut self, mapper: F) -> Self
75    where
76        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
77    {
78        let svc = MapBody::new(IdentityProcessor, mapper);
79        self.steps_mut()
80            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
81        self
82    }
83
84    fn set_body<B>(mut self, body: B) -> Self
85    where
86        B: Into<Body> + Clone + Send + Sync + 'static,
87    {
88        let body: Body = body.into();
89        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
90        self.steps_mut()
91            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
92        self
93    }
94
95    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
96    ///
97    /// Transforms the message body using the given value. Semantically identical
98    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
99    fn transform<B>(self, body: B) -> Self
100    where
101        B: Into<Body> + Clone + Send + Sync + 'static,
102    {
103        self.set_body(body)
104    }
105
106    fn set_body_fn<F>(mut self, expr: F) -> Self
107    where
108        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
109    {
110        let svc = SetBody::new(IdentityProcessor, expr);
111        self.steps_mut()
112            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
113        self
114    }
115
116    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
117    where
118        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
119    {
120        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
121        self.steps_mut()
122            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
123        self
124    }
125
126    fn aggregate(mut self, config: AggregatorConfig) -> Self {
127        self.steps_mut().push(BuilderStep::Aggregate { config });
128        self
129    }
130
131    /// Stop processing this exchange immediately. No further steps in the
132    /// current pipeline will run.
133    ///
134    /// Can be used at any point in the route: directly on RouteBuilder,
135    /// inside `.filter()`, inside `.split()`, etc.
136    fn stop(mut self) -> Self {
137        self.steps_mut().push(BuilderStep::Stop);
138        self
139    }
140
141    fn delay(mut self, duration: std::time::Duration) -> Self {
142        self.steps_mut().push(BuilderStep::Delay {
143            config: DelayConfig::from_duration(duration),
144        });
145        self
146    }
147
148    fn delay_with_header(
149        mut self,
150        duration: std::time::Duration,
151        header: impl Into<String>,
152    ) -> Self {
153        self.steps_mut().push(BuilderStep::Delay {
154            config: DelayConfig::from_duration_with_header(duration, header),
155        });
156        self
157    }
158
159    /// Log a message at the specified level.
160    ///
161    /// The message will be logged when an exchange passes through this step.
162    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
163        self.steps_mut().push(BuilderStep::Log {
164            level,
165            message: message.into(),
166        });
167        self
168    }
169
170    /// Convert the message body to the target type.
171    ///
172    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
173    /// Returns `TypeConversionFailed` if conversion is not possible.
174    ///
175    /// # Example
176    /// ```ignore
177    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
178    ///      .convert_body_to(BodyType::Json)
179    ///      .to("direct:next")
180    /// ```
181    fn convert_body_to(mut self, target: BodyType) -> Self {
182        let svc = ConvertBodyTo::new(IdentityProcessor, target);
183        self.steps_mut()
184            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
185        self
186    }
187
188    fn stream_cache(mut self, threshold: usize) -> Self {
189        let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
190        let svc = StreamCacheService::new(IdentityProcessor, config);
191        self.steps_mut()
192            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
193        self
194    }
195
196    /// Materialize `Body::Stream` into `Body::Bytes` using the default threshold (128 KB).
197    ///
198    /// Equivalent to `.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)`.
199    fn stream_cache_default(self) -> Self {
200        self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
201    }
202
203    /// Marshal the message body using the specified data format.
204    ///
205    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
206    /// the format name is unknown.
207    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
208    ///
209    /// # Example
210    /// ```ignore
211    /// route.marshal("json")?.to("direct:next")
212    /// ```
213    fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
214        let name = format.into();
215        let df = builtin_data_format(&name)
216            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
217        let svc = MarshalService::new(IdentityProcessor, df);
218        self.steps_mut()
219            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
220        Ok(self)
221    }
222
223    /// Unmarshal the message body using the specified data format.
224    ///
225    /// Supported formats: `"json"`, `"xml"`. Returns `Err(CamelError::Config)` if
226    /// the format name is unknown.
227    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
228    ///
229    /// # Example
230    /// ```ignore
231    /// route.unmarshal("json")?.to("direct:next")
232    /// ```
233    fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
234        let name = format.into();
235        let df = builtin_data_format(&name)
236            .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
237        let svc = UnmarshalService::new(IdentityProcessor, df);
238        self.steps_mut()
239            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
240        Ok(self)
241    }
242
243    /// Validate the exchange body against a schema file.
244    ///
245    /// Shorthand for `to("validator:path")`. Supports XSD, JSON Schema, and YAML.
246    ///
247    /// # Example
248    /// ```ignore
249    /// route.validate("schemas/order.xsd").to("direct:out")
250    /// ```
251    fn validate(mut self, schema_path: impl Into<String>) -> Self {
252        let path = schema_path.into();
253        let uri = if path.starts_with("validator:") {
254            path
255        } else {
256            format!("validator:{path}")
257        };
258        self.steps_mut().push(BuilderStep::To(uri));
259        self
260    }
261
262    /// Execute a script that can modify the exchange (headers, properties, body).
263    ///
264    /// The script has access to `headers`, `properties`, and `body` variables
265    /// and can modify them with assignment syntax: `headers["k"] = v`.
266    ///
267    /// # Example
268    /// ```ignore
269    /// // ignore: requires full CamelContext setup with registered language
270    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
271    /// ```
272    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
273        self.steps_mut().push(BuilderStep::Script {
274            language: language.into(),
275            script: script.into(),
276        });
277        self
278    }
279
280    fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
281        self.steps_mut().push(BuilderStep::Bean {
282            name: name.into(),
283            method: method.into(),
284        });
285        self
286    }
287}
288
/// A fluent builder for constructing routes.
///
/// The builder is consumed by every method and finally by `build()` /
/// `build_canonical()`. A non-empty route id is mandatory: both build
/// methods return `CamelError::RouteError` without one.
///
/// # Example
///
/// ```ignore
/// let definition = RouteBuilder::from("timer:tick?period=1000")
///     .route_id("timer-route")
///     .set_header("source", Value::String("timer".into()))
///     .filter(|ex| ex.input.body.as_text().is_some())
///     .to("log:info?showHeaders=true")
///     .end_filter()
///     .build()?;
/// ```
// TODO(BUILDER-003): RouteBuilder does not support clone-and-reuse semantics.
// Once built, the builder is consumed. Add `Clone` or a `fork()` method to allow
// reusing a partially-built route as a template for multiple routes.
pub struct RouteBuilder {
    /// Source endpoint URI; validated by `validate_uri` at build time.
    from_uri: String,
    /// Top-level steps of the route, in the order they were added.
    steps: Vec<BuilderStep>,
    /// Explicit configuration supplied through `.error_handler(config)`.
    error_handler: Option<ErrorHandlerConfig>,
    /// Tracks which error-handler API style(s) were used so `build()` can
    /// reject a mix of explicit config and shorthand methods.
    error_handler_mode: ErrorHandlerMode,
    /// Optional circuit breaker applied to the route.
    circuit_breaker_config: Option<CircuitBreakerConfig>,
    /// Optional security policy; routes carrying one are rejected by `build_canonical()`.
    security_policy_config: Option<camel_api::security_policy::SecurityPolicyConfig>,
    /// Optional token authenticator forwarded to the route definition by `build()`.
    security_authenticator: Option<std::sync::Arc<dyn camel_auth::TokenAuthenticator>>,
    /// Concurrency override set by `.concurrent()` / `.sequential()`.
    concurrency: Option<ConcurrencyModel>,
    /// Route id; must be set and non-blank before building.
    route_id: Option<String>,
    /// Auto-startup override; only forwarded to the definition when set.
    auto_startup: Option<bool>,
    /// Startup-order override; only forwarded to the definition when set.
    startup_order: Option<i32>,
}
316
/// Records which error-handler API style has been used on a `RouteBuilder`.
///
/// The explicit style (`.error_handler(config)`) and the shorthand style
/// (`.dead_letter_channel()`, `.on_exception()`) are mutually exclusive;
/// using both moves the builder into `Mixed`, which `build()` rejects.
#[derive(Default)]
enum ErrorHandlerMode {
    /// No error-handler method has been called yet.
    #[default]
    None,
    /// Only `.error_handler(config)` has been used.
    ExplicitConfig,
    /// Only shorthand methods have been used.
    Shorthand {
        /// Dead letter channel URI, if `.dead_letter_channel()` was called.
        dlc_uri: Option<String>,
        /// Exception policies collected by `.on_exception(..).end_on_exception()`.
        specs: Vec<OnExceptionSpec>,
    },
    /// Both styles were used; `build()` returns a `RouteError` in this state.
    Mixed,
}
328
/// One shorthand exception policy collected by `OnExceptionBuilder`.
#[derive(Clone)]
struct OnExceptionSpec {
    /// Predicate deciding whether this policy applies to a given error.
    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
    /// Redelivery settings; `None` until `.retry()` is called on the builder.
    retry: Option<RedeliveryPolicy>,
    /// Endpoint URI passed to `.handled_by()`, if any.
    handled_by: Option<String>,
}
335
336impl RouteBuilder {
337    /// Start building a route from the given source endpoint URI.
338    pub fn from(endpoint: &str) -> Self {
339        Self {
340            from_uri: endpoint.to_string(),
341            steps: Vec::new(),
342            error_handler: None,
343            error_handler_mode: ErrorHandlerMode::None,
344            circuit_breaker_config: None,
345            security_policy_config: None,
346            security_authenticator: None,
347            concurrency: None,
348            route_id: None,
349            auto_startup: None,
350            startup_order: None,
351        }
352    }
353
354    /// Open a filter scope. Only exchanges matching `predicate` will be processed
355    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
356    /// and continue to steps after `.end_filter()`.
357    pub fn filter<F>(self, predicate: F) -> FilterBuilder
358    where
359        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
360    {
361        FilterBuilder {
362            parent: self,
363            predicate: std::sync::Arc::new(predicate),
364            steps: vec![],
365        }
366    }
367
368    /// Open a choice scope for content-based routing.
369    ///
370    /// Within the choice, you can define multiple `.when()` clauses and an
371    /// optional `.otherwise()` clause. The first matching `when` predicate
372    /// determines which sub-pipeline executes.
373    pub fn choice(self) -> ChoiceBuilder {
374        ChoiceBuilder {
375            parent: self,
376            whens: vec![],
377            _otherwise: None,
378        }
379    }
380
381    /// Add a WireTap step that sends a clone of the exchange to the given
382    /// endpoint URI (fire-and-forget). The original exchange continues
383    /// downstream unchanged.
384    pub fn wire_tap(mut self, endpoint: &str) -> Self {
385        self.steps.push(BuilderStep::WireTap {
386            uri: endpoint.to_string(),
387        });
388        self
389    }
390
391    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
392    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
393        self.error_handler_mode = match self.error_handler_mode {
394            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
395                ErrorHandlerMode::ExplicitConfig
396            }
397            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
398        };
399        self.error_handler = Some(config);
400        self
401    }
402
403    /// Set a dead letter channel URI for shorthand error handler mode.
404    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
405        let uri = uri.into();
406        self.error_handler_mode = match self.error_handler_mode {
407            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
408                dlc_uri: Some(uri),
409                specs: Vec::new(),
410            },
411            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
412                dlc_uri: Some(uri),
413                specs,
414            },
415            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
416        };
417        self
418    }
419
420    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
421    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
422    where
423        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
424    {
425        self.error_handler_mode = match self.error_handler_mode {
426            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
427                dlc_uri: None,
428                specs: Vec::new(),
429            },
430            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
431            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
432        };
433
434        OnExceptionBuilder {
435            parent: self,
436            policy: OnExceptionSpec {
437                matches: std::sync::Arc::new(matches),
438                retry: None,
439                handled_by: None,
440            },
441        }
442    }
443
444    /// Set a circuit breaker for this route.
445    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
446        self.circuit_breaker_config = Some(config);
447        self
448    }
449
450    pub fn security_policy(
451        mut self,
452        config: camel_api::security_policy::SecurityPolicyConfig,
453    ) -> Self {
454        self.security_policy_config = Some(config);
455        self
456    }
457
458    pub fn security_authenticator(
459        mut self,
460        auth: std::sync::Arc<dyn camel_auth::TokenAuthenticator>,
461    ) -> Self {
462        self.security_authenticator = Some(auth);
463        self
464    }
465
466    /// Override the consumer's default concurrency model.
467    ///
468    /// When set, the pipeline spawns a task per exchange, processing them
469    /// concurrently. `max` limits the number of simultaneously active
470    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
471    ///
472    /// # Example
473    /// ```ignore
474    /// RouteBuilder::from("http://0.0.0.0:8080/api")
475    ///     .concurrent(16)  // max 16 in-flight pipeline executions
476    ///     .process(handle_request)
477    ///     .build()
478    /// ```
479    pub fn concurrent(mut self, max: usize) -> Self {
480        let max = if max == 0 { None } else { Some(max) };
481        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
482        self
483    }
484
485    /// Force sequential processing, overriding a concurrent-capable consumer.
486    ///
487    /// Useful for HTTP routes that mutate shared state and need ordering
488    /// guarantees.
489    pub fn sequential(mut self) -> Self {
490        self.concurrency = Some(ConcurrencyModel::Sequential);
491        self
492    }
493
494    /// Set the route ID for this route.
495    ///
496    /// If not set, the route will be assigned an auto-generated ID.
497    pub fn route_id(mut self, id: impl Into<String>) -> Self {
498        self.route_id = Some(id.into());
499        self
500    }
501
502    /// Set whether this route should automatically start when the context starts.
503    ///
504    /// Default is `true`.
505    pub fn auto_startup(mut self, auto: bool) -> Self {
506        self.auto_startup = Some(auto);
507        self
508    }
509
510    /// Set the startup order for this route.
511    ///
512    /// Routes with lower values start first. Default is 1000.
513    pub fn startup_order(mut self, order: i32) -> Self {
514        self.startup_order = Some(order);
515        self
516    }
517
518    /// Begin a Splitter sub-pipeline. Steps added after this call (until
519    /// `.end_split()`) will be executed per-fragment.
520    ///
521    /// Returns a `SplitBuilder` — you cannot call `.build()` until
522    /// `.end_split()` closes the split scope (enforced by the type system).
523    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
524        SplitBuilder {
525            parent: self,
526            config,
527            steps: Vec::new(),
528        }
529    }
530
531    /// Begin a Multicast sub-pipeline. Steps added after this call (until
532    /// `.end_multicast()`) will each receive a copy of the exchange.
533    ///
534    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
535    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
536    pub fn multicast(self) -> MulticastBuilder {
537        MulticastBuilder {
538            parent: self,
539            steps: Vec::new(),
540            config: MulticastConfig::new(),
541        }
542    }
543
544    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
545    /// `max_requests` per `period`. Steps inside the throttle scope are only
546    /// executed when the rate limit allows.
547    ///
548    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
549    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
550    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
551        ThrottleBuilder {
552            parent: self,
553            config: ThrottlerConfig::new(max_requests, period),
554            steps: Vec::new(),
555        }
556    }
557
558    /// Begin a Loop sub-pipeline that iterates a fixed number of times.
559    pub fn loop_count(self, count: usize) -> LoopBuilder {
560        LoopBuilder {
561            parent: self,
562            config: LoopConfig {
563                mode: LoopMode::Count(count),
564            },
565            steps: vec![],
566        }
567    }
568
569    /// Begin a Loop sub-pipeline that iterates while a predicate is true.
570    pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
571    where
572        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
573    {
574        LoopBuilder {
575            parent: self,
576            config: LoopConfig {
577                mode: LoopMode::While(std::sync::Arc::new(predicate)),
578            },
579            steps: vec![],
580        }
581    }
582
583    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
584    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
585    ///
586    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
587    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
588    pub fn load_balance(self) -> LoadBalancerBuilder {
589        LoadBalancerBuilder {
590            parent: self,
591            config: LoadBalancerConfig::round_robin(),
592            steps: Vec::new(),
593        }
594    }
595
596    /// Add a dynamic router step that routes exchanges dynamically based on
597    /// expression evaluation at runtime.
598    ///
599    /// The expression receives the exchange and returns `Some(uri)` to route to
600    /// the next endpoint, or `None` to stop routing.
601    ///
602    /// # Example
603    /// ```ignore
604    /// RouteBuilder::from("timer:tick")
605    ///     .route_id("test-route")
606    ///     .dynamic_router(|ex| {
607    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
608    ///     })
609    ///     .build()
610    /// ```
611    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
612        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
613    }
614
615    /// Add a dynamic router step with full configuration.
616    ///
617    /// Allows customization of URI delimiter, cache size, timeout, and other options.
618    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
619        self.steps.push(BuilderStep::DynamicRouter { config });
620        self
621    }
622
623    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
624        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
625    }
626
627    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
628        self.steps.push(BuilderStep::RoutingSlip { config });
629        self
630    }
631
632    pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
633        self.recipient_list_with_config(RecipientListConfig::new(expression))
634    }
635
636    pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
637        self.steps.push(BuilderStep::RecipientList { config });
638        self
639    }
640
641    /// Consume the builder and produce a [`RouteDefinition`].
642    // TODO(BUILDER-006): Validate duplicate route IDs. When a route with the same
643    // ID is already registered in the context, return `Err(CamelError::RouteError)`.
644    // Currently, duplicate IDs are silently accepted; detection should happen at
645    // `CamelContext::add_route_definition` time.
646    pub fn build(self) -> Result<RouteDefinition, CamelError> {
647        validate_uri(&self.from_uri)?;
648        let route_id = self
649            .route_id
650            .filter(|s| !s.trim().is_empty())
651            .ok_or_else(|| {
652                CamelError::RouteError(
653                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
654                        .to_string(),
655                )
656            })?;
657        let resolved_error_handler = match self.error_handler_mode {
658            ErrorHandlerMode::None => self.error_handler,
659            ErrorHandlerMode::ExplicitConfig => self.error_handler,
660            ErrorHandlerMode::Mixed => {
661                return Err(CamelError::RouteError(
662                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
663                ));
664            }
665            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
666                let mut config = if let Some(uri) = dlc_uri {
667                    ErrorHandlerConfig::dead_letter_channel(uri)
668                } else {
669                    ErrorHandlerConfig::log_only()
670                };
671
672                for spec in specs {
673                    let matcher = spec.matches.clone();
674                    let mut builder = config.on_exception(move |e| matcher(e));
675
676                    if let Some(retry) = spec.retry {
677                        builder = builder.retry(retry.max_attempts).with_backoff(
678                            retry.initial_delay,
679                            retry.multiplier,
680                            retry.max_delay,
681                        );
682                        if retry.jitter_factor > 0.0 {
683                            builder = builder.with_jitter(retry.jitter_factor);
684                        }
685                    }
686
687                    if let Some(uri) = spec.handled_by {
688                        builder = builder.handled_by(uri);
689                    }
690
691                    config = builder.build();
692                }
693
694                Some(config)
695            }
696        };
697
698        let definition = RouteDefinition::new(self.from_uri, self.steps);
699        let definition = if let Some(eh) = resolved_error_handler {
700            definition.with_error_handler(eh)
701        } else {
702            definition
703        };
704        let definition = if let Some(cb) = self.circuit_breaker_config {
705            definition.with_circuit_breaker(cb)
706        } else {
707            definition
708        };
709        let definition = if let Some(sp) = self.security_policy_config {
710            definition.with_security_policy(sp)
711        } else {
712            definition
713        };
714        let definition = if let Some(auth) = self.security_authenticator {
715            definition.with_security_authenticator(auth)
716        } else {
717            definition
718        };
719        let definition = if let Some(concurrency) = self.concurrency {
720            definition.with_concurrency(concurrency)
721        } else {
722            definition
723        };
724        let definition = definition.with_route_id(route_id);
725        let definition = if let Some(auto) = self.auto_startup {
726            definition.with_auto_startup(auto)
727        } else {
728            definition
729        };
730        let definition = if let Some(order) = self.startup_order {
731            definition.with_startup_order(order)
732        } else {
733            definition
734        };
735        Ok(definition)
736    }
737
738    /// Compile this builder route into canonical v1 spec.
739    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
740        validate_uri(&self.from_uri)?;
741        let route_id = self
742            .route_id
743            .filter(|s| !s.trim().is_empty())
744            .ok_or_else(|| {
745                CamelError::RouteError(
746                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
747                        .to_string(),
748                )
749            })?;
750
751        let steps = canonicalize_steps(self.steps)?;
752        let circuit_breaker = self
753            .circuit_breaker_config
754            .map(canonicalize_circuit_breaker);
755
756        if self.security_policy_config.is_some() {
757            return Err(CamelError::RouteError(
758                "routes with security_policy cannot use the canonical/hot-reload path (not yet supported)"
759                    .into(),
760            ));
761        }
762
763        let spec = CanonicalRouteSpec {
764            route_id,
765            from: self.from_uri,
766            steps,
767            circuit_breaker,
768            version: camel_api::CANONICAL_CONTRACT_VERSION,
769        };
770        spec.validate_contract()?;
771        Ok(spec)
772    }
773}
774
/// Scope builder for one shorthand exception policy.
///
/// Created by `RouteBuilder::on_exception`; `end_on_exception()` hands the
/// parent `RouteBuilder` back.
pub struct OnExceptionBuilder {
    /// The route builder this scope was opened on.
    parent: RouteBuilder,
    /// The policy being configured inside this scope.
    policy: OnExceptionSpec,
}
779
780impl OnExceptionBuilder {
781    pub fn retry(mut self, max_attempts: u32) -> Self {
782        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
783        self
784    }
785
786    pub fn with_backoff(
787        mut self,
788        initial: std::time::Duration,
789        multiplier: f64,
790        max: std::time::Duration,
791    ) -> Self {
792        if let Some(ref mut retry) = self.policy.retry {
793            retry.initial_delay = initial;
794            retry.multiplier = multiplier;
795            retry.max_delay = max;
796        } else {
797            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
798        }
799        self
800    }
801
802    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
803        if let Some(ref mut retry) = self.policy.retry {
804            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
805        } else {
806            tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
807        }
808        self
809    }
810
811    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
812        self.policy.handled_by = Some(uri.into());
813        self
814    }
815
816    pub fn end_on_exception(mut self) -> RouteBuilder {
817        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
818            specs.push(self.policy);
819        }
820        self.parent
821    }
822}
823
824/// Validate that a URI is non-empty and contains a scheme component.
825fn validate_uri(uri: &str) -> Result<(), CamelError> {
826    let trimmed = uri.trim();
827    if trimmed.is_empty() {
828        return Err(CamelError::RouteError(
829            "route must have a 'from' URI".to_string(),
830        ));
831    }
832    if !trimmed.contains(':') {
833        return Err(CamelError::RouteError(
834            "URI must have a scheme (e.g. 'timer:tick')".to_string(),
835        ));
836    }
837    let scheme = trimmed.split(':').next().unwrap_or("");
838    if scheme.trim().is_empty() {
839        return Err(CamelError::RouteError(
840            "URI scheme must not be empty".to_string(),
841        ));
842    }
843    Ok(())
844}
845
846fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
847    let mut canonical = Vec::with_capacity(steps.len());
848    for step in steps {
849        canonical.push(canonicalize_step(step)?);
850    }
851    Ok(canonical)
852}
853
854fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
855    match step {
856        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
857        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
858        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
859        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
860        BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
861            delay_ms: config.delay_ms,
862            dynamic_header: config.dynamic_header,
863        }),
864        BuilderStep::DeclarativeScript { expression } => {
865            Ok(CanonicalStepSpec::Script { expression })
866        }
867        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
868            predicate,
869            steps: canonicalize_steps(steps)?,
870        }),
871        BuilderStep::DeclarativeChoice { whens, otherwise } => {
872            let mut canonical_whens = Vec::with_capacity(whens.len());
873            for DeclarativeWhenStep { predicate, steps } in whens {
874                canonical_whens.push(CanonicalWhenSpec {
875                    predicate,
876                    steps: canonicalize_steps(steps)?,
877                });
878            }
879            let otherwise = match otherwise {
880                Some(steps) => Some(canonicalize_steps(steps)?),
881                None => None,
882            };
883            Ok(CanonicalStepSpec::Choice {
884                whens: canonical_whens,
885                otherwise,
886            })
887        }
888        BuilderStep::DeclarativeSplit {
889            expression,
890            aggregation,
891            parallel,
892            parallel_limit,
893            stop_on_exception,
894            steps,
895        } => Ok(CanonicalStepSpec::Split {
896            expression: CanonicalSplitExpressionSpec::Language(expression),
897            aggregation: canonicalize_split_aggregation(aggregation)?,
898            parallel,
899            parallel_limit,
900            stop_on_exception,
901            steps: canonicalize_steps(steps)?,
902        }),
903        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
904            canonicalize_aggregate(config)?,
905        )),
906        other => {
907            let step_name = canonical_step_name(&other);
908            let detail = camel_api::canonical_contract_rejection_reason(step_name)
909                .unwrap_or("not included in canonical v1");
910            Err(CamelError::RouteError(format!(
911                "canonical v1 does not support step `{step_name}`: {detail}"
912            )))
913        }
914    }
915}
916
917fn canonicalize_split_aggregation(
918    strategy: camel_api::splitter::AggregationStrategy,
919) -> Result<CanonicalSplitAggregationSpec, CamelError> {
920    match strategy {
921        camel_api::splitter::AggregationStrategy::LastWins => {
922            Ok(CanonicalSplitAggregationSpec::LastWins)
923        }
924        camel_api::splitter::AggregationStrategy::CollectAll => {
925            Ok(CanonicalSplitAggregationSpec::CollectAll)
926        }
927        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
928            "canonical v1 does not support custom split aggregation".to_string(),
929        )),
930        camel_api::splitter::AggregationStrategy::Original => {
931            Ok(CanonicalSplitAggregationSpec::Original)
932        }
933    }
934}
935
936fn extract_completion_fields(
937    mode: &CompletionMode,
938) -> Result<(Option<usize>, Option<u64>), CamelError> {
939    match mode {
940        CompletionMode::Single(cond) => match cond {
941            CompletionCondition::Size(n) => Ok((Some(*n), None)),
942            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
943            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
944                "canonical v1 does not support aggregate predicate completion".to_string(),
945            )),
946        },
947        CompletionMode::Any(conds) => {
948            let mut size = None;
949            let mut timeout_ms = None;
950            for cond in conds {
951                match cond {
952                    CompletionCondition::Size(n) => size = Some(*n),
953                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
954                    CompletionCondition::Predicate(_) => {
955                        return Err(CamelError::RouteError(
956                            "canonical v1 does not support aggregate predicate completion"
957                                .to_string(),
958                        ));
959                    }
960                }
961            }
962            Ok((size, timeout_ms))
963        }
964    }
965}
966
967fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
968    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
969
970    let header = match &config.correlation {
971        CorrelationStrategy::HeaderName(h) => h.clone(),
972        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
973        CorrelationStrategy::Fn(_) => {
974            return Err(CamelError::RouteError(
975                "canonical v1 does not support Fn correlation strategy".to_string(),
976            ));
977        }
978    };
979
980    let correlation_key = match &config.correlation {
981        CorrelationStrategy::HeaderName(_) => None,
982        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
983        CorrelationStrategy::Fn(_) => unreachable!(),
984    };
985
986    let strategy = match config.strategy {
987        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
988        AggregationStrategy::Custom(_) => {
989            return Err(CamelError::RouteError(
990                "canonical v1 does not support custom aggregate strategy".to_string(),
991            ));
992        }
993    };
994    let bucket_ttl_ms = config
995        .bucket_ttl
996        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
997
998    Ok(CanonicalAggregateSpec {
999        header,
1000        completion_size,
1001        completion_timeout_ms,
1002        correlation_key,
1003        force_completion_on_stop: if config.force_completion_on_stop {
1004            Some(true)
1005        } else {
1006            None
1007        },
1008        discard_on_timeout: if config.discard_on_timeout {
1009            Some(true)
1010        } else {
1011            None
1012        },
1013        strategy,
1014        max_buckets: config.max_buckets,
1015        bucket_ttl_ms,
1016    })
1017}
1018
1019fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
1020    CanonicalCircuitBreakerSpec {
1021        failure_threshold: config.failure_threshold,
1022        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
1023    }
1024}
1025
1026fn canonical_step_name(step: &BuilderStep) -> &'static str {
1027    match step {
1028        BuilderStep::Processor(_) => "processor",
1029        BuilderStep::To(_) => "to",
1030        BuilderStep::Stop => "stop",
1031        BuilderStep::Log { .. } => "log",
1032        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
1033        BuilderStep::DeclarativeSetBody { .. } => "set_body",
1034        BuilderStep::DeclarativeFilter { .. } => "filter",
1035        BuilderStep::DeclarativeChoice { .. } => "choice",
1036        BuilderStep::DeclarativeScript { .. } => "script",
1037        BuilderStep::DeclarativeFunction { .. } => "function",
1038        BuilderStep::DeclarativeSplit { .. } => "split",
1039        BuilderStep::Split { .. } => "split",
1040        BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1041        BuilderStep::Aggregate { .. } => "aggregate",
1042        BuilderStep::Filter { .. } => "filter",
1043        BuilderStep::Choice { .. } => "choice",
1044        BuilderStep::WireTap { .. } => "wire_tap",
1045        BuilderStep::Delay { .. } => "delay",
1046        BuilderStep::Multicast { .. } => "multicast",
1047        BuilderStep::DeclarativeLog { .. } => "log",
1048        BuilderStep::Bean { .. } => "bean",
1049        BuilderStep::Script { .. } => "script",
1050        BuilderStep::Throttle { .. } => "throttle",
1051        BuilderStep::LoadBalance { .. } => "load_balancer",
1052        BuilderStep::DynamicRouter { .. } => "dynamic_router",
1053        BuilderStep::RoutingSlip { .. } => "routing_slip",
1054        BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1055        BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1056        BuilderStep::RecipientList { .. } => "recipient_list",
1057        BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1058        BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1059    }
1060}
1061
1062impl StepAccumulator for RouteBuilder {
1063    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1064        &mut self.steps
1065    }
1066}
1067
1068/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
1069///
1070/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1071/// but NOT `.build()` and NOT `.split()` (no nested splits).
1072///
1073/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
1074/// and returns the parent `RouteBuilder`.
1075pub struct SplitBuilder {
1076    parent: RouteBuilder,
1077    config: SplitterConfig,
1078    steps: Vec<BuilderStep>,
1079}
1080
1081impl SplitBuilder {
1082    /// Open a filter scope within the split sub-pipeline.
1083    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1084    where
1085        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1086    {
1087        FilterInSplitBuilder {
1088            parent: self,
1089            predicate: std::sync::Arc::new(predicate),
1090            steps: vec![],
1091        }
1092    }
1093
1094    /// Close the split scope. Packages the accumulated sub-steps into a
1095    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
1096    pub fn end_split(mut self) -> RouteBuilder {
1097        let split_step = BuilderStep::Split {
1098            config: self.config,
1099            steps: self.steps,
1100        };
1101        self.parent.steps.push(split_step);
1102        self.parent
1103    }
1104}
1105
1106impl StepAccumulator for SplitBuilder {
1107    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1108        &mut self.steps
1109    }
1110}
1111
1112/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
1113pub struct FilterBuilder {
1114    parent: RouteBuilder,
1115    predicate: FilterPredicate,
1116    steps: Vec<BuilderStep>,
1117}
1118
1119impl FilterBuilder {
1120    /// Close the filter scope. Packages the accumulated sub-steps into a
1121    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
1122    pub fn end_filter(mut self) -> RouteBuilder {
1123        let step = BuilderStep::Filter {
1124            predicate: self.predicate,
1125            steps: self.steps,
1126        };
1127        self.parent.steps.push(step);
1128        self.parent
1129    }
1130}
1131
1132impl StepAccumulator for FilterBuilder {
1133    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1134        &mut self.steps
1135    }
1136}
1137
1138/// Builder for a filter scope nested inside a `.split()` block.
1139pub struct FilterInSplitBuilder {
1140    parent: SplitBuilder,
1141    predicate: FilterPredicate,
1142    steps: Vec<BuilderStep>,
1143}
1144
1145impl FilterInSplitBuilder {
1146    /// Close the filter scope and return the parent `SplitBuilder`.
1147    pub fn end_filter(mut self) -> SplitBuilder {
1148        let step = BuilderStep::Filter {
1149            predicate: self.predicate,
1150            steps: self.steps,
1151        };
1152        self.parent.steps.push(step);
1153        self.parent
1154    }
1155}
1156
1157impl StepAccumulator for FilterInSplitBuilder {
1158    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1159        &mut self.steps
1160    }
1161}
1162
1163// ── Choice/When/Otherwise builders ─────────────────────────────────────────
1164
1165/// Builder for a `.choice()` ... `.end_choice()` block.
1166///
1167/// Accumulates `when` clauses and an optional `otherwise` clause.
1168/// Cannot call `.build()` until `.end_choice()` is called.
1169pub struct ChoiceBuilder {
1170    parent: RouteBuilder,
1171    whens: Vec<WhenStep>,
1172    _otherwise: Option<Vec<BuilderStep>>,
1173}
1174
1175impl ChoiceBuilder {
1176    /// Open a `when` clause. Only exchanges matching `predicate` will be
1177    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
1178    pub fn when<F>(self, predicate: F) -> WhenBuilder
1179    where
1180        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1181    {
1182        WhenBuilder {
1183            parent: self,
1184            predicate: std::sync::Arc::new(predicate),
1185            steps: vec![],
1186        }
1187    }
1188
1189    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1190    ///
1191    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1192    pub fn otherwise(self) -> OtherwiseBuilder {
1193        OtherwiseBuilder {
1194            parent: self,
1195            steps: vec![],
1196        }
1197    }
1198
1199    /// Close the choice scope. Packages all accumulated `when` clauses and
1200    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1201    /// parent `RouteBuilder`.
1202    pub fn end_choice(mut self) -> RouteBuilder {
1203        let step = BuilderStep::Choice {
1204            whens: self.whens,
1205            otherwise: self._otherwise,
1206        };
1207        self.parent.steps.push(step);
1208        self.parent
1209    }
1210}
1211
1212/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1213pub struct WhenBuilder {
1214    parent: ChoiceBuilder,
1215    predicate: camel_api::FilterPredicate,
1216    steps: Vec<BuilderStep>,
1217}
1218
1219impl WhenBuilder {
1220    /// Close the when scope. Packages the accumulated sub-steps into a
1221    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1222    pub fn end_when(mut self) -> ChoiceBuilder {
1223        self.parent.whens.push(WhenStep {
1224            predicate: self.predicate,
1225            steps: self.steps,
1226        });
1227        self.parent
1228    }
1229}
1230
1231impl StepAccumulator for WhenBuilder {
1232    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1233        &mut self.steps
1234    }
1235}
1236
1237/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1238pub struct OtherwiseBuilder {
1239    parent: ChoiceBuilder,
1240    steps: Vec<BuilderStep>,
1241}
1242
1243impl OtherwiseBuilder {
1244    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1245    pub fn end_otherwise(self) -> ChoiceBuilder {
1246        let OtherwiseBuilder { mut parent, steps } = self;
1247        parent._otherwise = Some(steps);
1248        parent
1249    }
1250}
1251
1252impl StepAccumulator for OtherwiseBuilder {
1253    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1254        &mut self.steps
1255    }
1256}
1257
1258/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1259///
1260/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1261/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1262///
1263/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1264/// and returns the parent `RouteBuilder`.
1265pub struct MulticastBuilder {
1266    parent: RouteBuilder,
1267    steps: Vec<BuilderStep>,
1268    config: MulticastConfig,
1269}
1270
1271impl MulticastBuilder {
1272    pub fn parallel(mut self, parallel: bool) -> Self {
1273        self.config = self.config.parallel(parallel);
1274        self
1275    }
1276
1277    pub fn parallel_limit(mut self, limit: usize) -> Self {
1278        self.config = self.config.parallel_limit(limit);
1279        self
1280    }
1281
1282    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1283        self.config = self.config.stop_on_exception(stop);
1284        self
1285    }
1286
1287    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1288        self.config = self.config.timeout(duration);
1289        self
1290    }
1291
1292    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1293        self.config = self.config.aggregation(strategy);
1294        self
1295    }
1296
1297    pub fn end_multicast(mut self) -> RouteBuilder {
1298        let step = BuilderStep::Multicast {
1299            steps: self.steps,
1300            config: self.config,
1301        };
1302        self.parent.steps.push(step);
1303        self.parent
1304    }
1305}
1306
1307impl StepAccumulator for MulticastBuilder {
1308    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1309        &mut self.steps
1310    }
1311}
1312
1313/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1314///
1315/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1316/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1317///
1318/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1319/// and returns the parent `RouteBuilder`.
1320pub struct ThrottleBuilder {
1321    parent: RouteBuilder,
1322    config: ThrottlerConfig,
1323    steps: Vec<BuilderStep>,
1324}
1325
1326impl ThrottleBuilder {
1327    /// Set the throttle strategy. Default is `Delay`.
1328    ///
1329    /// - `Delay`: Queue messages until capacity available
1330    /// - `Reject`: Return error immediately when throttled
1331    /// - `Drop`: Silently discard excess messages
1332    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1333        self.config = self.config.strategy(strategy);
1334        self
1335    }
1336
1337    /// Close the throttle scope. Packages the accumulated sub-steps into a
1338    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1339    pub fn end_throttle(mut self) -> RouteBuilder {
1340        let step = BuilderStep::Throttle {
1341            config: self.config,
1342            steps: self.steps,
1343        };
1344        self.parent.steps.push(step);
1345        self.parent
1346    }
1347}
1348
1349impl StepAccumulator for ThrottleBuilder {
1350    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1351        &mut self.steps
1352    }
1353}
1354
1355/// Builder for the sub-pipeline within a `.loop_count()` / `.loop_while()` ... `.end_loop()` block.
1356pub struct LoopBuilder {
1357    parent: RouteBuilder,
1358    config: LoopConfig,
1359    steps: Vec<BuilderStep>,
1360}
1361
1362impl LoopBuilder {
1363    pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1364        LoopInLoopBuilder {
1365            parent: self,
1366            config: LoopConfig {
1367                mode: LoopMode::Count(count),
1368            },
1369            steps: vec![],
1370        }
1371    }
1372
1373    pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1374    where
1375        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1376    {
1377        LoopInLoopBuilder {
1378            parent: self,
1379            config: LoopConfig {
1380                mode: LoopMode::While(std::sync::Arc::new(predicate)),
1381            },
1382            steps: vec![],
1383        }
1384    }
1385
1386    pub fn end_loop(mut self) -> RouteBuilder {
1387        let step = BuilderStep::Loop {
1388            config: self.config,
1389            steps: self.steps,
1390        };
1391        self.parent.steps.push(step);
1392        self.parent
1393    }
1394}
1395
1396impl StepAccumulator for LoopBuilder {
1397    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1398        &mut self.steps
1399    }
1400}
1401
1402pub struct LoopInLoopBuilder {
1403    parent: LoopBuilder,
1404    config: LoopConfig,
1405    steps: Vec<BuilderStep>,
1406}
1407
1408impl LoopInLoopBuilder {
1409    pub fn end_loop(mut self) -> LoopBuilder {
1410        let step = BuilderStep::Loop {
1411            config: self.config,
1412            steps: self.steps,
1413        };
1414        self.parent.steps.push(step);
1415        self.parent
1416    }
1417}
1418
1419impl StepAccumulator for LoopInLoopBuilder {
1420    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1421        &mut self.steps
1422    }
1423}
1424
1425/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1426///
1427/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1428/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1429///
1430/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1431/// and returns the parent `RouteBuilder`.
1432pub struct LoadBalancerBuilder {
1433    parent: RouteBuilder,
1434    config: LoadBalancerConfig,
1435    steps: Vec<BuilderStep>,
1436}
1437
1438impl LoadBalancerBuilder {
1439    /// Set the load balance strategy to round-robin (default).
1440    pub fn round_robin(mut self) -> Self {
1441        self.config = LoadBalancerConfig::round_robin();
1442        self
1443    }
1444
1445    /// Set the load balance strategy to random selection.
1446    pub fn random(mut self) -> Self {
1447        self.config = LoadBalancerConfig::random();
1448        self
1449    }
1450
1451    /// Set the load balance strategy to weighted selection.
1452    ///
1453    /// Each endpoint is assigned a weight that determines its probability
1454    /// of being selected.
1455    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1456        self.config = LoadBalancerConfig::weighted(weights);
1457        self
1458    }
1459
1460    /// Set the load balance strategy to failover.
1461    ///
1462    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1463    /// is tried.
1464    pub fn failover(mut self) -> Self {
1465        self.config = LoadBalancerConfig::failover();
1466        self
1467    }
1468
1469    /// Enable or disable parallel execution of endpoints.
1470    ///
1471    /// When enabled, all endpoints receive the exchange simultaneously.
1472    /// When disabled (default), only one endpoint is selected per exchange.
1473    pub fn parallel(mut self, parallel: bool) -> Self {
1474        self.config = self.config.parallel(parallel);
1475        self
1476    }
1477
1478    /// Close the load balance scope. Packages the accumulated sub-steps into a
1479    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1480    pub fn end_load_balance(mut self) -> RouteBuilder {
1481        let step = BuilderStep::LoadBalance {
1482            config: self.config,
1483            steps: self.steps,
1484        };
1485        self.parent.steps.push(step);
1486        self.parent
1487    }
1488}
1489
1490impl StepAccumulator for LoadBalancerBuilder {
1491    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1492        &mut self.steps
1493    }
1494}
1495
1496// ---------------------------------------------------------------------------
1497// Tests
1498// ---------------------------------------------------------------------------
1499
1500#[cfg(test)]
1501mod tests {
1502    use super::*;
1503    use camel_api::error_handler::ErrorHandlerConfig;
1504    use camel_api::load_balancer::LoadBalanceStrategy;
1505    use camel_api::{Exchange, Message};
1506    use camel_core::route::BuilderStep;
1507    use std::sync::Arc;
1508    use std::time::Duration;
1509    use tower::{Service, ServiceExt};
1510
1511    #[test]
1512    fn test_builder_from_creates_definition() {
1513        let definition = RouteBuilder::from("timer:tick")
1514            .route_id("test-route")
1515            .build()
1516            .unwrap();
1517        assert_eq!(definition.from_uri(), "timer:tick");
1518    }
1519
1520    #[test]
1521    fn test_builder_empty_from_uri_errors() {
1522        let result = RouteBuilder::from("").route_id("test-route").build();
1523        assert!(result.is_err());
1524    }
1525
1526    #[test]
1527    fn test_build_rejects_schemeless_uri() {
1528        let result = RouteBuilder::from("no-scheme-here")
1529            .route_id("test-route")
1530            .build();
1531        match result {
1532            Err(err) => {
1533                let err_msg = format!("{err}");
1534                assert!(
1535                    err_msg.contains("scheme"),
1536                    "expected scheme-related error, got: {err_msg}"
1537                );
1538            }
1539            Ok(_) => panic!("schemeless URI should fail"),
1540        }
1541    }
1542
1543    #[test]
1544    fn test_build_rejects_empty_scheme_uri() {
1545        let result = RouteBuilder::from(":missing-scheme")
1546            .route_id("test-route")
1547            .build();
1548        match result {
1549            Err(err) => {
1550                let err_msg = format!("{err}");
1551                assert!(
1552                    err_msg.contains("scheme"),
1553                    "expected scheme-related error, got: {err_msg}"
1554                );
1555            }
1556            Ok(_) => panic!("empty-scheme URI should fail"),
1557        }
1558    }
1559
1560    #[test]
1561    fn test_build_accepts_valid_uri() {
1562        let result = RouteBuilder::from("timer:tick")
1563            .route_id("test-route")
1564            .build();
1565        assert!(result.is_ok());
1566    }
1567
1568    #[test]
1569    fn test_build_canonical_rejects_schemeless_uri() {
1570        let result = RouteBuilder::from("no-scheme-here")
1571            .route_id("test-route")
1572            .build_canonical();
1573        assert!(result.is_err());
1574    }
1575
1576    #[test]
1577    fn test_builder_to_adds_step() {
1578        let definition = RouteBuilder::from("timer:tick")
1579            .route_id("test-route")
1580            .to("log:info")
1581            .build()
1582            .unwrap();
1583
1584        assert_eq!(definition.from_uri(), "timer:tick");
1585        // We can verify steps were added by checking the structure
1586        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1587    }
1588
1589    #[test]
1590    fn test_builder_filter_adds_filter_step() {
1591        let definition = RouteBuilder::from("timer:tick")
1592            .route_id("test-route")
1593            .filter(|_ex| true)
1594            .to("mock:result")
1595            .end_filter()
1596            .build()
1597            .unwrap();
1598
1599        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1600    }
1601
1602    #[test]
1603    fn test_builder_set_header_adds_processor_step() {
1604        let definition = RouteBuilder::from("timer:tick")
1605            .route_id("test-route")
1606            .set_header("key", Value::String("value".into()))
1607            .build()
1608            .unwrap();
1609
1610        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1611    }
1612
1613    #[test]
1614    fn test_builder_map_body_adds_processor_step() {
1615        let definition = RouteBuilder::from("timer:tick")
1616            .route_id("test-route")
1617            .map_body(|body| body)
1618            .build()
1619            .unwrap();
1620
1621        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1622    }
1623
1624    #[test]
1625    fn test_builder_process_adds_processor_step() {
1626        let definition = RouteBuilder::from("timer:tick")
1627            .route_id("test-route")
1628            .process(|ex| async move { Ok(ex) })
1629            .build()
1630            .unwrap();
1631
1632        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1633    }
1634
1635    #[test]
1636    fn test_builder_chain_multiple_steps() {
1637        let definition = RouteBuilder::from("timer:tick")
1638            .route_id("test-route")
1639            .set_header("source", Value::String("timer".into()))
1640            .filter(|ex| ex.input.header("source").is_some())
1641            .to("log:info")
1642            .end_filter()
1643            .to("mock:result")
1644            .build()
1645            .unwrap();
1646
1647        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1648        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1649        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1650        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1651    }
1652
1653    #[test]
1654    fn test_loop_count_builder() {
1655        use camel_api::loop_eip::LoopMode;
1656
1657        let def = RouteBuilder::from("direct:start")
1658            .route_id("loop-test")
1659            .loop_count(3)
1660            .to("mock:inside")
1661            .end_loop()
1662            .to("mock:after")
1663            .build()
1664            .unwrap();
1665
1666        assert_eq!(def.steps().len(), 2);
1667        match &def.steps()[0] {
1668            BuilderStep::Loop { config, steps } => {
1669                assert!(matches!(config.mode, LoopMode::Count(3)));
1670                assert_eq!(steps.len(), 1);
1671            }
1672            other => panic!("Expected Loop, got {:?}", other),
1673        }
1674        assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1675    }
1676
1677    #[test]
1678    fn test_loop_while_builder() {
1679        use camel_api::loop_eip::LoopMode;
1680
1681        let def = RouteBuilder::from("direct:start")
1682            .route_id("loop-while-test")
1683            .loop_while(|_ex| true)
1684            .to("mock:retry")
1685            .end_loop()
1686            .build()
1687            .unwrap();
1688
1689        assert_eq!(def.steps().len(), 1);
1690        match &def.steps()[0] {
1691            BuilderStep::Loop { config, steps } => {
1692                assert!(matches!(config.mode, LoopMode::While(_)));
1693                assert_eq!(steps.len(), 1);
1694            }
1695            other => panic!("Expected Loop, got {:?}", other),
1696        }
1697    }
1698
1699    #[test]
1700    fn test_nested_loop_builder() {
1701        use camel_api::loop_eip::LoopMode;
1702
1703        let def = RouteBuilder::from("direct:start")
1704            .route_id("nested-loop-test")
1705            .loop_count(2)
1706            .to("mock:outer")
1707            .loop_count(3)
1708            .to("mock:inner")
1709            .end_loop()
1710            .end_loop()
1711            .to("mock:after")
1712            .build()
1713            .unwrap();
1714
1715        assert_eq!(def.steps().len(), 2);
1716        match &def.steps()[0] {
1717            BuilderStep::Loop { steps, .. } => {
1718                assert_eq!(steps.len(), 2);
1719                match &steps[1] {
1720                    BuilderStep::Loop {
1721                        config,
1722                        steps: inner_steps,
1723                    } => {
1724                        assert!(matches!(config.mode, LoopMode::Count(3)));
1725                        assert_eq!(inner_steps.len(), 1);
1726                    }
1727                    other => panic!("Expected nested Loop, got {:?}", other),
1728                }
1729            }
1730            other => panic!("Expected outer Loop, got {:?}", other),
1731        }
1732    }
1733
1734    // -----------------------------------------------------------------------
1735    // Processor behavior tests — exercise the real Tower services directly
1736    // -----------------------------------------------------------------------
1737
1738    #[tokio::test]
1739    async fn test_set_header_processor_works() {
1740        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1741        let exchange = Exchange::new(Message::new("test"));
1742        let result = svc.call(exchange).await.unwrap();
1743        assert_eq!(
1744            result.input.header("greeting"),
1745            Some(&Value::String("hello".into()))
1746        );
1747    }
1748
1749    #[tokio::test]
1750    async fn test_filter_processor_passes() {
1751        use camel_api::BoxProcessorExt;
1752        use camel_processor::FilterService;
1753
1754        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1755        let mut svc =
1756            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1757        let exchange = Exchange::new(Message::new("pass"));
1758        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1759        assert_eq!(result.input.body.as_text(), Some("pass"));
1760    }
1761
1762    #[tokio::test]
1763    async fn test_filter_processor_blocks() {
1764        use camel_api::BoxProcessorExt;
1765        use camel_processor::FilterService;
1766
1767        let sub = BoxProcessor::from_fn(|_ex| {
1768            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1769        });
1770        let mut svc =
1771            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1772        let exchange = Exchange::new(Message::new("reject"));
1773        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1774        assert_eq!(result.input.body.as_text(), Some("reject"));
1775    }
1776
1777    #[tokio::test]
1778    async fn test_map_body_processor_works() {
1779        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1780            if let Some(text) = body.as_text() {
1781                Body::Text(text.to_uppercase())
1782            } else {
1783                body
1784            }
1785        });
1786        let exchange = Exchange::new(Message::new("hello"));
1787        let result = mapper.oneshot(exchange).await.unwrap();
1788        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1789    }
1790
1791    #[tokio::test]
1792    async fn test_process_custom_processor_works() {
1793        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1794            ex.set_property("custom", Value::Bool(true));
1795            Ok(ex)
1796        });
1797        let exchange = Exchange::new(Message::default());
1798        let result = processor.oneshot(exchange).await.unwrap();
1799        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1800    }
1801
1802    // -----------------------------------------------------------------------
1803    // Sequential pipeline test
1804    // -----------------------------------------------------------------------
1805
1806    #[tokio::test]
1807    async fn test_compose_pipeline_runs_steps_in_order() {
1808        use camel_core::route::compose_pipeline;
1809
1810        let processors = vec![
1811            BoxProcessor::new(SetHeader::new(
1812                IdentityProcessor,
1813                "step",
1814                Value::String("one".into()),
1815            )),
1816            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1817                if let Some(text) = body.as_text() {
1818                    Body::Text(format!("{}-processed", text))
1819                } else {
1820                    body
1821                }
1822            })),
1823        ];
1824
1825        let pipeline = compose_pipeline(processors);
1826        let exchange = Exchange::new(Message::new("hello"));
1827        let result = pipeline.oneshot(exchange).await.unwrap();
1828
1829        assert_eq!(
1830            result.input.header("step"),
1831            Some(&Value::String("one".into()))
1832        );
1833        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1834    }
1835
1836    #[tokio::test]
1837    async fn test_compose_pipeline_empty_is_identity() {
1838        use camel_core::route::compose_pipeline;
1839
1840        let pipeline = compose_pipeline(vec![]);
1841        let exchange = Exchange::new(Message::new("unchanged"));
1842        let result = pipeline.oneshot(exchange).await.unwrap();
1843        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1844    }
1845
1846    // -----------------------------------------------------------------------
1847    // Circuit breaker builder tests
1848    // -----------------------------------------------------------------------
1849
1850    #[test]
1851    fn test_builder_circuit_breaker_sets_config() {
1852        use camel_api::circuit_breaker::CircuitBreakerConfig;
1853
1854        let config = CircuitBreakerConfig::new().failure_threshold(5);
1855        let definition = RouteBuilder::from("timer:tick")
1856            .route_id("test-route")
1857            .circuit_breaker(config)
1858            .build()
1859            .unwrap();
1860
1861        let cb = definition
1862            .circuit_breaker_config()
1863            .expect("circuit breaker should be set");
1864        assert_eq!(cb.failure_threshold, 5);
1865    }
1866
1867    #[test]
1868    fn test_builder_circuit_breaker_with_error_handler() {
1869        use camel_api::circuit_breaker::CircuitBreakerConfig;
1870        use camel_api::error_handler::ErrorHandlerConfig;
1871
1872        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1873        let eh_config = ErrorHandlerConfig::log_only();
1874
1875        let definition = RouteBuilder::from("timer:tick")
1876            .route_id("test-route")
1877            .to("log:info")
1878            .circuit_breaker(cb_config)
1879            .error_handler(eh_config)
1880            .build()
1881            .unwrap();
1882
1883        assert!(
1884            definition.circuit_breaker_config().is_some(),
1885            "circuit breaker config should be set"
1886        );
1887        // Route definition was built successfully with both configs.
1888    }
1889
1890    #[test]
1891    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1892        let definition = RouteBuilder::from("direct:start")
1893            .route_id("test-route")
1894            .dead_letter_channel("log:dlc")
1895            .on_exception(|e| matches!(e, CamelError::Io(_)))
1896            .retry(3)
1897            .handled_by("log:io")
1898            .end_on_exception()
1899            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1900            .retry(1)
1901            .end_on_exception()
1902            .to("mock:out")
1903            .build()
1904            .expect("route should build");
1905
1906        let cfg = definition
1907            .error_handler_config()
1908            .expect("error handler should be set");
1909        assert_eq!(cfg.policies.len(), 2);
1910        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1911        assert_eq!(
1912            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1913            Some(3)
1914        );
1915        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1916        assert_eq!(
1917            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1918            Some(1)
1919        );
1920    }
1921
1922    #[test]
1923    fn test_builder_on_exception_mixed_mode_rejected() {
1924        let result = RouteBuilder::from("direct:start")
1925            .route_id("test-route")
1926            .error_handler(ErrorHandlerConfig::log_only())
1927            .on_exception(|_e| true)
1928            .end_on_exception()
1929            .to("mock:out")
1930            .build();
1931
1932        let err = result.err().expect("mixed mode should fail with an error");
1933
1934        assert!(
1935            format!("{err}").contains("mixed error handler modes"),
1936            "unexpected error: {err}"
1937        );
1938    }
1939
1940    #[test]
1941    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1942        let definition = RouteBuilder::from("direct:start")
1943            .route_id("test-route")
1944            .on_exception(|_e| true)
1945            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1946            .with_jitter(0.5)
1947            .end_on_exception()
1948            .to("mock:out")
1949            .build()
1950            .expect("route should build");
1951
1952        let cfg = definition
1953            .error_handler_config()
1954            .expect("error handler should be set");
1955        assert_eq!(cfg.policies.len(), 1);
1956        assert!(cfg.policies[0].retry.is_none());
1957    }
1958
1959    #[test]
1960    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1961        let definition = RouteBuilder::from("direct:start")
1962            .route_id("test-route")
1963            .dead_letter_channel("log:dlc")
1964            .to("mock:out")
1965            .build()
1966            .expect("route should build");
1967
1968        let cfg = definition
1969            .error_handler_config()
1970            .expect("error handler should be set");
1971        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1972        assert!(cfg.policies.is_empty());
1973    }
1974
1975    #[test]
1976    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1977        let definition = RouteBuilder::from("direct:start")
1978            .route_id("test-route")
1979            .dead_letter_channel("log:first")
1980            .on_exception(|e| matches!(e, CamelError::Io(_)))
1981            .retry(2)
1982            .end_on_exception()
1983            .dead_letter_channel("log:second")
1984            .to("mock:out")
1985            .build()
1986            .expect("route should build");
1987
1988        let cfg = definition
1989            .error_handler_config()
1990            .expect("error handler should be set");
1991        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1992        assert_eq!(cfg.policies.len(), 1);
1993        assert_eq!(
1994            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1995            Some(2)
1996        );
1997    }
1998
1999    #[test]
2000    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
2001        let definition = RouteBuilder::from("direct:start")
2002            .route_id("test-route")
2003            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
2004            .retry(1)
2005            .end_on_exception()
2006            .to("mock:out")
2007            .build()
2008            .expect("route should build");
2009
2010        let cfg = definition
2011            .error_handler_config()
2012            .expect("error handler should be set");
2013        assert!(cfg.dlc_uri.is_none());
2014        assert_eq!(cfg.policies.len(), 1);
2015    }
2016
2017    #[test]
2018    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
2019        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
2020        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
2021
2022        let definition = RouteBuilder::from("direct:start")
2023            .route_id("test-route")
2024            .error_handler(first)
2025            .error_handler(second)
2026            .to("mock:out")
2027            .build()
2028            .expect("route should build");
2029
2030        let cfg = definition
2031            .error_handler_config()
2032            .expect("error handler should be set");
2033        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2034    }
2035
2036    // --- Splitter builder tests ---
2037
2038    #[test]
2039    fn test_split_builder_typestate() {
2040        use camel_api::splitter::{SplitterConfig, split_body_lines};
2041
2042        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
2043        let definition = RouteBuilder::from("timer:test?period=1000")
2044            .route_id("test-route")
2045            .split(SplitterConfig::new(split_body_lines()))
2046            .to("mock:per-fragment")
2047            .end_split()
2048            .to("mock:final")
2049            .build()
2050            .unwrap();
2051
2052        // Should have 2 top-level steps: Split + To("mock:final")
2053        assert_eq!(definition.steps().len(), 2);
2054    }
2055
2056    #[test]
2057    fn test_split_builder_steps_collected() {
2058        use camel_api::splitter::{SplitterConfig, split_body_lines};
2059
2060        let definition = RouteBuilder::from("timer:test?period=1000")
2061            .route_id("test-route")
2062            .split(SplitterConfig::new(split_body_lines()))
2063            .set_header("fragment", Value::String("yes".into()))
2064            .to("mock:per-fragment")
2065            .end_split()
2066            .build()
2067            .unwrap();
2068
2069        // Should have 1 top-level step: Split (containing 2 sub-steps)
2070        assert_eq!(definition.steps().len(), 1);
2071        match &definition.steps()[0] {
2072            BuilderStep::Split { steps, .. } => {
2073                assert_eq!(steps.len(), 2); // SetHeader + To
2074            }
2075            other => panic!("Expected Split, got {:?}", other),
2076        }
2077    }
2078
2079    #[test]
2080    fn test_split_builder_config_propagated() {
2081        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2082
2083        let definition = RouteBuilder::from("timer:test?period=1000")
2084            .route_id("test-route")
2085            .split(
2086                SplitterConfig::new(split_body_lines())
2087                    .parallel(true)
2088                    .parallel_limit(4)
2089                    .aggregation(AggregationStrategy::CollectAll),
2090            )
2091            .to("mock:per-fragment")
2092            .end_split()
2093            .build()
2094            .unwrap();
2095
2096        match &definition.steps()[0] {
2097            BuilderStep::Split { config, .. } => {
2098                assert!(config.parallel);
2099                assert_eq!(config.parallel_limit, Some(4));
2100                assert!(matches!(
2101                    config.aggregation,
2102                    AggregationStrategy::CollectAll
2103                ));
2104            }
2105            other => panic!("Expected Split, got {:?}", other),
2106        }
2107    }
2108
2109    #[test]
2110    fn test_aggregate_builder_adds_step() {
2111        use camel_api::aggregator::AggregatorConfig;
2112        use camel_core::route::BuilderStep;
2113
2114        let definition = RouteBuilder::from("timer:tick")
2115            .route_id("test-route")
2116            .aggregate(
2117                AggregatorConfig::correlate_by("key")
2118                    .complete_when_size(2)
2119                    .build()
2120                    .unwrap(),
2121            )
2122            .build()
2123            .unwrap();
2124
2125        assert_eq!(definition.steps().len(), 1);
2126        assert!(matches!(
2127            definition.steps()[0],
2128            BuilderStep::Aggregate { .. }
2129        ));
2130    }
2131
2132    #[test]
2133    fn test_aggregate_in_split_builder() {
2134        use camel_api::aggregator::AggregatorConfig;
2135        use camel_api::splitter::{SplitterConfig, split_body_lines};
2136        use camel_core::route::BuilderStep;
2137
2138        let definition = RouteBuilder::from("timer:tick")
2139            .route_id("test-route")
2140            .split(SplitterConfig::new(split_body_lines()))
2141            .aggregate(
2142                AggregatorConfig::correlate_by("key")
2143                    .complete_when_size(1)
2144                    .build()
2145                    .unwrap(),
2146            )
2147            .end_split()
2148            .build()
2149            .unwrap();
2150
2151        assert_eq!(definition.steps().len(), 1);
2152        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2153            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2154        } else {
2155            panic!("expected Split step");
2156        }
2157    }
2158
2159    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
2160
2161    #[test]
2162    fn test_builder_set_body_static_adds_processor() {
2163        let definition = RouteBuilder::from("timer:tick")
2164            .route_id("test-route")
2165            .set_body("fixed")
2166            .build()
2167            .unwrap();
2168        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2169    }
2170
2171    #[test]
2172    fn test_builder_set_body_fn_adds_processor() {
2173        let definition = RouteBuilder::from("timer:tick")
2174            .route_id("test-route")
2175            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2176            .build()
2177            .unwrap();
2178        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2179    }
2180
2181    #[test]
2182    fn transform_alias_produces_same_as_set_body() {
2183        let route_transform = RouteBuilder::from("timer:tick")
2184            .route_id("test-route")
2185            .transform("hello")
2186            .build()
2187            .unwrap();
2188
2189        let route_set_body = RouteBuilder::from("timer:tick")
2190            .route_id("test-route")
2191            .set_body("hello")
2192            .build()
2193            .unwrap();
2194
2195        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2196    }
2197
2198    #[test]
2199    fn test_builder_set_header_fn_adds_processor() {
2200        let definition = RouteBuilder::from("timer:tick")
2201            .route_id("test-route")
2202            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2203            .build()
2204            .unwrap();
2205        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2206    }
2207
2208    #[tokio::test]
2209    async fn test_set_body_static_processor_works() {
2210        use camel_core::route::compose_pipeline;
2211        let def = RouteBuilder::from("t:t")
2212            .route_id("test-route")
2213            .set_body("replaced")
2214            .build()
2215            .unwrap();
2216        let pipeline = compose_pipeline(
2217            def.steps()
2218                .iter()
2219                .filter_map(|s| {
2220                    if let BuilderStep::Processor(p) = s {
2221                        Some(p.clone())
2222                    } else {
2223                        None
2224                    }
2225                })
2226                .collect(),
2227        );
2228        let exchange = Exchange::new(Message::new("original"));
2229        let result = pipeline.oneshot(exchange).await.unwrap();
2230        assert_eq!(result.input.body.as_text(), Some("replaced"));
2231    }
2232
2233    #[tokio::test]
2234    async fn test_set_body_fn_processor_works() {
2235        use camel_core::route::compose_pipeline;
2236        let def = RouteBuilder::from("t:t")
2237            .route_id("test-route")
2238            .set_body_fn(|ex: &Exchange| {
2239                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2240            })
2241            .build()
2242            .unwrap();
2243        let pipeline = compose_pipeline(
2244            def.steps()
2245                .iter()
2246                .filter_map(|s| {
2247                    if let BuilderStep::Processor(p) = s {
2248                        Some(p.clone())
2249                    } else {
2250                        None
2251                    }
2252                })
2253                .collect(),
2254        );
2255        let exchange = Exchange::new(Message::new("hello"));
2256        let result = pipeline.oneshot(exchange).await.unwrap();
2257        assert_eq!(result.input.body.as_text(), Some("HELLO"));
2258    }
2259
2260    #[tokio::test]
2261    async fn test_set_header_fn_processor_works() {
2262        use camel_core::route::compose_pipeline;
2263        let def = RouteBuilder::from("t:t")
2264            .route_id("test-route")
2265            .set_header_fn("echo", |ex: &Exchange| {
2266                ex.input
2267                    .body
2268                    .as_text()
2269                    .map(|t| Value::String(t.into()))
2270                    .unwrap_or(Value::Null)
2271            })
2272            .build()
2273            .unwrap();
2274        let pipeline = compose_pipeline(
2275            def.steps()
2276                .iter()
2277                .filter_map(|s| {
2278                    if let BuilderStep::Processor(p) = s {
2279                        Some(p.clone())
2280                    } else {
2281                        None
2282                    }
2283                })
2284                .collect(),
2285        );
2286        let exchange = Exchange::new(Message::new("ping"));
2287        let result = pipeline.oneshot(exchange).await.unwrap();
2288        assert_eq!(
2289            result.input.header("echo"),
2290            Some(&Value::String("ping".into()))
2291        );
2292    }
2293
2294    // ── FilterBuilder typestate tests ─────────────────────────────────────
2295
2296    #[test]
2297    fn test_filter_builder_typestate() {
2298        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2299            .route_id("test-route")
2300            .filter(|_ex| true)
2301            .to("mock:inner")
2302            .end_filter()
2303            .to("mock:outer")
2304            .build();
2305        assert!(result.is_ok());
2306    }
2307
2308    #[test]
2309    fn test_filter_builder_steps_collected() {
2310        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2311            .route_id("test-route")
2312            .filter(|_ex| true)
2313            .to("mock:inner")
2314            .end_filter()
2315            .build()
2316            .unwrap();
2317
2318        assert_eq!(definition.steps().len(), 1);
2319        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2320    }
2321
2322    #[test]
2323    fn test_wire_tap_builder_adds_step() {
2324        let definition = RouteBuilder::from("timer:tick")
2325            .route_id("test-route")
2326            .wire_tap("mock:tap")
2327            .to("mock:result")
2328            .build()
2329            .unwrap();
2330
2331        assert_eq!(definition.steps().len(), 2);
2332        assert!(
2333            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2334        );
2335        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2336    }
2337
2338    // ── MulticastBuilder typestate tests ─────────────────────────────────────
2339
2340    #[test]
2341    fn test_multicast_builder_typestate() {
2342        let definition = RouteBuilder::from("timer:tick")
2343            .route_id("test-route")
2344            .multicast()
2345            .to("direct:a")
2346            .to("direct:b")
2347            .end_multicast()
2348            .to("mock:result")
2349            .build()
2350            .unwrap();
2351
2352        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
2353    }
2354
2355    #[test]
2356    fn test_multicast_builder_steps_collected() {
2357        let definition = RouteBuilder::from("timer:tick")
2358            .route_id("test-route")
2359            .multicast()
2360            .to("direct:a")
2361            .to("direct:b")
2362            .end_multicast()
2363            .build()
2364            .unwrap();
2365
2366        match &definition.steps()[0] {
2367            BuilderStep::Multicast { steps, .. } => {
2368                assert_eq!(steps.len(), 2);
2369            }
2370            other => panic!("Expected Multicast, got {:?}", other),
2371        }
2372    }
2373
2374    // ── Concurrency builder tests ─────────────────────────────────────
2375
2376    #[test]
2377    fn test_builder_concurrent_sets_concurrency() {
2378        use camel_component_api::ConcurrencyModel;
2379
2380        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2381            .route_id("test-route")
2382            .concurrent(16)
2383            .to("log:info")
2384            .build()
2385            .unwrap();
2386
2387        assert_eq!(
2388            definition.concurrency_override(),
2389            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2390        );
2391    }
2392
2393    #[test]
2394    fn test_builder_concurrent_zero_means_unbounded() {
2395        use camel_component_api::ConcurrencyModel;
2396
2397        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2398            .route_id("test-route")
2399            .concurrent(0)
2400            .to("log:info")
2401            .build()
2402            .unwrap();
2403
2404        assert_eq!(
2405            definition.concurrency_override(),
2406            Some(&ConcurrencyModel::Concurrent { max: None })
2407        );
2408    }
2409
2410    #[test]
2411    fn test_builder_sequential_sets_concurrency() {
2412        use camel_component_api::ConcurrencyModel;
2413
2414        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2415            .route_id("test-route")
2416            .sequential()
2417            .to("log:info")
2418            .build()
2419            .unwrap();
2420
2421        assert_eq!(
2422            definition.concurrency_override(),
2423            Some(&ConcurrencyModel::Sequential)
2424        );
2425    }
2426
2427    #[test]
2428    fn test_builder_default_concurrency_is_none() {
2429        let definition = RouteBuilder::from("timer:tick")
2430            .route_id("test-route")
2431            .to("log:info")
2432            .build()
2433            .unwrap();
2434
2435        assert_eq!(definition.concurrency_override(), None);
2436    }
2437
2438    // ── Route lifecycle builder tests ─────────────────────────────────────
2439
2440    #[test]
2441    fn test_builder_route_id_sets_id() {
2442        let definition = RouteBuilder::from("timer:tick")
2443            .route_id("my-route")
2444            .build()
2445            .unwrap();
2446
2447        assert_eq!(definition.route_id(), "my-route");
2448    }
2449
2450    #[test]
2451    fn test_build_without_route_id_fails() {
2452        let result = RouteBuilder::from("timer:tick?period=1000")
2453            .to("log:info")
2454            .build();
2455        let err = match result {
2456            Err(e) => e.to_string(),
2457            Ok(_) => panic!("build() should fail without route_id"),
2458        };
2459        assert!(
2460            err.contains("route_id"),
2461            "error should mention route_id, got: {}",
2462            err
2463        );
2464    }
2465
2466    #[test]
2467    fn test_builder_empty_route_id_rejected() {
2468        let result = RouteBuilder::from("timer:tick").route_id("").build();
2469        let err = result.err().expect("empty route_id should be rejected");
2470        assert!(matches!(err, CamelError::RouteError(_)));
2471    }
2472
2473    #[test]
2474    fn test_builder_whitespace_route_id_rejected() {
2475        let result = RouteBuilder::from("timer:tick").route_id("   ").build();
2476        assert!(result.is_err());
2477    }
2478
2479    #[test]
2480    fn test_builder_auto_startup_false() {
2481        let definition = RouteBuilder::from("timer:tick")
2482            .route_id("test-route")
2483            .auto_startup(false)
2484            .build()
2485            .unwrap();
2486
2487        assert!(!definition.auto_startup());
2488    }
2489
2490    #[test]
2491    fn test_builder_startup_order_custom() {
2492        let definition = RouteBuilder::from("timer:tick")
2493            .route_id("test-route")
2494            .startup_order(50)
2495            .build()
2496            .unwrap();
2497
2498        assert_eq!(definition.startup_order(), 50);
2499    }
2500
2501    #[test]
2502    fn test_builder_defaults() {
2503        let definition = RouteBuilder::from("timer:tick")
2504            .route_id("test-route")
2505            .build()
2506            .unwrap();
2507
2508        assert_eq!(definition.route_id(), "test-route");
2509        assert!(definition.auto_startup());
2510        assert_eq!(definition.startup_order(), 1000);
2511    }
2512
2513    // ── Choice typestate tests ──────────────────────────────────────────────────
2514
2515    #[test]
2516    fn test_choice_builder_single_when() {
2517        let definition = RouteBuilder::from("timer:tick")
2518            .route_id("test-route")
2519            .choice()
2520            .when(|ex: &Exchange| ex.input.header("type").is_some())
2521            .to("mock:typed")
2522            .end_when()
2523            .end_choice()
2524            .build()
2525            .unwrap();
2526        assert_eq!(definition.steps().len(), 1);
2527        assert!(
2528            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2529            if whens.len() == 1 && otherwise.is_none())
2530        );
2531    }
2532
2533    #[test]
2534    fn test_choice_builder_when_otherwise() {
2535        let definition = RouteBuilder::from("timer:tick")
2536            .route_id("test-route")
2537            .choice()
2538            .when(|ex: &Exchange| ex.input.header("a").is_some())
2539            .to("mock:a")
2540            .end_when()
2541            .otherwise()
2542            .to("mock:fallback")
2543            .end_otherwise()
2544            .end_choice()
2545            .build()
2546            .unwrap();
2547        assert!(
2548            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2549            if whens.len() == 1 && otherwise.is_some())
2550        );
2551    }
2552
2553    #[test]
2554    fn test_choice_builder_multiple_whens() {
2555        let definition = RouteBuilder::from("timer:tick")
2556            .route_id("test-route")
2557            .choice()
2558            .when(|ex: &Exchange| ex.input.header("a").is_some())
2559            .to("mock:a")
2560            .end_when()
2561            .when(|ex: &Exchange| ex.input.header("b").is_some())
2562            .to("mock:b")
2563            .end_when()
2564            .end_choice()
2565            .build()
2566            .unwrap();
2567        assert!(
2568            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2569            if whens.len() == 2)
2570        );
2571    }
2572
2573    #[test]
2574    fn test_choice_step_after_choice() {
2575        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2576        let definition = RouteBuilder::from("timer:tick")
2577            .route_id("test-route")
2578            .choice()
2579            .when(|_ex: &Exchange| true)
2580            .to("mock:inner")
2581            .end_when()
2582            .end_choice()
2583            .to("mock:outer") // must be step[1], not inside choice
2584            .build()
2585            .unwrap();
2586        assert_eq!(definition.steps().len(), 2);
2587        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2588    }
2589
2590    // ── Throttle typestate tests ──────────────────────────────────────────────────
2591
2592    #[test]
2593    fn test_throttle_builder_typestate() {
2594        let definition = RouteBuilder::from("timer:tick")
2595            .route_id("test-route")
2596            .throttle(10, std::time::Duration::from_secs(1))
2597            .to("mock:result")
2598            .end_throttle()
2599            .build()
2600            .unwrap();
2601
2602        assert_eq!(definition.steps().len(), 1);
2603        assert!(matches!(
2604            &definition.steps()[0],
2605            BuilderStep::Throttle { .. }
2606        ));
2607    }
2608
2609    #[test]
2610    fn test_throttle_builder_with_strategy() {
2611        let definition = RouteBuilder::from("timer:tick")
2612            .route_id("test-route")
2613            .throttle(10, std::time::Duration::from_secs(1))
2614            .strategy(ThrottleStrategy::Reject)
2615            .to("mock:result")
2616            .end_throttle()
2617            .build()
2618            .unwrap();
2619
2620        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2621            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2622        } else {
2623            panic!("Expected Throttle step");
2624        }
2625    }
2626
2627    #[test]
2628    fn test_throttle_builder_steps_collected() {
2629        let definition = RouteBuilder::from("timer:tick")
2630            .route_id("test-route")
2631            .throttle(5, std::time::Duration::from_secs(1))
2632            .set_header("throttled", Value::Bool(true))
2633            .to("mock:throttled")
2634            .end_throttle()
2635            .build()
2636            .unwrap();
2637
2638        match &definition.steps()[0] {
2639            BuilderStep::Throttle { steps, .. } => {
2640                assert_eq!(steps.len(), 2); // SetHeader + To
2641            }
2642            other => panic!("Expected Throttle, got {:?}", other),
2643        }
2644    }
2645
2646    #[test]
2647    fn test_throttle_step_after_throttle() {
2648        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2649        let definition = RouteBuilder::from("timer:tick")
2650            .route_id("test-route")
2651            .throttle(10, std::time::Duration::from_secs(1))
2652            .to("mock:inner")
2653            .end_throttle()
2654            .to("mock:outer")
2655            .build()
2656            .unwrap();
2657
2658        assert_eq!(definition.steps().len(), 2);
2659        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2660    }
2661
2662    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2663
2664    #[test]
2665    fn test_load_balance_builder_typestate() {
2666        let definition = RouteBuilder::from("timer:tick")
2667            .route_id("test-route")
2668            .load_balance()
2669            .round_robin()
2670            .to("mock:a")
2671            .to("mock:b")
2672            .end_load_balance()
2673            .build()
2674            .unwrap();
2675
2676        assert_eq!(definition.steps().len(), 1);
2677        assert!(matches!(
2678            &definition.steps()[0],
2679            BuilderStep::LoadBalance { .. }
2680        ));
2681    }
2682
2683    #[test]
2684    fn test_load_balance_builder_with_strategy() {
2685        let definition = RouteBuilder::from("timer:tick")
2686            .route_id("test-route")
2687            .load_balance()
2688            .random()
2689            .to("mock:result")
2690            .end_load_balance()
2691            .build()
2692            .unwrap();
2693
2694        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2695            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2696        } else {
2697            panic!("Expected LoadBalance step");
2698        }
2699    }
2700
2701    #[test]
2702    fn test_load_balance_builder_steps_collected() {
2703        let definition = RouteBuilder::from("timer:tick")
2704            .route_id("test-route")
2705            .load_balance()
2706            .set_header("lb", Value::Bool(true))
2707            .to("mock:a")
2708            .end_load_balance()
2709            .build()
2710            .unwrap();
2711
2712        match &definition.steps()[0] {
2713            BuilderStep::LoadBalance { steps, .. } => {
2714                assert_eq!(steps.len(), 2); // SetHeader + To
2715            }
2716            other => panic!("Expected LoadBalance, got {:?}", other),
2717        }
2718    }
2719
2720    #[test]
2721    fn test_load_balance_step_after_load_balance() {
2722        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
2723        let definition = RouteBuilder::from("timer:tick")
2724            .route_id("test-route")
2725            .load_balance()
2726            .to("mock:inner")
2727            .end_load_balance()
2728            .to("mock:outer")
2729            .build()
2730            .unwrap();
2731
2732        assert_eq!(definition.steps().len(), 2);
2733        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2734    }
2735
2736    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2737
2738    #[test]
2739    fn test_dynamic_router_builder() {
2740        let definition = RouteBuilder::from("timer:tick")
2741            .route_id("test-route")
2742            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2743            .build()
2744            .unwrap();
2745
2746        assert_eq!(definition.steps().len(), 1);
2747        assert!(matches!(
2748            &definition.steps()[0],
2749            BuilderStep::DynamicRouter { .. }
2750        ));
2751    }
2752
2753    #[test]
2754    fn test_dynamic_router_builder_with_config() {
2755        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2756            .max_iterations(100)
2757            .cache_size(500);
2758
2759        let definition = RouteBuilder::from("timer:tick")
2760            .route_id("test-route")
2761            .dynamic_router_with_config(config)
2762            .build()
2763            .unwrap();
2764
2765        assert_eq!(definition.steps().len(), 1);
2766        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2767            assert_eq!(config.max_iterations, 100);
2768            assert_eq!(config.cache_size, 500);
2769        } else {
2770            panic!("Expected DynamicRouter step");
2771        }
2772    }
2773
2774    #[test]
2775    fn test_dynamic_router_step_after_router() {
2776        // Steps after dynamic_router() are added to the outer pipeline.
2777        let definition = RouteBuilder::from("timer:tick")
2778            .route_id("test-route")
2779            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2780            .to("mock:outer")
2781            .build()
2782            .unwrap();
2783
2784        assert_eq!(definition.steps().len(), 2);
2785        assert!(matches!(
2786            &definition.steps()[0],
2787            BuilderStep::DynamicRouter { .. }
2788        ));
2789        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2790    }
2791
2792    #[test]
2793    fn routing_slip_builder_creates_step() {
2794        use camel_api::RoutingSlipExpression;
2795
2796        let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2797
2798        let route = RouteBuilder::from("direct:start")
2799            .route_id("routing-slip-test")
2800            .routing_slip(expression)
2801            .build()
2802            .unwrap();
2803
2804        assert!(
2805            matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2806            "Expected RoutingSlip step"
2807        );
2808    }
2809
2810    #[test]
2811    fn routing_slip_with_config_builder_creates_step() {
2812        use camel_api::RoutingSlipConfig;
2813
2814        let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2815            .uri_delimiter("|")
2816            .cache_size(50)
2817            .ignore_invalid_endpoints(true);
2818
2819        let route = RouteBuilder::from("direct:start")
2820            .route_id("routing-slip-config-test")
2821            .routing_slip_with_config(config)
2822            .build()
2823            .unwrap();
2824
2825        if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2826            assert_eq!(config.uri_delimiter, "|");
2827            assert_eq!(config.cache_size, 50);
2828            assert!(config.ignore_invalid_endpoints);
2829        } else {
2830            panic!("Expected RoutingSlip step");
2831        }
2832    }
2833
2834    #[test]
2835    fn test_builder_marshal_adds_processor_step() {
2836        let definition = RouteBuilder::from("timer:tick")
2837            .route_id("test-route")
2838            .marshal("json")
2839            .unwrap()
2840            .build()
2841            .unwrap();
2842        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2843    }
2844
2845    #[test]
2846    fn test_builder_unmarshal_adds_processor_step() {
2847        let definition = RouteBuilder::from("timer:tick")
2848            .route_id("test-route")
2849            .unmarshal("json")
2850            .unwrap()
2851            .build()
2852            .unwrap();
2853        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2854    }
2855
2856    #[test]
2857    fn test_builder_stream_cache_adds_processor_step() {
2858        let definition = RouteBuilder::from("timer:tick")
2859            .route_id("test-route")
2860            .stream_cache(1024)
2861            .build()
2862            .unwrap();
2863        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2864    }
2865
2866    #[test]
2867    fn validate_adds_to_step_with_validator_uri() {
2868        let def = RouteBuilder::from("direct:in")
2869            .route_id("test")
2870            .validate("schemas/order.xsd")
2871            .build()
2872            .unwrap();
2873        let steps = def.steps();
2874        assert_eq!(steps.len(), 1);
2875        assert!(
2876            matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2877            "got: {:?}",
2878            steps[0]
2879        );
2880    }
2881
2882    #[test]
2883    fn test_builder_marshal_returns_err_for_unknown_format() {
2884        let result = RouteBuilder::from("timer:tick")
2885            .route_id("test-route")
2886            .marshal("csv");
2887        let err = match result {
2888            Err(e) => e,
2889            Ok(_) => panic!("marshal with unknown format should return Err"),
2890        };
2891        let msg = err.to_string();
2892        assert!(
2893            msg.contains("unknown data format"),
2894            "error should mention unknown format, got: {msg}"
2895        );
2896        assert!(
2897            msg.contains("csv"),
2898            "error should mention format name, got: {msg}"
2899        );
2900    }
2901
2902    #[test]
2903    fn test_builder_unmarshal_returns_err_for_unknown_format() {
2904        let result = RouteBuilder::from("timer:tick")
2905            .route_id("test-route")
2906            .unmarshal("csv");
2907        let err = match result {
2908            Err(e) => e,
2909            Ok(_) => panic!("unmarshal with unknown format should return Err"),
2910        };
2911        let msg = err.to_string();
2912        assert!(
2913            msg.contains("unknown data format"),
2914            "error should mention unknown format, got: {msg}"
2915        );
2916        assert!(
2917            msg.contains("csv"),
2918            "error should mention format name, got: {msg}"
2919        );
2920    }
2921
2922    #[test]
2923    fn test_builder_recipient_list_creates_step() {
2924        let route = RouteBuilder::from("direct:start")
2925            .route_id("recipient-list-test")
2926            .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
2927            .build()
2928            .unwrap();
2929
2930        assert!(matches!(
2931            &route.steps()[0],
2932            BuilderStep::RecipientList { .. }
2933        ));
2934    }
2935
2936    #[test]
2937    fn test_builder_recipient_list_with_config_creates_step() {
2938        let config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));
2939
2940        let route = RouteBuilder::from("direct:start")
2941            .route_id("recipient-list-config-test")
2942            .recipient_list_with_config(config)
2943            .build()
2944            .unwrap();
2945
2946        assert!(matches!(
2947            &route.steps()[0],
2948            BuilderStep::RecipientList { .. }
2949        ));
2950    }
2951
2952    #[test]
2953    fn test_builder_script_adds_script_step() {
2954        let route = RouteBuilder::from("direct:start")
2955            .route_id("script-test")
2956            .script("rhai", "headers[\"x\"] = \"y\"")
2957            .build()
2958            .unwrap();
2959
2960        assert!(matches!(
2961            &route.steps()[0],
2962            BuilderStep::Script { language, script }
2963            if language == "rhai" && script == "headers[\"x\"] = \"y\""
2964        ));
2965    }
2966
2967    #[test]
2968    fn test_builder_delay_and_delay_with_header_add_steps() {
2969        let route = RouteBuilder::from("direct:start")
2970            .route_id("delay-test")
2971            .delay(Duration::from_millis(250))
2972            .delay_with_header(Duration::from_millis(500), "x-delay")
2973            .build()
2974            .unwrap();
2975
2976        assert_eq!(route.steps().len(), 2);
2977        assert!(matches!(&route.steps()[0], BuilderStep::Delay { .. }));
2978        assert!(matches!(&route.steps()[1], BuilderStep::Delay { .. }));
2979    }
2980
2981    #[test]
2982    fn test_builder_log_and_stop_add_steps_in_order() {
2983        let route = RouteBuilder::from("direct:start")
2984            .route_id("log-stop-test")
2985            .log("hello", LogLevel::Info)
2986            .stop()
2987            .to("mock:after")
2988            .build()
2989            .unwrap();
2990
2991        assert_eq!(route.steps().len(), 3);
2992        assert!(matches!(
2993            &route.steps()[0],
2994            BuilderStep::Log { message, .. } if message == "hello"
2995        ));
2996        assert!(matches!(&route.steps()[1], BuilderStep::Stop));
2997        assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
2998    }
2999
3000    #[test]
3001    fn test_builder_stream_cache_default_adds_processor_step() {
3002        let route = RouteBuilder::from("direct:start")
3003            .route_id("stream-cache-default-test")
3004            .stream_cache_default()
3005            .build()
3006            .unwrap();
3007
3008        assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
3009    }
3010
3011    #[test]
3012    fn test_validate_preserves_existing_validator_prefix() {
3013        let route = RouteBuilder::from("direct:in")
3014            .route_id("validate-prefix-test")
3015            .validate("validator:schemas/order.xsd")
3016            .build()
3017            .unwrap();
3018
3019        assert!(matches!(
3020            &route.steps()[0],
3021            BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
3022        ));
3023    }
3024
3025    #[test]
3026    fn test_load_balance_builder_weighted_failover_parallel_config() {
3027        let route = RouteBuilder::from("direct:start")
3028            .route_id("lb-weighted-failover-parallel")
3029            .load_balance()
3030            .weighted(vec![
3031                ("direct:a".to_string(), 3),
3032                ("direct:b".to_string(), 1),
3033            ])
3034            .failover()
3035            .parallel(true)
3036            .to("mock:result")
3037            .end_load_balance()
3038            .build()
3039            .unwrap();
3040
3041        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3042            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3043            assert!(config.parallel);
3044        } else {
3045            panic!("Expected LoadBalance step");
3046        }
3047    }
3048
3049    #[test]
3050    fn test_multicast_builder_all_config_setters() {
3051        let route = RouteBuilder::from("direct:start")
3052            .route_id("multicast-config-test")
3053            .multicast()
3054            .parallel(true)
3055            .parallel_limit(4)
3056            .stop_on_exception(true)
3057            .timeout(Duration::from_millis(300))
3058            .aggregation(MulticastStrategy::Original)
3059            .to("mock:a")
3060            .end_multicast()
3061            .build()
3062            .unwrap();
3063
3064        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3065            assert!(config.parallel);
3066            assert_eq!(config.parallel_limit, Some(4));
3067            assert!(config.stop_on_exception);
3068            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
3069            assert!(matches!(config.aggregation, MulticastStrategy::Original));
3070        } else {
3071            panic!("Expected Multicast step");
3072        }
3073    }
3074
3075    #[test]
3076    fn test_build_canonical_rejects_unsupported_processor_step() {
3077        let err = RouteBuilder::from("direct:start")
3078            .route_id("canonical-reject")
3079            .set_header("k", Value::String("v".into()))
3080            .build_canonical()
3081            .unwrap_err();
3082
3083        assert!(format!("{err}").contains("does not support step `processor`"));
3084    }
3085
3086    // ── LoadBalance strategy-specific tests ─────────────────────────────────────
3087
3088    #[test]
3089    fn test_load_balance_builder_weighted_strategy() {
3090        let route = RouteBuilder::from("direct:start")
3091            .route_id("lb-weighted")
3092            .load_balance()
3093            .weighted(vec![
3094                ("direct:a".to_string(), 5),
3095                ("direct:b".to_string(), 2),
3096                ("direct:c".to_string(), 1),
3097            ])
3098            .to("mock:result")
3099            .end_load_balance()
3100            .build()
3101            .unwrap();
3102
3103        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3104            assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
3105        } else {
3106            panic!("Expected LoadBalance step");
3107        }
3108    }
3109
3110    #[test]
3111    fn test_load_balance_builder_failover_strategy() {
3112        let route = RouteBuilder::from("direct:start")
3113            .route_id("lb-failover")
3114            .load_balance()
3115            .failover()
3116            .to("mock:primary")
3117            .end_load_balance()
3118            .build()
3119            .unwrap();
3120
3121        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3122            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3123            assert!(!config.parallel);
3124        } else {
3125            panic!("Expected LoadBalance step");
3126        }
3127    }
3128
3129    #[test]
3130    fn test_load_balance_builder_parallel_false_explicit() {
3131        let route = RouteBuilder::from("direct:start")
3132            .route_id("lb-parallel-false")
3133            .load_balance()
3134            .round_robin()
3135            .parallel(false)
3136            .to("mock:result")
3137            .end_load_balance()
3138            .build()
3139            .unwrap();
3140
3141        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3142            assert!(!config.parallel);
3143        } else {
3144            panic!("Expected LoadBalance step");
3145        }
3146    }
3147
3148    // ── FilterInSplitBuilder tests ──────────────────────────────────────────────
3149
3150    #[test]
3151    fn test_filter_in_split_builder_typestate() {
3152        use camel_api::splitter::{SplitterConfig, split_body_lines};
3153
3154        let definition = RouteBuilder::from("timer:test")
3155            .route_id("filter-in-split")
3156            .split(SplitterConfig::new(split_body_lines()))
3157            .filter(|_ex| true)
3158            .to("mock:filtered")
3159            .end_filter()
3160            .end_split()
3161            .build()
3162            .unwrap();
3163
3164        assert_eq!(definition.steps().len(), 1);
3165        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3166            assert_eq!(steps.len(), 1);
3167            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3168        } else {
3169            panic!("Expected Split step");
3170        }
3171    }
3172
3173    #[test]
3174    fn test_filter_in_split_builder_multiple_steps() {
3175        use camel_api::splitter::{SplitterConfig, split_body_lines};
3176
3177        let definition = RouteBuilder::from("timer:test")
3178            .route_id("filter-in-split-multi")
3179            .split(SplitterConfig::new(split_body_lines()))
3180            .to("mock:before-filter")
3181            .filter(|_ex| true)
3182            .to("mock:inside-filter")
3183            .end_filter()
3184            .to("mock:after-filter")
3185            .end_split()
3186            .build()
3187            .unwrap();
3188
3189        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3190            // To("before-filter") + Filter{...} + To("after-filter") = 3
3191            assert_eq!(steps.len(), 3);
3192        } else {
3193            panic!("Expected Split step");
3194        }
3195    }
3196
3197    // ── build_canonical tests ───────────────────────────────────────────────────
3198
3199    #[test]
3200    fn test_build_canonical_with_circuit_breaker() {
3201        use camel_api::circuit_breaker::CircuitBreakerConfig;
3202
3203        let spec = RouteBuilder::from("direct:start")
3204            .route_id("canonical-cb")
3205            .circuit_breaker(CircuitBreakerConfig::new().failure_threshold(10))
3206            .to("mock:result")
3207            .build_canonical()
3208            .unwrap();
3209
3210        let cb = spec.circuit_breaker.expect("circuit breaker should be set");
3211        assert_eq!(cb.failure_threshold, 10);
3212    }
3213
3214    #[test]
3215    fn test_build_canonical_rejects_custom_split_aggregation() {
3216        use camel_api::splitter::{SplitterConfig, split_body_lines};
3217
3218        let err = RouteBuilder::from("direct:start")
3219            .route_id("canonical-custom-split")
3220            .split(SplitterConfig::new(split_body_lines()).aggregation(
3221                camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
3222            ))
3223            .to("mock:frag")
3224            .end_split()
3225            .build_canonical()
3226            .unwrap_err();
3227
3228        // Split with closure-based expression is rejected in canonical v1.
3229        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3230    }
3231
3232    #[test]
3233    fn test_build_canonical_rejects_custom_aggregate_strategy() {
3234        let err = RouteBuilder::from("direct:start")
3235            .route_id("canonical-custom-agg")
3236            .aggregate(
3237                AggregatorConfig::correlate_by("key")
3238                    .complete_when_size(2)
3239                    .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
3240                    .build()
3241                    .unwrap(),
3242            )
3243            .build_canonical()
3244            .unwrap_err();
3245
3246        assert!(format!("{err}").contains("custom aggregate strategy"));
3247    }
3248
3249    #[test]
3250    fn test_build_canonical_rejects_fn_correlation_strategy() {
3251        let err = RouteBuilder::from("direct:start")
3252            .route_id("canonical-fn-corr")
3253            .aggregate(AggregatorConfig {
3254                header_name: "key".to_string(),
3255                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3256                correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
3257                strategy: AggregationStrategy::CollectAll,
3258                max_buckets: None,
3259                bucket_ttl: None,
3260                force_completion_on_stop: false,
3261                discard_on_timeout: false,
3262            })
3263            .build_canonical()
3264            .unwrap_err();
3265
3266        assert!(format!("{err}").contains("Fn correlation strategy"));
3267    }
3268
3269    #[test]
3270    fn test_build_canonical_rejects_predicate_completion() {
3271        let err = RouteBuilder::from("direct:start")
3272            .route_id("canonical-pred-completion")
3273            .aggregate(AggregatorConfig {
3274                header_name: "key".to_string(),
3275                completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
3276                    |_| false,
3277                ))),
3278                correlation: CorrelationStrategy::HeaderName("key".to_string()),
3279                strategy: AggregationStrategy::CollectAll,
3280                max_buckets: None,
3281                bucket_ttl: None,
3282                force_completion_on_stop: false,
3283                discard_on_timeout: false,
3284            })
3285            .build_canonical()
3286            .unwrap_err();
3287
3288        assert!(format!("{err}").contains("predicate completion"));
3289    }
3290
3291    #[test]
3292    fn test_build_canonical_with_expression_correlation() {
3293        let spec = RouteBuilder::from("direct:start")
3294            .route_id("canonical-expr-corr")
3295            .aggregate(AggregatorConfig {
3296                header_name: "key".to_string(),
3297                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3298                correlation: CorrelationStrategy::Expression {
3299                    expr: "header.key".to_string(),
3300                    language: "simple".to_string(),
3301                },
3302                strategy: AggregationStrategy::CollectAll,
3303                max_buckets: None,
3304                bucket_ttl: None,
3305                force_completion_on_stop: false,
3306                discard_on_timeout: false,
3307            })
3308            .build_canonical()
3309            .unwrap();
3310
3311        assert!(spec.steps.iter().any(|s| matches!(s, CanonicalStepSpec::Aggregate(a) if a.correlation_key == Some("header.key".to_string()))));
3312    }
3313
3314    #[test]
3315    fn test_build_canonical_split_rejected_with_closure_expression() {
3316        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
3317
3318        // Builder-based split uses closure expressions, which are not serializable.
3319        let err = RouteBuilder::from("direct:start")
3320            .route_id("canonical-split-last")
3321            .split(
3322                SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins),
3323            )
3324            .to("mock:frag")
3325            .end_split()
3326            .build_canonical()
3327            .unwrap_err();
3328
3329        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3330    }
3331
3332    // ── OnExceptionBuilder full chain tests ─────────────────────────────────────
3333
3334    #[test]
3335    fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
3336        let definition = RouteBuilder::from("direct:start")
3337            .route_id("on-exception-full")
3338            .dead_letter_channel("log:dlc")
3339            .on_exception(|e| matches!(e, CamelError::Io(_)))
3340            .retry(5)
3341            .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
3342            .with_jitter(0.3)
3343            .handled_by("log:io-handler")
3344            .end_on_exception()
3345            .to("mock:out")
3346            .build()
3347            .unwrap();
3348
3349        let cfg = definition
3350            .error_handler_config()
3351            .expect("error handler should be set");
3352        assert_eq!(cfg.policies.len(), 1);
3353        let policy = &cfg.policies[0];
3354        let retry = policy.retry.as_ref().expect("retry should be set");
3355        assert_eq!(retry.max_attempts, 5);
3356        assert_eq!(retry.initial_delay, Duration::from_millis(10));
3357        assert_eq!(retry.multiplier, 2.0);
3358        assert_eq!(retry.max_delay, Duration::from_millis(500));
3359        assert!((retry.jitter_factor - 0.3).abs() < f64::EPSILON);
3360        assert_eq!(policy.handled_by.as_deref(), Some("log:io-handler"));
3361    }
3362
3363    #[test]
3364    fn test_on_exception_jitter_clamped_to_valid_range() {
3365        let definition = RouteBuilder::from("direct:start")
3366            .route_id("jitter-clamp")
3367            .on_exception(|_e| true)
3368            .retry(1)
3369            .with_jitter(5.0)
3370            .end_on_exception()
3371            .to("mock:out")
3372            .build()
3373            .unwrap();
3374
3375        let cfg = definition.error_handler_config().unwrap();
3376        let retry = cfg.policies[0].retry.as_ref().unwrap();
3377        assert!((retry.jitter_factor - 1.0).abs() < f64::EPSILON);
3378    }
3379
3380    // ── StepAccumulator: process_fn, convert_body_to, bean ──────────────────────
3381
3382    #[test]
3383    fn test_builder_process_fn_adds_processor_step() {
3384        use camel_api::BoxProcessorExt;
3385        let processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
3386        let definition = RouteBuilder::from("timer:tick")
3387            .route_id("process-fn-test")
3388            .process_fn(processor)
3389            .build()
3390            .unwrap();
3391
3392        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3393    }
3394
3395    #[test]
3396    fn test_builder_convert_body_to_adds_processor_step() {
3397        let definition = RouteBuilder::from("timer:tick")
3398            .route_id("convert-body-test")
3399            .convert_body_to(BodyType::Json)
3400            .build()
3401            .unwrap();
3402
3403        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3404    }
3405
3406    #[test]
3407    fn test_builder_bean_adds_bean_step() {
3408        let definition = RouteBuilder::from("timer:tick")
3409            .route_id("bean-test")
3410            .bean("myBean", "process")
3411            .build()
3412            .unwrap();
3413
3414        assert!(
3415            matches!(&definition.steps()[0], BuilderStep::Bean { name, method }
3416            if name == "myBean" && method == "process")
3417        );
3418    }
3419
3420    // ── Throttle strategy-specific tests ────────────────────────────────────────
3421
3422    #[test]
3423    fn test_throttle_builder_delay_strategy() {
3424        let definition = RouteBuilder::from("timer:tick")
3425            .route_id("throttle-delay")
3426            .throttle(10, Duration::from_secs(1))
3427            .strategy(ThrottleStrategy::Delay)
3428            .to("mock:result")
3429            .end_throttle()
3430            .build()
3431            .unwrap();
3432
3433        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3434            assert_eq!(config.strategy, ThrottleStrategy::Delay);
3435        } else {
3436            panic!("Expected Throttle step");
3437        }
3438    }
3439
3440    #[test]
3441    fn test_throttle_builder_drop_strategy() {
3442        let definition = RouteBuilder::from("timer:tick")
3443            .route_id("throttle-drop")
3444            .throttle(10, Duration::from_secs(1))
3445            .strategy(ThrottleStrategy::Drop)
3446            .to("mock:result")
3447            .end_throttle()
3448            .build()
3449            .unwrap();
3450
3451        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3452            assert_eq!(config.strategy, ThrottleStrategy::Drop);
3453        } else {
3454            panic!("Expected Throttle step");
3455        }
3456    }
3457
3458    // ── LoopInLoopBuilder with loop_while ───────────────────────────────────────
3459
3460    #[test]
3461    fn test_nested_loop_while_builder() {
3462        use camel_api::loop_eip::LoopMode;
3463
3464        let def = RouteBuilder::from("direct:start")
3465            .route_id("nested-loop-while")
3466            .loop_count(2)
3467            .to("mock:outer")
3468            .loop_while(|_ex| true)
3469            .to("mock:inner")
3470            .end_loop()
3471            .end_loop()
3472            .build()
3473            .unwrap();
3474
3475        assert_eq!(def.steps().len(), 1);
3476        if let BuilderStep::Loop { steps, .. } = &def.steps()[0] {
3477            assert_eq!(steps.len(), 2);
3478            if let BuilderStep::Loop { config, .. } = &steps[1] {
3479                assert!(matches!(config.mode, LoopMode::While(_)));
3480            } else {
3481                panic!("Expected inner Loop step");
3482            }
3483        } else {
3484            panic!("Expected outer Loop step");
3485        }
3486    }
3487
3488    // ── Choice with multiple whens + otherwise ──────────────────────────────────
3489
3490    #[test]
3491    fn test_choice_builder_multiple_whens_with_otherwise() {
3492        let definition = RouteBuilder::from("timer:tick")
3493            .route_id("choice-multi-otherwise")
3494            .choice()
3495            .when(|ex: &Exchange| ex.input.header("a").is_some())
3496            .to("mock:a")
3497            .end_when()
3498            .when(|ex: &Exchange| ex.input.header("b").is_some())
3499            .to("mock:b")
3500            .end_when()
3501            .when(|ex: &Exchange| ex.input.header("c").is_some())
3502            .to("mock:c")
3503            .end_when()
3504            .otherwise()
3505            .to("mock:fallback")
3506            .end_otherwise()
3507            .end_choice()
3508            .build()
3509            .unwrap();
3510
3511        if let BuilderStep::Choice { whens, otherwise } = &definition.steps()[0] {
3512            assert_eq!(whens.len(), 3);
3513            assert!(otherwise.is_some());
3514            assert_eq!(otherwise.as_ref().unwrap().len(), 1);
3515        } else {
3516            panic!("Expected Choice step");
3517        }
3518    }
3519
3520    // ── Multicast individual config tests ───────────────────────────────────────
3521
3522    #[test]
3523    fn test_multicast_builder_parallel_only() {
3524        let route = RouteBuilder::from("direct:start")
3525            .route_id("multicast-parallel")
3526            .multicast()
3527            .parallel(true)
3528            .to("mock:a")
3529            .end_multicast()
3530            .build()
3531            .unwrap();
3532
3533        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3534            assert!(config.parallel);
3535            assert_eq!(config.parallel_limit, None);
3536        } else {
3537            panic!("Expected Multicast step");
3538        }
3539    }
3540
3541    #[test]
3542    fn test_multicast_builder_timeout_only() {
3543        let route = RouteBuilder::from("direct:start")
3544            .route_id("multicast-timeout")
3545            .multicast()
3546            .timeout(Duration::from_secs(5))
3547            .to("mock:a")
3548            .end_multicast()
3549            .build()
3550            .unwrap();
3551
3552        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3553            assert_eq!(config.timeout, Some(Duration::from_secs(5)));
3554        } else {
3555            panic!("Expected Multicast step");
3556        }
3557    }
3558
3559    #[test]
3560    fn test_multicast_builder_aggregation_collect_all() {
3561        let route = RouteBuilder::from("direct:start")
3562            .route_id("multicast-collect")
3563            .multicast()
3564            .aggregation(MulticastStrategy::CollectAll)
3565            .to("mock:a")
3566            .end_multicast()
3567            .build()
3568            .unwrap();
3569
3570        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3571            assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
3572        } else {
3573            panic!("Expected Multicast step");
3574        }
3575    }
3576
3577    // ── extract_completion_fields: Any mode with multiple conditions ────────────
3578
3579    #[test]
3580    fn test_build_canonical_aggregate_any_completion_mode() {
3581        let spec = RouteBuilder::from("direct:start")
3582            .route_id("canonical-any-completion")
3583            .aggregate(
3584                AggregatorConfig::correlate_by("key")
3585                    .complete_on_size_or_timeout(10, Duration::from_secs(30))
3586                    .build()
3587                    .unwrap(),
3588            )
3589            .build_canonical()
3590            .unwrap();
3591
3592        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3593            assert_eq!(agg.completion_size, Some(10));
3594            assert_eq!(agg.completion_timeout_ms, Some(30_000));
3595        } else {
3596            panic!("Expected Aggregate step");
3597        }
3598    }
3599
3600    #[test]
3601    fn test_build_canonical_aggregate_timeout_completion() {
3602        let spec = RouteBuilder::from("direct:start")
3603            .route_id("canonical-timeout-completion")
3604            .aggregate(
3605                AggregatorConfig::correlate_by("key")
3606                    .complete_on_timeout(Duration::from_millis(500))
3607                    .build()
3608                    .unwrap(),
3609            )
3610            .build_canonical()
3611            .unwrap();
3612
3613        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3614            assert_eq!(agg.completion_size, None);
3615            assert_eq!(agg.completion_timeout_ms, Some(500));
3616        } else {
3617            panic!("Expected Aggregate step");
3618        }
3619    }
3620
3621    // ── canonicalize_aggregate: discard_on_timeout and force_completion_on_stop ─
3622
3623    #[test]
3624    fn test_build_canonical_aggregate_discard_on_timeout() {
3625        use camel_api::aggregator::AggregatorConfig;
3626
3627        let spec = RouteBuilder::from("direct:start")
3628            .route_id("canonical-discard-timeout")
3629            .aggregate(
3630                AggregatorConfig::correlate_by("key")
3631                    .complete_when_size(1)
3632                    .discard_on_timeout(true)
3633                    .build()
3634                    .unwrap(),
3635            )
3636            .build_canonical()
3637            .unwrap();
3638
3639        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3640            assert_eq!(agg.discard_on_timeout, Some(true));
3641        } else {
3642            panic!("Expected Aggregate step");
3643        }
3644    }
3645
3646    #[test]
3647    fn test_build_canonical_aggregate_force_completion_on_stop() {
3648        use camel_api::aggregator::AggregatorConfig;
3649
3650        let spec = RouteBuilder::from("direct:start")
3651            .route_id("canonical-force-stop")
3652            .aggregate(
3653                AggregatorConfig::correlate_by("key")
3654                    .complete_when_size(1)
3655                    .force_completion_on_stop(true)
3656                    .build()
3657                    .unwrap(),
3658            )
3659            .build_canonical()
3660            .unwrap();
3661
3662        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3663            assert_eq!(agg.force_completion_on_stop, Some(true));
3664        } else {
3665            panic!("Expected Aggregate step");
3666        }
3667    }
3668
3669    // ── build_canonical: max_buckets and bucket_ttl ─────────────────────────────
3670
3671    #[test]
3672    fn test_build_canonical_aggregate_max_buckets_and_ttl() {
3673        use camel_api::aggregator::AggregatorConfig;
3674
3675        let spec = RouteBuilder::from("direct:start")
3676            .route_id("canonical-buckets-ttl")
3677            .aggregate(
3678                AggregatorConfig::correlate_by("key")
3679                    .complete_when_size(1)
3680                    .max_buckets(100)
3681                    .bucket_ttl(Duration::from_secs(60))
3682                    .build()
3683                    .unwrap(),
3684            )
3685            .build_canonical()
3686            .unwrap();
3687
3688        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3689            assert_eq!(agg.max_buckets, Some(100));
3690            assert_eq!(agg.bucket_ttl_ms, Some(60_000));
3691        } else {
3692            panic!("Expected Aggregate step");
3693        }
3694    }
3695
3696    // ── SplitBuilder with filter inside ─────────────────────────────────────────
3697
3698    #[test]
3699    fn test_split_builder_with_filter_inside() {
3700        use camel_api::splitter::{SplitterConfig, split_body_lines};
3701
3702        let definition = RouteBuilder::from("timer:test")
3703            .route_id("split-with-filter")
3704            .split(SplitterConfig::new(split_body_lines()))
3705            .filter(|_ex| true)
3706            .to("mock:filtered-frag")
3707            .end_filter()
3708            .end_split()
3709            .build()
3710            .unwrap();
3711
3712        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3713            assert_eq!(steps.len(), 1);
3714            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3715        } else {
3716            panic!("Expected Split step");
3717        }
3718    }
3719
3720    // ── WireTap additional tests ────────────────────────────────────────────────
3721
3722    #[test]
3723    fn test_wire_tap_multiple_taps() {
3724        let definition = RouteBuilder::from("timer:tick")
3725            .route_id("multi-wire-tap")
3726            .wire_tap("mock:tap1")
3727            .wire_tap("mock:tap2")
3728            .to("mock:result")
3729            .build()
3730            .unwrap();
3731
3732        assert_eq!(definition.steps().len(), 3);
3733        assert!(
3734            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap1")
3735        );
3736        assert!(
3737            matches!(&definition.steps()[1], BuilderStep::WireTap { uri } if uri == "mock:tap2")
3738        );
3739    }
3740
3741    // ── Error handler: explicit config after shorthand → Mixed mode ─────────────
3742
3743    #[test]
3744    fn test_builder_shorthand_then_explicit_mixed_mode() {
3745        let result = RouteBuilder::from("direct:start")
3746            .route_id("mixed-mode-2")
3747            .dead_letter_channel("log:dlc")
3748            .error_handler(ErrorHandlerConfig::log_only())
3749            .to("mock:out")
3750            .build();
3751
3752        let err = result.err().expect("mixed mode should fail");
3753        assert!(format!("{err}").contains("mixed error handler modes"));
3754    }
3755
3756    // ── build_canonical: empty from_uri error ───────────────────────────────────
3757
3758    #[test]
3759    fn test_build_canonical_empty_from_uri_errors() {
3760        let result = RouteBuilder::from("").route_id("test").build_canonical();
3761        assert!(result.is_err());
3762    }
3763
3764    #[test]
3765    fn test_build_canonical_missing_route_id_errors() {
3766        let result = RouteBuilder::from("direct:start").build_canonical();
3767        assert!(result.is_err());
3768        let err = result.unwrap_err().to_string();
3769        assert!(err.contains("route_id"));
3770    }
3771
3772    // ── SplitBuilder: aggregate inside split ────────────────────────────────────
3773
3774    #[test]
3775    fn test_split_builder_with_aggregate_inside() {
3776        use camel_api::aggregator::AggregatorConfig;
3777        use camel_api::splitter::{SplitterConfig, split_body_lines};
3778
3779        let definition = RouteBuilder::from("timer:test")
3780            .route_id("split-agg")
3781            .split(SplitterConfig::new(split_body_lines()))
3782            .aggregate(
3783                AggregatorConfig::correlate_by("frag-key")
3784                    .complete_when_size(3)
3785                    .build()
3786                    .unwrap(),
3787            )
3788            .end_split()
3789            .build()
3790            .unwrap();
3791
3792        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3793            assert_eq!(steps.len(), 1);
3794            assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
3795        } else {
3796            panic!("Expected Split step");
3797        }
3798    }
3799
3800    // ── Throttle: steps collected inside throttle scope ─────────────────────────
3801
3802    #[test]
3803    fn test_throttle_builder_with_steps_inside() {
3804        let definition = RouteBuilder::from("timer:tick")
3805            .route_id("throttle-steps")
3806            .throttle(10, Duration::from_secs(1))
3807            .set_header("throttled", Value::Bool(true))
3808            .to("mock:throttled")
3809            .end_throttle()
3810            .build()
3811            .unwrap();
3812
3813        if let BuilderStep::Throttle { steps, .. } = &definition.steps()[0] {
3814            assert_eq!(steps.len(), 2);
3815        } else {
3816            panic!("Expected Throttle step");
3817        }
3818    }
3819
3820    // ── LoadBalance: steps collected inside scope ───────────────────────────────
3821
3822    #[test]
3823    fn test_load_balance_builder_with_steps_inside() {
3824        let definition = RouteBuilder::from("timer:tick")
3825            .route_id("lb-steps")
3826            .load_balance()
3827            .round_robin()
3828            .set_header("lb", Value::Bool(true))
3829            .to("mock:lb")
3830            .end_load_balance()
3831            .build()
3832            .unwrap();
3833
3834        if let BuilderStep::LoadBalance { steps, .. } = &definition.steps()[0] {
3835            assert_eq!(steps.len(), 2);
3836        } else {
3837            panic!("Expected LoadBalance step");
3838        }
3839    }
3840
3841    // ── Multicast: steps collected inside scope ─────────────────────────────────
3842
3843    #[test]
3844    fn test_multicast_builder_with_steps_inside() {
3845        let definition = RouteBuilder::from("timer:tick")
3846            .route_id("multicast-steps")
3847            .multicast()
3848            .set_header("mc", Value::Bool(true))
3849            .to("mock:multicast")
3850            .end_multicast()
3851            .build()
3852            .unwrap();
3853
3854        if let BuilderStep::Multicast { steps, .. } = &definition.steps()[0] {
3855            assert_eq!(steps.len(), 2);
3856        } else {
3857            panic!("Expected Multicast step");
3858        }
3859    }
3860
3861    // ── LoopBuilder: steps collected inside loop scope ──────────────────────────
3862
3863    #[test]
3864    fn test_loop_builder_with_steps_inside() {
3865        let definition = RouteBuilder::from("timer:tick")
3866            .route_id("loop-steps")
3867            .loop_count(3)
3868            .set_header("loop", Value::Bool(true))
3869            .to("mock:loop")
3870            .end_loop()
3871            .build()
3872            .unwrap();
3873
3874        if let BuilderStep::Loop { steps, .. } = &definition.steps()[0] {
3875            assert_eq!(steps.len(), 2);
3876        } else {
3877            panic!("Expected Loop step");
3878        }
3879    }
3880
3881    // ── canonical_step_name coverage for remaining variants ─────────────────────
3882
3883    #[test]
3884    fn test_build_canonical_rejects_loop_step() {
3885        let err = RouteBuilder::from("direct:start")
3886            .route_id("canonical-loop")
3887            .loop_count(3)
3888            .to("mock:loop")
3889            .end_loop()
3890            .build_canonical()
3891            .unwrap_err();
3892
3893        assert!(format!("{err}").contains("does not support step `loop`"));
3894    }
3895
3896    #[test]
3897    fn test_build_canonical_rejects_multicast_step() {
3898        let err = RouteBuilder::from("direct:start")
3899            .route_id("canonical-multicast")
3900            .multicast()
3901            .to("mock:a")
3902            .end_multicast()
3903            .build_canonical()
3904            .unwrap_err();
3905
3906        assert!(format!("{err}").contains("does not support step `multicast`"));
3907    }
3908
3909    #[test]
3910    fn test_build_canonical_rejects_throttle_step() {
3911        let err = RouteBuilder::from("direct:start")
3912            .route_id("canonical-throttle")
3913            .throttle(10, Duration::from_secs(1))
3914            .to("mock:result")
3915            .end_throttle()
3916            .build_canonical()
3917            .unwrap_err();
3918
3919        assert!(format!("{err}").contains("does not support step `throttle`"));
3920    }
3921
3922    #[test]
3923    fn test_build_canonical_rejects_load_balancer_step() {
3924        let err = RouteBuilder::from("direct:start")
3925            .route_id("canonical-lb")
3926            .load_balance()
3927            .round_robin()
3928            .to("mock:result")
3929            .end_load_balance()
3930            .build_canonical()
3931            .unwrap_err();
3932
3933        assert!(format!("{err}").contains("does not support step `load_balancer`"));
3934    }
3935
3936    #[test]
3937    fn test_build_canonical_rejects_bean_step() {
3938        let err = RouteBuilder::from("direct:start")
3939            .route_id("canonical-bean")
3940            .bean("myBean", "process")
3941            .build_canonical()
3942            .unwrap_err();
3943
3944        assert!(format!("{err}").contains("does not support step `bean`"));
3945    }
3946
3947    #[test]
3948    fn test_build_canonical_rejects_script_step() {
3949        let err = RouteBuilder::from("direct:start")
3950            .route_id("canonical-script")
3951            .script("rhai", "x = 1")
3952            .build_canonical()
3953            .unwrap_err();
3954
3955        assert!(format!("{err}").contains("does not support step `script`"));
3956    }
3957
3958    #[test]
3959    fn test_build_canonical_accepts_delay_step() {
3960        let spec = RouteBuilder::from("direct:start")
3961            .route_id("canonical-delay")
3962            .delay(Duration::from_millis(100))
3963            .build_canonical()
3964            .unwrap();
3965
3966        assert!(
3967            spec.steps.iter().any(
3968                |s| matches!(s, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100)
3969            )
3970        );
3971    }
3972
3973    #[test]
3974    fn test_build_canonical_accepts_wire_tap_step() {
3975        let spec = RouteBuilder::from("direct:start")
3976            .route_id("canonical-wiretap")
3977            .wire_tap("mock:tap")
3978            .build_canonical()
3979            .unwrap();
3980
3981        assert!(
3982            spec.steps
3983                .iter()
3984                .any(|s| matches!(s, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"))
3985        );
3986    }
3987
3988    #[test]
3989    fn test_build_canonical_rejects_dynamic_router_step() {
3990        let err = RouteBuilder::from("direct:start")
3991            .route_id("canonical-dyn-router")
3992            .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
3993            .build_canonical()
3994            .unwrap_err();
3995
3996        assert!(format!("{err}").contains("does not support step `dynamic_router`"));
3997    }
3998
3999    #[test]
4000    fn test_build_canonical_rejects_routing_slip_step() {
4001        let err = RouteBuilder::from("direct:start")
4002            .route_id("canonical-routing-slip")
4003            .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
4004            .build_canonical()
4005            .unwrap_err();
4006
4007        assert!(format!("{err}").contains("does not support step `routing_slip`"));
4008    }
4009
4010    #[test]
4011    fn test_build_canonical_rejects_recipient_list_step() {
4012        let err = RouteBuilder::from("direct:start")
4013            .route_id("canonical-recipient")
4014            .recipient_list(Arc::new(|_| "mock:a".to_string()))
4015            .build_canonical()
4016            .unwrap_err();
4017
4018        assert!(format!("{err}").contains("does not support step `recipient_list`"));
4019    }
4020
4021    // ── extract_completion_fields: Any mode with predicate → error ──────────────
4022
4023    #[test]
4024    fn test_build_canonical_rejects_any_mode_with_predicate() {
4025        let err = RouteBuilder::from("direct:start")
4026            .route_id("canonical-any-pred")
4027            .aggregate(AggregatorConfig {
4028                header_name: "key".to_string(),
4029                completion: CompletionMode::Any(vec![
4030                    CompletionCondition::Size(5),
4031                    CompletionCondition::Predicate(Arc::new(|_| false)),
4032                ]),
4033                correlation: CorrelationStrategy::HeaderName("key".to_string()),
4034                strategy: AggregationStrategy::CollectAll,
4035                max_buckets: None,
4036                bucket_ttl: None,
4037                force_completion_on_stop: false,
4038                discard_on_timeout: false,
4039            })
4040            .build_canonical()
4041            .unwrap_err();
4042
4043        assert!(format!("{err}").contains("predicate completion"));
4044    }
4045
4046    // ── BUILDER-004: Validation errors for missing required fields ────────────
4047
4048    #[test]
4049    fn test_builder_validation_missing_from_uri() {
4050        let result = RouteBuilder::from("")
4051            .route_id("missing-uri-route")
4052            .to("log:info")
4053            .build();
4054        assert!(result.is_err(), "empty from URI should fail validation");
4055        let err = result.err().unwrap().to_string();
4056        assert!(
4057            err.contains("'from'") || err.contains("URI"),
4058            "error should mention from/URI, got: {err}"
4059        );
4060    }
4061
4062    #[test]
4063    fn test_builder_validation_invalid_step_uri_scheme() {
4064        let result = RouteBuilder::from("timer:tick")
4065            .route_id("bad-step-route")
4066            .to("not-a-valid-uri") // no scheme
4067            .build();
4068        // The builder itself accepts any URI string; validation happens at
4069        // resolution time. Verify the build succeeds (step URI is deferred).
4070        assert!(
4071            result.is_ok(),
4072            "builder should accept opaque step URIs; resolution happens later"
4073        );
4074    }
4075
4076    // ── BUILDER-006: Duplicate route IDs ──────────────────────────────────────
4077
4078    #[test]
4079    fn test_builder_duplicate_route_ids_produce_identical_definitions() {
4080        // The builder itself doesn't check for duplicates (that's context-level).
4081        // Verify both builds succeed with the same ID — detection is TODO(BUILDER-006).
4082        let route1 = RouteBuilder::from("direct:a")
4083            .route_id("dup-route")
4084            .to("mock:out")
4085            .build();
4086        let route2 = RouteBuilder::from("direct:b")
4087            .route_id("dup-route")
4088            .to("mock:out")
4089            .build();
4090
4091        assert!(route1.is_ok());
4092        assert!(route2.is_ok());
4093        assert_eq!(route1.unwrap().route_id(), route2.unwrap().route_id());
4094    }
4095}