Skip to main content

camel_builder/
lib.rs

1//! Fluent builder API for constructing Camel routes programmatically with EIP patterns.
2//!
3//! Main types: `RouteBuilder`, `StepAccumulator`, `SplitBuilder`, `ChoiceBuilder`, `MulticastBuilder`,
4//! `ThrottleBuilder`, `LoopBuilder`, `LoadBalancerBuilder`, `OnExceptionBuilder`.
5
6use camel_api::DelayConfig;
7use camel_api::aggregator::{
8    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24    ProcessorFn, Value,
25    runtime::{
26        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28        CanonicalWhenSpec,
29    },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35    StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38/// Shared step-accumulation methods for all builder types.
39///
40/// Implementors provide `steps_mut()` and get step-adding methods for free.
41/// `filter()` and other branching methods are NOT included — they return
42/// different types per builder and stay as per-builder methods.
43pub trait StepAccumulator: Sized {
44    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
45
46    fn to(mut self, endpoint: impl Into<String>) -> Self {
47        self.steps_mut().push(BuilderStep::To(endpoint.into()));
48        self
49    }
50
51    fn process<F, Fut>(mut self, f: F) -> Self
52    where
53        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
54        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
55    {
56        let svc = ProcessorFn::new(f);
57        self.steps_mut()
58            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
59        self
60    }
61
62    fn process_fn(mut self, processor: BoxProcessor) -> Self {
63        self.steps_mut().push(BuilderStep::Processor(processor));
64        self
65    }
66
67    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
68        let svc = SetHeader::new(IdentityProcessor, key, value);
69        self.steps_mut()
70            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71        self
72    }
73
74    fn map_body<F>(mut self, mapper: F) -> Self
75    where
76        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
77    {
78        let svc = MapBody::new(IdentityProcessor, mapper);
79        self.steps_mut()
80            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
81        self
82    }
83
84    fn set_body<B>(mut self, body: B) -> Self
85    where
86        B: Into<Body> + Clone + Send + Sync + 'static,
87    {
88        let body: Body = body.into();
89        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
90        self.steps_mut()
91            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
92        self
93    }
94
95    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
96    ///
97    /// Transforms the message body using the given value. Semantically identical
98    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
99    fn transform<B>(self, body: B) -> Self
100    where
101        B: Into<Body> + Clone + Send + Sync + 'static,
102    {
103        self.set_body(body)
104    }
105
106    fn set_body_fn<F>(mut self, expr: F) -> Self
107    where
108        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
109    {
110        let svc = SetBody::new(IdentityProcessor, expr);
111        self.steps_mut()
112            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
113        self
114    }
115
116    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
117    where
118        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
119    {
120        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
121        self.steps_mut()
122            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
123        self
124    }
125
126    fn aggregate(mut self, config: AggregatorConfig) -> Self {
127        self.steps_mut().push(BuilderStep::Aggregate { config });
128        self
129    }
130
131    /// Stop processing this exchange immediately. No further steps in the
132    /// current pipeline will run.
133    ///
134    /// Can be used at any point in the route: directly on RouteBuilder,
135    /// inside `.filter()`, inside `.split()`, etc.
136    fn stop(mut self) -> Self {
137        self.steps_mut().push(BuilderStep::Stop);
138        self
139    }
140
141    fn delay(mut self, duration: std::time::Duration) -> Self {
142        self.steps_mut().push(BuilderStep::Delay {
143            config: DelayConfig::from_duration(duration),
144        });
145        self
146    }
147
148    fn delay_with_header(
149        mut self,
150        duration: std::time::Duration,
151        header: impl Into<String>,
152    ) -> Self {
153        self.steps_mut().push(BuilderStep::Delay {
154            config: DelayConfig::from_duration_with_header(duration, header),
155        });
156        self
157    }
158
159    /// Log a message at the specified level.
160    ///
161    /// The message will be logged when an exchange passes through this step.
162    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
163        self.steps_mut().push(BuilderStep::Log {
164            level,
165            message: message.into(),
166        });
167        self
168    }
169
170    /// Convert the message body to the target type.
171    ///
172    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
173    /// Returns `TypeConversionFailed` if conversion is not possible.
174    ///
175    /// # Example
176    /// ```ignore
177    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
178    ///      .convert_body_to(BodyType::Json)
179    ///      .to("direct:next")
180    /// ```
181    fn convert_body_to(mut self, target: BodyType) -> Self {
182        let svc = ConvertBodyTo::new(IdentityProcessor, target);
183        self.steps_mut()
184            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
185        self
186    }
187
188    fn stream_cache(mut self, threshold: usize) -> Self {
189        let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
190        let svc = StreamCacheService::new(IdentityProcessor, config);
191        self.steps_mut()
192            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
193        self
194    }
195
196    /// Materialize `Body::Stream` into `Body::Bytes` using the default threshold (128 KB).
197    ///
198    /// Equivalent to `.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)`.
199    fn stream_cache_default(self) -> Self {
200        self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
201    }
202
203    /// Marshal the message body using the specified data format.
204    ///
205    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
206    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
207    ///
208    /// # Example
209    /// ```ignore
210    /// route.marshal("json").to("direct:next")
211    /// ```
212    fn marshal(mut self, format: impl Into<String>) -> Self {
213        let name = format.into();
214        let df =
215            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
216        let svc = MarshalService::new(IdentityProcessor, df);
217        self.steps_mut()
218            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
219        self
220    }
221
222    /// Unmarshal the message body using the specified data format.
223    ///
224    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
225    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
226    ///
227    /// # Example
228    /// ```ignore
229    /// route.unmarshal("json").to("direct:next")
230    /// ```
231    fn unmarshal(mut self, format: impl Into<String>) -> Self {
232        let name = format.into();
233        let df =
234            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
235        let svc = UnmarshalService::new(IdentityProcessor, df);
236        self.steps_mut()
237            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
238        self
239    }
240
241    /// Validate the exchange body against a schema file.
242    ///
243    /// Shorthand for `to("validator:path")`. Supports XSD, JSON Schema, and YAML.
244    ///
245    /// # Example
246    /// ```ignore
247    /// route.validate("schemas/order.xsd").to("direct:out")
248    /// ```
249    fn validate(mut self, schema_path: impl Into<String>) -> Self {
250        let path = schema_path.into();
251        let uri = if path.starts_with("validator:") {
252            path
253        } else {
254            format!("validator:{path}")
255        };
256        self.steps_mut().push(BuilderStep::To(uri));
257        self
258    }
259
260    /// Execute a script that can modify the exchange (headers, properties, body).
261    ///
262    /// The script has access to `headers`, `properties`, and `body` variables
263    /// and can modify them with assignment syntax: `headers["k"] = v`.
264    ///
265    /// # Example
266    /// ```ignore
267    /// // ignore: requires full CamelContext setup with registered language
268    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
269    /// ```
270    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
271        self.steps_mut().push(BuilderStep::Script {
272            language: language.into(),
273            script: script.into(),
274        });
275        self
276    }
277
278    fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
279        self.steps_mut().push(BuilderStep::Bean {
280            name: name.into(),
281            method: method.into(),
282        });
283        self
284    }
285}
286
287/// A fluent builder for constructing routes.
288///
289/// # Example
290///
291/// ```ignore
292/// let definition = RouteBuilder::from("timer:tick?period=1000")
293///     .set_header("source", Value::String("timer".into()))
294///     .filter(|ex| ex.input.body.as_text().is_some())
295///     .to("log:info?showHeaders=true")
296///     .build()?;
297/// ```
298pub struct RouteBuilder {
299    from_uri: String,
300    steps: Vec<BuilderStep>,
301    error_handler: Option<ErrorHandlerConfig>,
302    error_handler_mode: ErrorHandlerMode,
303    circuit_breaker_config: Option<CircuitBreakerConfig>,
304    concurrency: Option<ConcurrencyModel>,
305    route_id: Option<String>,
306    auto_startup: Option<bool>,
307    startup_order: Option<i32>,
308}
309
310#[derive(Default)]
311enum ErrorHandlerMode {
312    #[default]
313    None,
314    ExplicitConfig,
315    Shorthand {
316        dlc_uri: Option<String>,
317        specs: Vec<OnExceptionSpec>,
318    },
319    Mixed,
320}
321
322#[derive(Clone)]
323struct OnExceptionSpec {
324    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
325    retry: Option<RedeliveryPolicy>,
326    handled_by: Option<String>,
327}
328
329impl RouteBuilder {
330    /// Start building a route from the given source endpoint URI.
331    pub fn from(endpoint: &str) -> Self {
332        Self {
333            from_uri: endpoint.to_string(),
334            steps: Vec::new(),
335            error_handler: None,
336            error_handler_mode: ErrorHandlerMode::None,
337            circuit_breaker_config: None,
338            concurrency: None,
339            route_id: None,
340            auto_startup: None,
341            startup_order: None,
342        }
343    }
344
345    /// Open a filter scope. Only exchanges matching `predicate` will be processed
346    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
347    /// and continue to steps after `.end_filter()`.
348    pub fn filter<F>(self, predicate: F) -> FilterBuilder
349    where
350        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
351    {
352        FilterBuilder {
353            parent: self,
354            predicate: std::sync::Arc::new(predicate),
355            steps: vec![],
356        }
357    }
358
359    /// Open a choice scope for content-based routing.
360    ///
361    /// Within the choice, you can define multiple `.when()` clauses and an
362    /// optional `.otherwise()` clause. The first matching `when` predicate
363    /// determines which sub-pipeline executes.
364    pub fn choice(self) -> ChoiceBuilder {
365        ChoiceBuilder {
366            parent: self,
367            whens: vec![],
368            _otherwise: None,
369        }
370    }
371
372    /// Add a WireTap step that sends a clone of the exchange to the given
373    /// endpoint URI (fire-and-forget). The original exchange continues
374    /// downstream unchanged.
375    pub fn wire_tap(mut self, endpoint: &str) -> Self {
376        self.steps.push(BuilderStep::WireTap {
377            uri: endpoint.to_string(),
378        });
379        self
380    }
381
382    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
383    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
384        self.error_handler_mode = match self.error_handler_mode {
385            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
386                ErrorHandlerMode::ExplicitConfig
387            }
388            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
389        };
390        self.error_handler = Some(config);
391        self
392    }
393
394    /// Set a dead letter channel URI for shorthand error handler mode.
395    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
396        let uri = uri.into();
397        self.error_handler_mode = match self.error_handler_mode {
398            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
399                dlc_uri: Some(uri),
400                specs: Vec::new(),
401            },
402            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
403                dlc_uri: Some(uri),
404                specs,
405            },
406            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
407        };
408        self
409    }
410
411    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
412    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
413    where
414        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
415    {
416        self.error_handler_mode = match self.error_handler_mode {
417            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
418                dlc_uri: None,
419                specs: Vec::new(),
420            },
421            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
422            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
423        };
424
425        OnExceptionBuilder {
426            parent: self,
427            policy: OnExceptionSpec {
428                matches: std::sync::Arc::new(matches),
429                retry: None,
430                handled_by: None,
431            },
432        }
433    }
434
435    /// Set a circuit breaker for this route.
436    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
437        self.circuit_breaker_config = Some(config);
438        self
439    }
440
441    /// Override the consumer's default concurrency model.
442    ///
443    /// When set, the pipeline spawns a task per exchange, processing them
444    /// concurrently. `max` limits the number of simultaneously active
445    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
446    ///
447    /// # Example
448    /// ```ignore
449    /// RouteBuilder::from("http://0.0.0.0:8080/api")
450    ///     .concurrent(16)  // max 16 in-flight pipeline executions
451    ///     .process(handle_request)
452    ///     .build()
453    /// ```
454    pub fn concurrent(mut self, max: usize) -> Self {
455        let max = if max == 0 { None } else { Some(max) };
456        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
457        self
458    }
459
460    /// Force sequential processing, overriding a concurrent-capable consumer.
461    ///
462    /// Useful for HTTP routes that mutate shared state and need ordering
463    /// guarantees.
464    pub fn sequential(mut self) -> Self {
465        self.concurrency = Some(ConcurrencyModel::Sequential);
466        self
467    }
468
469    /// Set the route ID for this route.
470    ///
471    /// If not set, the route will be assigned an auto-generated ID.
472    pub fn route_id(mut self, id: impl Into<String>) -> Self {
473        self.route_id = Some(id.into());
474        self
475    }
476
477    /// Set whether this route should automatically start when the context starts.
478    ///
479    /// Default is `true`.
480    pub fn auto_startup(mut self, auto: bool) -> Self {
481        self.auto_startup = Some(auto);
482        self
483    }
484
485    /// Set the startup order for this route.
486    ///
487    /// Routes with lower values start first. Default is 1000.
488    pub fn startup_order(mut self, order: i32) -> Self {
489        self.startup_order = Some(order);
490        self
491    }
492
493    /// Begin a Splitter sub-pipeline. Steps added after this call (until
494    /// `.end_split()`) will be executed per-fragment.
495    ///
496    /// Returns a `SplitBuilder` — you cannot call `.build()` until
497    /// `.end_split()` closes the split scope (enforced by the type system).
498    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
499        SplitBuilder {
500            parent: self,
501            config,
502            steps: Vec::new(),
503        }
504    }
505
506    /// Begin a Multicast sub-pipeline. Steps added after this call (until
507    /// `.end_multicast()`) will each receive a copy of the exchange.
508    ///
509    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
510    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
511    pub fn multicast(self) -> MulticastBuilder {
512        MulticastBuilder {
513            parent: self,
514            steps: Vec::new(),
515            config: MulticastConfig::new(),
516        }
517    }
518
519    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
520    /// `max_requests` per `period`. Steps inside the throttle scope are only
521    /// executed when the rate limit allows.
522    ///
523    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
524    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
525    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
526        ThrottleBuilder {
527            parent: self,
528            config: ThrottlerConfig::new(max_requests, period),
529            steps: Vec::new(),
530        }
531    }
532
533    /// Begin a Loop sub-pipeline that iterates a fixed number of times.
534    pub fn loop_count(self, count: usize) -> LoopBuilder {
535        LoopBuilder {
536            parent: self,
537            config: LoopConfig {
538                mode: LoopMode::Count(count),
539            },
540            steps: vec![],
541        }
542    }
543
544    /// Begin a Loop sub-pipeline that iterates while a predicate is true.
545    pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
546    where
547        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
548    {
549        LoopBuilder {
550            parent: self,
551            config: LoopConfig {
552                mode: LoopMode::While(std::sync::Arc::new(predicate)),
553            },
554            steps: vec![],
555        }
556    }
557
558    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
559    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
560    ///
561    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
562    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
563    pub fn load_balance(self) -> LoadBalancerBuilder {
564        LoadBalancerBuilder {
565            parent: self,
566            config: LoadBalancerConfig::round_robin(),
567            steps: Vec::new(),
568        }
569    }
570
571    /// Add a dynamic router step that routes exchanges dynamically based on
572    /// expression evaluation at runtime.
573    ///
574    /// The expression receives the exchange and returns `Some(uri)` to route to
575    /// the next endpoint, or `None` to stop routing.
576    ///
577    /// # Example
578    /// ```ignore
579    /// RouteBuilder::from("timer:tick")
580    ///     .route_id("test-route")
581    ///     .dynamic_router(|ex| {
582    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
583    ///     })
584    ///     .build()
585    /// ```
586    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
587        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
588    }
589
590    /// Add a dynamic router step with full configuration.
591    ///
592    /// Allows customization of URI delimiter, cache size, timeout, and other options.
593    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
594        self.steps.push(BuilderStep::DynamicRouter { config });
595        self
596    }
597
598    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
599        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
600    }
601
602    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
603        self.steps.push(BuilderStep::RoutingSlip { config });
604        self
605    }
606
607    pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
608        self.recipient_list_with_config(RecipientListConfig::new(expression))
609    }
610
611    pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
612        self.steps.push(BuilderStep::RecipientList { config });
613        self
614    }
615
616    /// Consume the builder and produce a [`RouteDefinition`].
617    pub fn build(self) -> Result<RouteDefinition, CamelError> {
618        if self.from_uri.is_empty() {
619            return Err(CamelError::RouteError(
620                "route must have a 'from' URI".to_string(),
621            ));
622        }
623        let route_id = self
624            .route_id
625            .filter(|s| !s.trim().is_empty())
626            .ok_or_else(|| {
627                CamelError::RouteError(
628                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
629                        .to_string(),
630                )
631            })?;
632        let resolved_error_handler = match self.error_handler_mode {
633            ErrorHandlerMode::None => self.error_handler,
634            ErrorHandlerMode::ExplicitConfig => self.error_handler,
635            ErrorHandlerMode::Mixed => {
636                return Err(CamelError::RouteError(
637                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
638                ));
639            }
640            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
641                let mut config = if let Some(uri) = dlc_uri {
642                    ErrorHandlerConfig::dead_letter_channel(uri)
643                } else {
644                    ErrorHandlerConfig::log_only()
645                };
646
647                for spec in specs {
648                    let matcher = spec.matches.clone();
649                    let mut builder = config.on_exception(move |e| matcher(e));
650
651                    if let Some(retry) = spec.retry {
652                        builder = builder.retry(retry.max_attempts).with_backoff(
653                            retry.initial_delay,
654                            retry.multiplier,
655                            retry.max_delay,
656                        );
657                        if retry.jitter_factor > 0.0 {
658                            builder = builder.with_jitter(retry.jitter_factor);
659                        }
660                    }
661
662                    if let Some(uri) = spec.handled_by {
663                        builder = builder.handled_by(uri);
664                    }
665
666                    config = builder.build();
667                }
668
669                Some(config)
670            }
671        };
672
673        let definition = RouteDefinition::new(self.from_uri, self.steps);
674        let definition = if let Some(eh) = resolved_error_handler {
675            definition.with_error_handler(eh)
676        } else {
677            definition
678        };
679        let definition = if let Some(cb) = self.circuit_breaker_config {
680            definition.with_circuit_breaker(cb)
681        } else {
682            definition
683        };
684        let definition = if let Some(concurrency) = self.concurrency {
685            definition.with_concurrency(concurrency)
686        } else {
687            definition
688        };
689        let definition = definition.with_route_id(route_id);
690        let definition = if let Some(auto) = self.auto_startup {
691            definition.with_auto_startup(auto)
692        } else {
693            definition
694        };
695        let definition = if let Some(order) = self.startup_order {
696            definition.with_startup_order(order)
697        } else {
698            definition
699        };
700        Ok(definition)
701    }
702
703    /// Compile this builder route into canonical v1 spec.
704    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
705        if self.from_uri.is_empty() {
706            return Err(CamelError::RouteError(
707                "route must have a 'from' URI".to_string(),
708            ));
709        }
710        let route_id = self
711            .route_id
712            .filter(|s| !s.trim().is_empty())
713            .ok_or_else(|| {
714                CamelError::RouteError(
715                    "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
716                        .to_string(),
717                )
718            })?;
719
720        let steps = canonicalize_steps(self.steps)?;
721        let circuit_breaker = self
722            .circuit_breaker_config
723            .map(canonicalize_circuit_breaker);
724
725        let spec = CanonicalRouteSpec {
726            route_id,
727            from: self.from_uri,
728            steps,
729            circuit_breaker,
730            version: camel_api::CANONICAL_CONTRACT_VERSION,
731        };
732        spec.validate_contract()?;
733        Ok(spec)
734    }
735}
736
737pub struct OnExceptionBuilder {
738    parent: RouteBuilder,
739    policy: OnExceptionSpec,
740}
741
742impl OnExceptionBuilder {
743    pub fn retry(mut self, max_attempts: u32) -> Self {
744        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
745        self
746    }
747
748    pub fn with_backoff(
749        mut self,
750        initial: std::time::Duration,
751        multiplier: f64,
752        max: std::time::Duration,
753    ) -> Self {
754        if let Some(ref mut retry) = self.policy.retry {
755            retry.initial_delay = initial;
756            retry.multiplier = multiplier;
757            retry.max_delay = max;
758        }
759        self
760    }
761
762    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
763        if let Some(ref mut retry) = self.policy.retry {
764            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
765        }
766        self
767    }
768
769    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
770        self.policy.handled_by = Some(uri.into());
771        self
772    }
773
774    pub fn end_on_exception(mut self) -> RouteBuilder {
775        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
776            specs.push(self.policy);
777        }
778        self.parent
779    }
780}
781
782fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
783    let mut canonical = Vec::with_capacity(steps.len());
784    for step in steps {
785        canonical.push(canonicalize_step(step)?);
786    }
787    Ok(canonical)
788}
789
790fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
791    match step {
792        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
793        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
794        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
795        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
796        BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
797            delay_ms: config.delay_ms,
798            dynamic_header: config.dynamic_header,
799        }),
800        BuilderStep::DeclarativeScript { expression } => {
801            Ok(CanonicalStepSpec::Script { expression })
802        }
803        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
804            predicate,
805            steps: canonicalize_steps(steps)?,
806        }),
807        BuilderStep::DeclarativeChoice { whens, otherwise } => {
808            let mut canonical_whens = Vec::with_capacity(whens.len());
809            for DeclarativeWhenStep { predicate, steps } in whens {
810                canonical_whens.push(CanonicalWhenSpec {
811                    predicate,
812                    steps: canonicalize_steps(steps)?,
813                });
814            }
815            let otherwise = match otherwise {
816                Some(steps) => Some(canonicalize_steps(steps)?),
817                None => None,
818            };
819            Ok(CanonicalStepSpec::Choice {
820                whens: canonical_whens,
821                otherwise,
822            })
823        }
824        BuilderStep::DeclarativeSplit {
825            expression,
826            aggregation,
827            parallel,
828            parallel_limit,
829            stop_on_exception,
830            steps,
831        } => Ok(CanonicalStepSpec::Split {
832            expression: CanonicalSplitExpressionSpec::Language(expression),
833            aggregation: canonicalize_split_aggregation(aggregation)?,
834            parallel,
835            parallel_limit,
836            stop_on_exception,
837            steps: canonicalize_steps(steps)?,
838        }),
839        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
840            canonicalize_aggregate(config)?,
841        )),
842        other => {
843            let step_name = canonical_step_name(&other);
844            let detail = camel_api::canonical_contract_rejection_reason(step_name)
845                .unwrap_or("not included in canonical v1");
846            Err(CamelError::RouteError(format!(
847                "canonical v1 does not support step `{step_name}`: {detail}"
848            )))
849        }
850    }
851}
852
853fn canonicalize_split_aggregation(
854    strategy: camel_api::splitter::AggregationStrategy,
855) -> Result<CanonicalSplitAggregationSpec, CamelError> {
856    match strategy {
857        camel_api::splitter::AggregationStrategy::LastWins => {
858            Ok(CanonicalSplitAggregationSpec::LastWins)
859        }
860        camel_api::splitter::AggregationStrategy::CollectAll => {
861            Ok(CanonicalSplitAggregationSpec::CollectAll)
862        }
863        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
864            "canonical v1 does not support custom split aggregation".to_string(),
865        )),
866        camel_api::splitter::AggregationStrategy::Original => {
867            Ok(CanonicalSplitAggregationSpec::Original)
868        }
869    }
870}
871
872fn extract_completion_fields(
873    mode: &CompletionMode,
874) -> Result<(Option<usize>, Option<u64>), CamelError> {
875    match mode {
876        CompletionMode::Single(cond) => match cond {
877            CompletionCondition::Size(n) => Ok((Some(*n), None)),
878            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
879            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
880                "canonical v1 does not support aggregate predicate completion".to_string(),
881            )),
882        },
883        CompletionMode::Any(conds) => {
884            let mut size = None;
885            let mut timeout_ms = None;
886            for cond in conds {
887                match cond {
888                    CompletionCondition::Size(n) => size = Some(*n),
889                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
890                    CompletionCondition::Predicate(_) => {
891                        return Err(CamelError::RouteError(
892                            "canonical v1 does not support aggregate predicate completion"
893                                .to_string(),
894                        ));
895                    }
896                }
897            }
898            Ok((size, timeout_ms))
899        }
900    }
901}
902
903fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
904    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
905
906    let header = match &config.correlation {
907        CorrelationStrategy::HeaderName(h) => h.clone(),
908        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
909        CorrelationStrategy::Fn(_) => {
910            return Err(CamelError::RouteError(
911                "canonical v1 does not support Fn correlation strategy".to_string(),
912            ));
913        }
914    };
915
916    let correlation_key = match &config.correlation {
917        CorrelationStrategy::HeaderName(_) => None,
918        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
919        CorrelationStrategy::Fn(_) => unreachable!(),
920    };
921
922    let strategy = match config.strategy {
923        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
924        AggregationStrategy::Custom(_) => {
925            return Err(CamelError::RouteError(
926                "canonical v1 does not support custom aggregate strategy".to_string(),
927            ));
928        }
929    };
930    let bucket_ttl_ms = config
931        .bucket_ttl
932        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
933
934    Ok(CanonicalAggregateSpec {
935        header,
936        completion_size,
937        completion_timeout_ms,
938        correlation_key,
939        force_completion_on_stop: if config.force_completion_on_stop {
940            Some(true)
941        } else {
942            None
943        },
944        discard_on_timeout: if config.discard_on_timeout {
945            Some(true)
946        } else {
947            None
948        },
949        strategy,
950        max_buckets: config.max_buckets,
951        bucket_ttl_ms,
952    })
953}
954
955fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
956    CanonicalCircuitBreakerSpec {
957        failure_threshold: config.failure_threshold,
958        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
959    }
960}
961
962fn canonical_step_name(step: &BuilderStep) -> &'static str {
963    match step {
964        BuilderStep::Processor(_) => "processor",
965        BuilderStep::To(_) => "to",
966        BuilderStep::Stop => "stop",
967        BuilderStep::Log { .. } => "log",
968        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
969        BuilderStep::DeclarativeSetBody { .. } => "set_body",
970        BuilderStep::DeclarativeFilter { .. } => "filter",
971        BuilderStep::DeclarativeChoice { .. } => "choice",
972        BuilderStep::DeclarativeScript { .. } => "script",
973        BuilderStep::DeclarativeFunction { .. } => "function",
974        BuilderStep::DeclarativeSplit { .. } => "split",
975        BuilderStep::Split { .. } => "split",
976        BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
977        BuilderStep::Aggregate { .. } => "aggregate",
978        BuilderStep::Filter { .. } => "filter",
979        BuilderStep::Choice { .. } => "choice",
980        BuilderStep::WireTap { .. } => "wire_tap",
981        BuilderStep::Delay { .. } => "delay",
982        BuilderStep::Multicast { .. } => "multicast",
983        BuilderStep::DeclarativeLog { .. } => "log",
984        BuilderStep::Bean { .. } => "bean",
985        BuilderStep::Script { .. } => "script",
986        BuilderStep::Throttle { .. } => "throttle",
987        BuilderStep::LoadBalance { .. } => "load_balancer",
988        BuilderStep::DynamicRouter { .. } => "dynamic_router",
989        BuilderStep::RoutingSlip { .. } => "routing_slip",
990        BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
991        BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
992        BuilderStep::RecipientList { .. } => "recipient_list",
993        BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
994        BuilderStep::DeclarativeSetProperty { .. } => "set_property",
995    }
996}
997
998impl StepAccumulator for RouteBuilder {
999    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1000        &mut self.steps
1001    }
1002}
1003
1004/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
1005///
1006/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1007/// but NOT `.build()` and NOT `.split()` (no nested splits).
1008///
1009/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
1010/// and returns the parent `RouteBuilder`.
1011pub struct SplitBuilder {
1012    parent: RouteBuilder,
1013    config: SplitterConfig,
1014    steps: Vec<BuilderStep>,
1015}
1016
1017impl SplitBuilder {
1018    /// Open a filter scope within the split sub-pipeline.
1019    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1020    where
1021        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1022    {
1023        FilterInSplitBuilder {
1024            parent: self,
1025            predicate: std::sync::Arc::new(predicate),
1026            steps: vec![],
1027        }
1028    }
1029
1030    /// Close the split scope. Packages the accumulated sub-steps into a
1031    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
1032    pub fn end_split(mut self) -> RouteBuilder {
1033        let split_step = BuilderStep::Split {
1034            config: self.config,
1035            steps: self.steps,
1036        };
1037        self.parent.steps.push(split_step);
1038        self.parent
1039    }
1040}
1041
1042impl StepAccumulator for SplitBuilder {
1043    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1044        &mut self.steps
1045    }
1046}
1047
1048/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
1049pub struct FilterBuilder {
1050    parent: RouteBuilder,
1051    predicate: FilterPredicate,
1052    steps: Vec<BuilderStep>,
1053}
1054
1055impl FilterBuilder {
1056    /// Close the filter scope. Packages the accumulated sub-steps into a
1057    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
1058    pub fn end_filter(mut self) -> RouteBuilder {
1059        let step = BuilderStep::Filter {
1060            predicate: self.predicate,
1061            steps: self.steps,
1062        };
1063        self.parent.steps.push(step);
1064        self.parent
1065    }
1066}
1067
1068impl StepAccumulator for FilterBuilder {
1069    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1070        &mut self.steps
1071    }
1072}
1073
1074/// Builder for a filter scope nested inside a `.split()` block.
1075pub struct FilterInSplitBuilder {
1076    parent: SplitBuilder,
1077    predicate: FilterPredicate,
1078    steps: Vec<BuilderStep>,
1079}
1080
1081impl FilterInSplitBuilder {
1082    /// Close the filter scope and return the parent `SplitBuilder`.
1083    pub fn end_filter(mut self) -> SplitBuilder {
1084        let step = BuilderStep::Filter {
1085            predicate: self.predicate,
1086            steps: self.steps,
1087        };
1088        self.parent.steps.push(step);
1089        self.parent
1090    }
1091}
1092
1093impl StepAccumulator for FilterInSplitBuilder {
1094    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1095        &mut self.steps
1096    }
1097}
1098
1099// ── Choice/When/Otherwise builders ─────────────────────────────────────────
1100
1101/// Builder for a `.choice()` ... `.end_choice()` block.
1102///
1103/// Accumulates `when` clauses and an optional `otherwise` clause.
1104/// Cannot call `.build()` until `.end_choice()` is called.
1105pub struct ChoiceBuilder {
1106    parent: RouteBuilder,
1107    whens: Vec<WhenStep>,
1108    _otherwise: Option<Vec<BuilderStep>>,
1109}
1110
1111impl ChoiceBuilder {
1112    /// Open a `when` clause. Only exchanges matching `predicate` will be
1113    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
1114    pub fn when<F>(self, predicate: F) -> WhenBuilder
1115    where
1116        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1117    {
1118        WhenBuilder {
1119            parent: self,
1120            predicate: std::sync::Arc::new(predicate),
1121            steps: vec![],
1122        }
1123    }
1124
1125    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1126    ///
1127    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1128    pub fn otherwise(self) -> OtherwiseBuilder {
1129        OtherwiseBuilder {
1130            parent: self,
1131            steps: vec![],
1132        }
1133    }
1134
1135    /// Close the choice scope. Packages all accumulated `when` clauses and
1136    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1137    /// parent `RouteBuilder`.
1138    pub fn end_choice(mut self) -> RouteBuilder {
1139        let step = BuilderStep::Choice {
1140            whens: self.whens,
1141            otherwise: self._otherwise,
1142        };
1143        self.parent.steps.push(step);
1144        self.parent
1145    }
1146}
1147
1148/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1149pub struct WhenBuilder {
1150    parent: ChoiceBuilder,
1151    predicate: camel_api::FilterPredicate,
1152    steps: Vec<BuilderStep>,
1153}
1154
1155impl WhenBuilder {
1156    /// Close the when scope. Packages the accumulated sub-steps into a
1157    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1158    pub fn end_when(mut self) -> ChoiceBuilder {
1159        self.parent.whens.push(WhenStep {
1160            predicate: self.predicate,
1161            steps: self.steps,
1162        });
1163        self.parent
1164    }
1165}
1166
1167impl StepAccumulator for WhenBuilder {
1168    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1169        &mut self.steps
1170    }
1171}
1172
1173/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1174pub struct OtherwiseBuilder {
1175    parent: ChoiceBuilder,
1176    steps: Vec<BuilderStep>,
1177}
1178
1179impl OtherwiseBuilder {
1180    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1181    pub fn end_otherwise(self) -> ChoiceBuilder {
1182        let OtherwiseBuilder { mut parent, steps } = self;
1183        parent._otherwise = Some(steps);
1184        parent
1185    }
1186}
1187
1188impl StepAccumulator for OtherwiseBuilder {
1189    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1190        &mut self.steps
1191    }
1192}
1193
1194/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1195///
1196/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1197/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1198///
1199/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1200/// and returns the parent `RouteBuilder`.
1201pub struct MulticastBuilder {
1202    parent: RouteBuilder,
1203    steps: Vec<BuilderStep>,
1204    config: MulticastConfig,
1205}
1206
1207impl MulticastBuilder {
1208    pub fn parallel(mut self, parallel: bool) -> Self {
1209        self.config = self.config.parallel(parallel);
1210        self
1211    }
1212
1213    pub fn parallel_limit(mut self, limit: usize) -> Self {
1214        self.config = self.config.parallel_limit(limit);
1215        self
1216    }
1217
1218    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1219        self.config = self.config.stop_on_exception(stop);
1220        self
1221    }
1222
1223    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1224        self.config = self.config.timeout(duration);
1225        self
1226    }
1227
1228    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1229        self.config = self.config.aggregation(strategy);
1230        self
1231    }
1232
1233    pub fn end_multicast(mut self) -> RouteBuilder {
1234        let step = BuilderStep::Multicast {
1235            steps: self.steps,
1236            config: self.config,
1237        };
1238        self.parent.steps.push(step);
1239        self.parent
1240    }
1241}
1242
1243impl StepAccumulator for MulticastBuilder {
1244    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1245        &mut self.steps
1246    }
1247}
1248
1249/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1250///
1251/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1252/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1253///
1254/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1255/// and returns the parent `RouteBuilder`.
1256pub struct ThrottleBuilder {
1257    parent: RouteBuilder,
1258    config: ThrottlerConfig,
1259    steps: Vec<BuilderStep>,
1260}
1261
1262impl ThrottleBuilder {
1263    /// Set the throttle strategy. Default is `Delay`.
1264    ///
1265    /// - `Delay`: Queue messages until capacity available
1266    /// - `Reject`: Return error immediately when throttled
1267    /// - `Drop`: Silently discard excess messages
1268    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1269        self.config = self.config.strategy(strategy);
1270        self
1271    }
1272
1273    /// Close the throttle scope. Packages the accumulated sub-steps into a
1274    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1275    pub fn end_throttle(mut self) -> RouteBuilder {
1276        let step = BuilderStep::Throttle {
1277            config: self.config,
1278            steps: self.steps,
1279        };
1280        self.parent.steps.push(step);
1281        self.parent
1282    }
1283}
1284
1285impl StepAccumulator for ThrottleBuilder {
1286    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1287        &mut self.steps
1288    }
1289}
1290
1291/// Builder for the sub-pipeline within a `.loop_count()` / `.loop_while()` ... `.end_loop()` block.
1292pub struct LoopBuilder {
1293    parent: RouteBuilder,
1294    config: LoopConfig,
1295    steps: Vec<BuilderStep>,
1296}
1297
1298impl LoopBuilder {
1299    pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1300        LoopInLoopBuilder {
1301            parent: self,
1302            config: LoopConfig {
1303                mode: LoopMode::Count(count),
1304            },
1305            steps: vec![],
1306        }
1307    }
1308
1309    pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1310    where
1311        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1312    {
1313        LoopInLoopBuilder {
1314            parent: self,
1315            config: LoopConfig {
1316                mode: LoopMode::While(std::sync::Arc::new(predicate)),
1317            },
1318            steps: vec![],
1319        }
1320    }
1321
1322    pub fn end_loop(mut self) -> RouteBuilder {
1323        let step = BuilderStep::Loop {
1324            config: self.config,
1325            steps: self.steps,
1326        };
1327        self.parent.steps.push(step);
1328        self.parent
1329    }
1330}
1331
1332impl StepAccumulator for LoopBuilder {
1333    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1334        &mut self.steps
1335    }
1336}
1337
1338pub struct LoopInLoopBuilder {
1339    parent: LoopBuilder,
1340    config: LoopConfig,
1341    steps: Vec<BuilderStep>,
1342}
1343
1344impl LoopInLoopBuilder {
1345    pub fn end_loop(mut self) -> LoopBuilder {
1346        let step = BuilderStep::Loop {
1347            config: self.config,
1348            steps: self.steps,
1349        };
1350        self.parent.steps.push(step);
1351        self.parent
1352    }
1353}
1354
1355impl StepAccumulator for LoopInLoopBuilder {
1356    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1357        &mut self.steps
1358    }
1359}
1360
1361/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1362///
1363/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1364/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1365///
1366/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1367/// and returns the parent `RouteBuilder`.
1368pub struct LoadBalancerBuilder {
1369    parent: RouteBuilder,
1370    config: LoadBalancerConfig,
1371    steps: Vec<BuilderStep>,
1372}
1373
1374impl LoadBalancerBuilder {
1375    /// Set the load balance strategy to round-robin (default).
1376    pub fn round_robin(mut self) -> Self {
1377        self.config = LoadBalancerConfig::round_robin();
1378        self
1379    }
1380
1381    /// Set the load balance strategy to random selection.
1382    pub fn random(mut self) -> Self {
1383        self.config = LoadBalancerConfig::random();
1384        self
1385    }
1386
1387    /// Set the load balance strategy to weighted selection.
1388    ///
1389    /// Each endpoint is assigned a weight that determines its probability
1390    /// of being selected.
1391    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1392        self.config = LoadBalancerConfig::weighted(weights);
1393        self
1394    }
1395
1396    /// Set the load balance strategy to failover.
1397    ///
1398    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1399    /// is tried.
1400    pub fn failover(mut self) -> Self {
1401        self.config = LoadBalancerConfig::failover();
1402        self
1403    }
1404
1405    /// Enable or disable parallel execution of endpoints.
1406    ///
1407    /// When enabled, all endpoints receive the exchange simultaneously.
1408    /// When disabled (default), only one endpoint is selected per exchange.
1409    pub fn parallel(mut self, parallel: bool) -> Self {
1410        self.config = self.config.parallel(parallel);
1411        self
1412    }
1413
1414    /// Close the load balance scope. Packages the accumulated sub-steps into a
1415    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1416    pub fn end_load_balance(mut self) -> RouteBuilder {
1417        let step = BuilderStep::LoadBalance {
1418            config: self.config,
1419            steps: self.steps,
1420        };
1421        self.parent.steps.push(step);
1422        self.parent
1423    }
1424}
1425
1426impl StepAccumulator for LoadBalancerBuilder {
1427    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1428        &mut self.steps
1429    }
1430}
1431
1432// ---------------------------------------------------------------------------
1433// Tests
1434// ---------------------------------------------------------------------------
1435
1436#[cfg(test)]
1437mod tests {
1438    use super::*;
1439    use camel_api::error_handler::ErrorHandlerConfig;
1440    use camel_api::load_balancer::LoadBalanceStrategy;
1441    use camel_api::{Exchange, Message};
1442    use camel_core::route::BuilderStep;
1443    use std::sync::Arc;
1444    use std::time::Duration;
1445    use tower::{Service, ServiceExt};
1446
1447    #[test]
1448    fn test_builder_from_creates_definition() {
1449        let definition = RouteBuilder::from("timer:tick")
1450            .route_id("test-route")
1451            .build()
1452            .unwrap();
1453        assert_eq!(definition.from_uri(), "timer:tick");
1454    }
1455
1456    #[test]
1457    fn test_builder_empty_from_uri_errors() {
1458        let result = RouteBuilder::from("").route_id("test-route").build();
1459        assert!(result.is_err());
1460    }
1461
1462    #[test]
1463    fn test_builder_to_adds_step() {
1464        let definition = RouteBuilder::from("timer:tick")
1465            .route_id("test-route")
1466            .to("log:info")
1467            .build()
1468            .unwrap();
1469
1470        assert_eq!(definition.from_uri(), "timer:tick");
1471        // We can verify steps were added by checking the structure
1472        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1473    }
1474
1475    #[test]
1476    fn test_builder_filter_adds_filter_step() {
1477        let definition = RouteBuilder::from("timer:tick")
1478            .route_id("test-route")
1479            .filter(|_ex| true)
1480            .to("mock:result")
1481            .end_filter()
1482            .build()
1483            .unwrap();
1484
1485        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1486    }
1487
1488    #[test]
1489    fn test_builder_set_header_adds_processor_step() {
1490        let definition = RouteBuilder::from("timer:tick")
1491            .route_id("test-route")
1492            .set_header("key", Value::String("value".into()))
1493            .build()
1494            .unwrap();
1495
1496        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1497    }
1498
1499    #[test]
1500    fn test_builder_map_body_adds_processor_step() {
1501        let definition = RouteBuilder::from("timer:tick")
1502            .route_id("test-route")
1503            .map_body(|body| body)
1504            .build()
1505            .unwrap();
1506
1507        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1508    }
1509
1510    #[test]
1511    fn test_builder_process_adds_processor_step() {
1512        let definition = RouteBuilder::from("timer:tick")
1513            .route_id("test-route")
1514            .process(|ex| async move { Ok(ex) })
1515            .build()
1516            .unwrap();
1517
1518        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1519    }
1520
1521    #[test]
1522    fn test_builder_chain_multiple_steps() {
1523        let definition = RouteBuilder::from("timer:tick")
1524            .route_id("test-route")
1525            .set_header("source", Value::String("timer".into()))
1526            .filter(|ex| ex.input.header("source").is_some())
1527            .to("log:info")
1528            .end_filter()
1529            .to("mock:result")
1530            .build()
1531            .unwrap();
1532
1533        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1534        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1535        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1536        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1537    }
1538
1539    #[test]
1540    fn test_loop_count_builder() {
1541        use camel_api::loop_eip::LoopMode;
1542
1543        let def = RouteBuilder::from("direct:start")
1544            .route_id("loop-test")
1545            .loop_count(3)
1546            .to("mock:inside")
1547            .end_loop()
1548            .to("mock:after")
1549            .build()
1550            .unwrap();
1551
1552        assert_eq!(def.steps().len(), 2);
1553        match &def.steps()[0] {
1554            BuilderStep::Loop { config, steps } => {
1555                assert!(matches!(config.mode, LoopMode::Count(3)));
1556                assert_eq!(steps.len(), 1);
1557            }
1558            other => panic!("Expected Loop, got {:?}", other),
1559        }
1560        assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1561    }
1562
1563    #[test]
1564    fn test_loop_while_builder() {
1565        use camel_api::loop_eip::LoopMode;
1566
1567        let def = RouteBuilder::from("direct:start")
1568            .route_id("loop-while-test")
1569            .loop_while(|_ex| true)
1570            .to("mock:retry")
1571            .end_loop()
1572            .build()
1573            .unwrap();
1574
1575        assert_eq!(def.steps().len(), 1);
1576        match &def.steps()[0] {
1577            BuilderStep::Loop { config, steps } => {
1578                assert!(matches!(config.mode, LoopMode::While(_)));
1579                assert_eq!(steps.len(), 1);
1580            }
1581            other => panic!("Expected Loop, got {:?}", other),
1582        }
1583    }
1584
1585    #[test]
1586    fn test_nested_loop_builder() {
1587        use camel_api::loop_eip::LoopMode;
1588
1589        let def = RouteBuilder::from("direct:start")
1590            .route_id("nested-loop-test")
1591            .loop_count(2)
1592            .to("mock:outer")
1593            .loop_count(3)
1594            .to("mock:inner")
1595            .end_loop()
1596            .end_loop()
1597            .to("mock:after")
1598            .build()
1599            .unwrap();
1600
1601        assert_eq!(def.steps().len(), 2);
1602        match &def.steps()[0] {
1603            BuilderStep::Loop { steps, .. } => {
1604                assert_eq!(steps.len(), 2);
1605                match &steps[1] {
1606                    BuilderStep::Loop {
1607                        config,
1608                        steps: inner_steps,
1609                    } => {
1610                        assert!(matches!(config.mode, LoopMode::Count(3)));
1611                        assert_eq!(inner_steps.len(), 1);
1612                    }
1613                    other => panic!("Expected nested Loop, got {:?}", other),
1614                }
1615            }
1616            other => panic!("Expected outer Loop, got {:?}", other),
1617        }
1618    }
1619
1620    // -----------------------------------------------------------------------
1621    // Processor behavior tests — exercise the real Tower services directly
1622    // -----------------------------------------------------------------------
1623
1624    #[tokio::test]
1625    async fn test_set_header_processor_works() {
1626        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1627        let exchange = Exchange::new(Message::new("test"));
1628        let result = svc.call(exchange).await.unwrap();
1629        assert_eq!(
1630            result.input.header("greeting"),
1631            Some(&Value::String("hello".into()))
1632        );
1633    }
1634
1635    #[tokio::test]
1636    async fn test_filter_processor_passes() {
1637        use camel_api::BoxProcessorExt;
1638        use camel_processor::FilterService;
1639
1640        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1641        let mut svc =
1642            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1643        let exchange = Exchange::new(Message::new("pass"));
1644        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1645        assert_eq!(result.input.body.as_text(), Some("pass"));
1646    }
1647
1648    #[tokio::test]
1649    async fn test_filter_processor_blocks() {
1650        use camel_api::BoxProcessorExt;
1651        use camel_processor::FilterService;
1652
1653        let sub = BoxProcessor::from_fn(|_ex| {
1654            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1655        });
1656        let mut svc =
1657            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1658        let exchange = Exchange::new(Message::new("reject"));
1659        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1660        assert_eq!(result.input.body.as_text(), Some("reject"));
1661    }
1662
1663    #[tokio::test]
1664    async fn test_map_body_processor_works() {
1665        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1666            if let Some(text) = body.as_text() {
1667                Body::Text(text.to_uppercase())
1668            } else {
1669                body
1670            }
1671        });
1672        let exchange = Exchange::new(Message::new("hello"));
1673        let result = mapper.oneshot(exchange).await.unwrap();
1674        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1675    }
1676
1677    #[tokio::test]
1678    async fn test_process_custom_processor_works() {
1679        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1680            ex.set_property("custom", Value::Bool(true));
1681            Ok(ex)
1682        });
1683        let exchange = Exchange::new(Message::default());
1684        let result = processor.oneshot(exchange).await.unwrap();
1685        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1686    }
1687
1688    // -----------------------------------------------------------------------
1689    // Sequential pipeline test
1690    // -----------------------------------------------------------------------
1691
1692    #[tokio::test]
1693    async fn test_compose_pipeline_runs_steps_in_order() {
1694        use camel_core::route::compose_pipeline;
1695
1696        let processors = vec![
1697            BoxProcessor::new(SetHeader::new(
1698                IdentityProcessor,
1699                "step",
1700                Value::String("one".into()),
1701            )),
1702            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1703                if let Some(text) = body.as_text() {
1704                    Body::Text(format!("{}-processed", text))
1705                } else {
1706                    body
1707                }
1708            })),
1709        ];
1710
1711        let pipeline = compose_pipeline(processors);
1712        let exchange = Exchange::new(Message::new("hello"));
1713        let result = pipeline.oneshot(exchange).await.unwrap();
1714
1715        assert_eq!(
1716            result.input.header("step"),
1717            Some(&Value::String("one".into()))
1718        );
1719        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1720    }
1721
1722    #[tokio::test]
1723    async fn test_compose_pipeline_empty_is_identity() {
1724        use camel_core::route::compose_pipeline;
1725
1726        let pipeline = compose_pipeline(vec![]);
1727        let exchange = Exchange::new(Message::new("unchanged"));
1728        let result = pipeline.oneshot(exchange).await.unwrap();
1729        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1730    }
1731
1732    // -----------------------------------------------------------------------
1733    // Circuit breaker builder tests
1734    // -----------------------------------------------------------------------
1735
1736    #[test]
1737    fn test_builder_circuit_breaker_sets_config() {
1738        use camel_api::circuit_breaker::CircuitBreakerConfig;
1739
1740        let config = CircuitBreakerConfig::new().failure_threshold(5);
1741        let definition = RouteBuilder::from("timer:tick")
1742            .route_id("test-route")
1743            .circuit_breaker(config)
1744            .build()
1745            .unwrap();
1746
1747        let cb = definition
1748            .circuit_breaker_config()
1749            .expect("circuit breaker should be set");
1750        assert_eq!(cb.failure_threshold, 5);
1751    }
1752
1753    #[test]
1754    fn test_builder_circuit_breaker_with_error_handler() {
1755        use camel_api::circuit_breaker::CircuitBreakerConfig;
1756        use camel_api::error_handler::ErrorHandlerConfig;
1757
1758        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1759        let eh_config = ErrorHandlerConfig::log_only();
1760
1761        let definition = RouteBuilder::from("timer:tick")
1762            .route_id("test-route")
1763            .to("log:info")
1764            .circuit_breaker(cb_config)
1765            .error_handler(eh_config)
1766            .build()
1767            .unwrap();
1768
1769        assert!(
1770            definition.circuit_breaker_config().is_some(),
1771            "circuit breaker config should be set"
1772        );
1773        // Route definition was built successfully with both configs.
1774    }
1775
1776    #[test]
1777    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1778        let definition = RouteBuilder::from("direct:start")
1779            .route_id("test-route")
1780            .dead_letter_channel("log:dlc")
1781            .on_exception(|e| matches!(e, CamelError::Io(_)))
1782            .retry(3)
1783            .handled_by("log:io")
1784            .end_on_exception()
1785            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1786            .retry(1)
1787            .end_on_exception()
1788            .to("mock:out")
1789            .build()
1790            .expect("route should build");
1791
1792        let cfg = definition
1793            .error_handler_config()
1794            .expect("error handler should be set");
1795        assert_eq!(cfg.policies.len(), 2);
1796        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1797        assert_eq!(
1798            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1799            Some(3)
1800        );
1801        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1802        assert_eq!(
1803            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1804            Some(1)
1805        );
1806    }
1807
1808    #[test]
1809    fn test_builder_on_exception_mixed_mode_rejected() {
1810        let result = RouteBuilder::from("direct:start")
1811            .route_id("test-route")
1812            .error_handler(ErrorHandlerConfig::log_only())
1813            .on_exception(|_e| true)
1814            .end_on_exception()
1815            .to("mock:out")
1816            .build();
1817
1818        let err = result.err().expect("mixed mode should fail with an error");
1819
1820        assert!(
1821            format!("{err}").contains("mixed error handler modes"),
1822            "unexpected error: {err}"
1823        );
1824    }
1825
1826    #[test]
1827    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1828        let definition = RouteBuilder::from("direct:start")
1829            .route_id("test-route")
1830            .on_exception(|_e| true)
1831            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1832            .with_jitter(0.5)
1833            .end_on_exception()
1834            .to("mock:out")
1835            .build()
1836            .expect("route should build");
1837
1838        let cfg = definition
1839            .error_handler_config()
1840            .expect("error handler should be set");
1841        assert_eq!(cfg.policies.len(), 1);
1842        assert!(cfg.policies[0].retry.is_none());
1843    }
1844
1845    #[test]
1846    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1847        let definition = RouteBuilder::from("direct:start")
1848            .route_id("test-route")
1849            .dead_letter_channel("log:dlc")
1850            .to("mock:out")
1851            .build()
1852            .expect("route should build");
1853
1854        let cfg = definition
1855            .error_handler_config()
1856            .expect("error handler should be set");
1857        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1858        assert!(cfg.policies.is_empty());
1859    }
1860
1861    #[test]
1862    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1863        let definition = RouteBuilder::from("direct:start")
1864            .route_id("test-route")
1865            .dead_letter_channel("log:first")
1866            .on_exception(|e| matches!(e, CamelError::Io(_)))
1867            .retry(2)
1868            .end_on_exception()
1869            .dead_letter_channel("log:second")
1870            .to("mock:out")
1871            .build()
1872            .expect("route should build");
1873
1874        let cfg = definition
1875            .error_handler_config()
1876            .expect("error handler should be set");
1877        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1878        assert_eq!(cfg.policies.len(), 1);
1879        assert_eq!(
1880            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1881            Some(2)
1882        );
1883    }
1884
1885    #[test]
1886    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1887        let definition = RouteBuilder::from("direct:start")
1888            .route_id("test-route")
1889            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1890            .retry(1)
1891            .end_on_exception()
1892            .to("mock:out")
1893            .build()
1894            .expect("route should build");
1895
1896        let cfg = definition
1897            .error_handler_config()
1898            .expect("error handler should be set");
1899        assert!(cfg.dlc_uri.is_none());
1900        assert_eq!(cfg.policies.len(), 1);
1901    }
1902
1903    #[test]
1904    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1905        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1906        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1907
1908        let definition = RouteBuilder::from("direct:start")
1909            .route_id("test-route")
1910            .error_handler(first)
1911            .error_handler(second)
1912            .to("mock:out")
1913            .build()
1914            .expect("route should build");
1915
1916        let cfg = definition
1917            .error_handler_config()
1918            .expect("error handler should be set");
1919        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1920    }
1921
1922    // --- Splitter builder tests ---
1923
1924    #[test]
1925    fn test_split_builder_typestate() {
1926        use camel_api::splitter::{SplitterConfig, split_body_lines};
1927
1928        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
1929        let definition = RouteBuilder::from("timer:test?period=1000")
1930            .route_id("test-route")
1931            .split(SplitterConfig::new(split_body_lines()))
1932            .to("mock:per-fragment")
1933            .end_split()
1934            .to("mock:final")
1935            .build()
1936            .unwrap();
1937
1938        // Should have 2 top-level steps: Split + To("mock:final")
1939        assert_eq!(definition.steps().len(), 2);
1940    }
1941
1942    #[test]
1943    fn test_split_builder_steps_collected() {
1944        use camel_api::splitter::{SplitterConfig, split_body_lines};
1945
1946        let definition = RouteBuilder::from("timer:test?period=1000")
1947            .route_id("test-route")
1948            .split(SplitterConfig::new(split_body_lines()))
1949            .set_header("fragment", Value::String("yes".into()))
1950            .to("mock:per-fragment")
1951            .end_split()
1952            .build()
1953            .unwrap();
1954
1955        // Should have 1 top-level step: Split (containing 2 sub-steps)
1956        assert_eq!(definition.steps().len(), 1);
1957        match &definition.steps()[0] {
1958            BuilderStep::Split { steps, .. } => {
1959                assert_eq!(steps.len(), 2); // SetHeader + To
1960            }
1961            other => panic!("Expected Split, got {:?}", other),
1962        }
1963    }
1964
1965    #[test]
1966    fn test_split_builder_config_propagated() {
1967        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1968
1969        let definition = RouteBuilder::from("timer:test?period=1000")
1970            .route_id("test-route")
1971            .split(
1972                SplitterConfig::new(split_body_lines())
1973                    .parallel(true)
1974                    .parallel_limit(4)
1975                    .aggregation(AggregationStrategy::CollectAll),
1976            )
1977            .to("mock:per-fragment")
1978            .end_split()
1979            .build()
1980            .unwrap();
1981
1982        match &definition.steps()[0] {
1983            BuilderStep::Split { config, .. } => {
1984                assert!(config.parallel);
1985                assert_eq!(config.parallel_limit, Some(4));
1986                assert!(matches!(
1987                    config.aggregation,
1988                    AggregationStrategy::CollectAll
1989                ));
1990            }
1991            other => panic!("Expected Split, got {:?}", other),
1992        }
1993    }
1994
1995    #[test]
1996    fn test_aggregate_builder_adds_step() {
1997        use camel_api::aggregator::AggregatorConfig;
1998        use camel_core::route::BuilderStep;
1999
2000        let definition = RouteBuilder::from("timer:tick")
2001            .route_id("test-route")
2002            .aggregate(
2003                AggregatorConfig::correlate_by("key")
2004                    .complete_when_size(2)
2005                    .build(),
2006            )
2007            .build()
2008            .unwrap();
2009
2010        assert_eq!(definition.steps().len(), 1);
2011        assert!(matches!(
2012            definition.steps()[0],
2013            BuilderStep::Aggregate { .. }
2014        ));
2015    }
2016
2017    #[test]
2018    fn test_aggregate_in_split_builder() {
2019        use camel_api::aggregator::AggregatorConfig;
2020        use camel_api::splitter::{SplitterConfig, split_body_lines};
2021        use camel_core::route::BuilderStep;
2022
2023        let definition = RouteBuilder::from("timer:tick")
2024            .route_id("test-route")
2025            .split(SplitterConfig::new(split_body_lines()))
2026            .aggregate(
2027                AggregatorConfig::correlate_by("key")
2028                    .complete_when_size(1)
2029                    .build(),
2030            )
2031            .end_split()
2032            .build()
2033            .unwrap();
2034
2035        assert_eq!(definition.steps().len(), 1);
2036        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2037            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2038        } else {
2039            panic!("expected Split step");
2040        }
2041    }
2042
2043    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
2044
2045    #[test]
2046    fn test_builder_set_body_static_adds_processor() {
2047        let definition = RouteBuilder::from("timer:tick")
2048            .route_id("test-route")
2049            .set_body("fixed")
2050            .build()
2051            .unwrap();
2052        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2053    }
2054
2055    #[test]
2056    fn test_builder_set_body_fn_adds_processor() {
2057        let definition = RouteBuilder::from("timer:tick")
2058            .route_id("test-route")
2059            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2060            .build()
2061            .unwrap();
2062        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2063    }
2064
2065    #[test]
2066    fn transform_alias_produces_same_as_set_body() {
2067        let route_transform = RouteBuilder::from("timer:tick")
2068            .route_id("test-route")
2069            .transform("hello")
2070            .build()
2071            .unwrap();
2072
2073        let route_set_body = RouteBuilder::from("timer:tick")
2074            .route_id("test-route")
2075            .set_body("hello")
2076            .build()
2077            .unwrap();
2078
2079        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2080    }
2081
2082    #[test]
2083    fn test_builder_set_header_fn_adds_processor() {
2084        let definition = RouteBuilder::from("timer:tick")
2085            .route_id("test-route")
2086            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2087            .build()
2088            .unwrap();
2089        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2090    }
2091
2092    #[tokio::test]
2093    async fn test_set_body_static_processor_works() {
2094        use camel_core::route::compose_pipeline;
2095        let def = RouteBuilder::from("t:t")
2096            .route_id("test-route")
2097            .set_body("replaced")
2098            .build()
2099            .unwrap();
2100        let pipeline = compose_pipeline(
2101            def.steps()
2102                .iter()
2103                .filter_map(|s| {
2104                    if let BuilderStep::Processor(p) = s {
2105                        Some(p.clone())
2106                    } else {
2107                        None
2108                    }
2109                })
2110                .collect(),
2111        );
2112        let exchange = Exchange::new(Message::new("original"));
2113        let result = pipeline.oneshot(exchange).await.unwrap();
2114        assert_eq!(result.input.body.as_text(), Some("replaced"));
2115    }
2116
2117    #[tokio::test]
2118    async fn test_set_body_fn_processor_works() {
2119        use camel_core::route::compose_pipeline;
2120        let def = RouteBuilder::from("t:t")
2121            .route_id("test-route")
2122            .set_body_fn(|ex: &Exchange| {
2123                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2124            })
2125            .build()
2126            .unwrap();
2127        let pipeline = compose_pipeline(
2128            def.steps()
2129                .iter()
2130                .filter_map(|s| {
2131                    if let BuilderStep::Processor(p) = s {
2132                        Some(p.clone())
2133                    } else {
2134                        None
2135                    }
2136                })
2137                .collect(),
2138        );
2139        let exchange = Exchange::new(Message::new("hello"));
2140        let result = pipeline.oneshot(exchange).await.unwrap();
2141        assert_eq!(result.input.body.as_text(), Some("HELLO"));
2142    }
2143
2144    #[tokio::test]
2145    async fn test_set_header_fn_processor_works() {
2146        use camel_core::route::compose_pipeline;
2147        let def = RouteBuilder::from("t:t")
2148            .route_id("test-route")
2149            .set_header_fn("echo", |ex: &Exchange| {
2150                ex.input
2151                    .body
2152                    .as_text()
2153                    .map(|t| Value::String(t.into()))
2154                    .unwrap_or(Value::Null)
2155            })
2156            .build()
2157            .unwrap();
2158        let pipeline = compose_pipeline(
2159            def.steps()
2160                .iter()
2161                .filter_map(|s| {
2162                    if let BuilderStep::Processor(p) = s {
2163                        Some(p.clone())
2164                    } else {
2165                        None
2166                    }
2167                })
2168                .collect(),
2169        );
2170        let exchange = Exchange::new(Message::new("ping"));
2171        let result = pipeline.oneshot(exchange).await.unwrap();
2172        assert_eq!(
2173            result.input.header("echo"),
2174            Some(&Value::String("ping".into()))
2175        );
2176    }
2177
2178    // ── FilterBuilder typestate tests ─────────────────────────────────────
2179
2180    #[test]
2181    fn test_filter_builder_typestate() {
2182        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2183            .route_id("test-route")
2184            .filter(|_ex| true)
2185            .to("mock:inner")
2186            .end_filter()
2187            .to("mock:outer")
2188            .build();
2189        assert!(result.is_ok());
2190    }
2191
2192    #[test]
2193    fn test_filter_builder_steps_collected() {
2194        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2195            .route_id("test-route")
2196            .filter(|_ex| true)
2197            .to("mock:inner")
2198            .end_filter()
2199            .build()
2200            .unwrap();
2201
2202        assert_eq!(definition.steps().len(), 1);
2203        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2204    }
2205
2206    #[test]
2207    fn test_wire_tap_builder_adds_step() {
2208        let definition = RouteBuilder::from("timer:tick")
2209            .route_id("test-route")
2210            .wire_tap("mock:tap")
2211            .to("mock:result")
2212            .build()
2213            .unwrap();
2214
2215        assert_eq!(definition.steps().len(), 2);
2216        assert!(
2217            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2218        );
2219        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2220    }
2221
2222    // ── MulticastBuilder typestate tests ─────────────────────────────────────
2223
2224    #[test]
2225    fn test_multicast_builder_typestate() {
2226        let definition = RouteBuilder::from("timer:tick")
2227            .route_id("test-route")
2228            .multicast()
2229            .to("direct:a")
2230            .to("direct:b")
2231            .end_multicast()
2232            .to("mock:result")
2233            .build()
2234            .unwrap();
2235
2236        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
2237    }
2238
2239    #[test]
2240    fn test_multicast_builder_steps_collected() {
2241        let definition = RouteBuilder::from("timer:tick")
2242            .route_id("test-route")
2243            .multicast()
2244            .to("direct:a")
2245            .to("direct:b")
2246            .end_multicast()
2247            .build()
2248            .unwrap();
2249
2250        match &definition.steps()[0] {
2251            BuilderStep::Multicast { steps, .. } => {
2252                assert_eq!(steps.len(), 2);
2253            }
2254            other => panic!("Expected Multicast, got {:?}", other),
2255        }
2256    }
2257
2258    // ── Concurrency builder tests ─────────────────────────────────────
2259
2260    #[test]
2261    fn test_builder_concurrent_sets_concurrency() {
2262        use camel_component_api::ConcurrencyModel;
2263
2264        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2265            .route_id("test-route")
2266            .concurrent(16)
2267            .to("log:info")
2268            .build()
2269            .unwrap();
2270
2271        assert_eq!(
2272            definition.concurrency_override(),
2273            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2274        );
2275    }
2276
2277    #[test]
2278    fn test_builder_concurrent_zero_means_unbounded() {
2279        use camel_component_api::ConcurrencyModel;
2280
2281        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2282            .route_id("test-route")
2283            .concurrent(0)
2284            .to("log:info")
2285            .build()
2286            .unwrap();
2287
2288        assert_eq!(
2289            definition.concurrency_override(),
2290            Some(&ConcurrencyModel::Concurrent { max: None })
2291        );
2292    }
2293
2294    #[test]
2295    fn test_builder_sequential_sets_concurrency() {
2296        use camel_component_api::ConcurrencyModel;
2297
2298        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2299            .route_id("test-route")
2300            .sequential()
2301            .to("log:info")
2302            .build()
2303            .unwrap();
2304
2305        assert_eq!(
2306            definition.concurrency_override(),
2307            Some(&ConcurrencyModel::Sequential)
2308        );
2309    }
2310
2311    #[test]
2312    fn test_builder_default_concurrency_is_none() {
2313        let definition = RouteBuilder::from("timer:tick")
2314            .route_id("test-route")
2315            .to("log:info")
2316            .build()
2317            .unwrap();
2318
2319        assert_eq!(definition.concurrency_override(), None);
2320    }
2321
2322    // ── Route lifecycle builder tests ─────────────────────────────────────
2323
2324    #[test]
2325    fn test_builder_route_id_sets_id() {
2326        let definition = RouteBuilder::from("timer:tick")
2327            .route_id("my-route")
2328            .build()
2329            .unwrap();
2330
2331        assert_eq!(definition.route_id(), "my-route");
2332    }
2333
2334    #[test]
2335    fn test_build_without_route_id_fails() {
2336        let result = RouteBuilder::from("timer:tick?period=1000")
2337            .to("log:info")
2338            .build();
2339        let err = match result {
2340            Err(e) => e.to_string(),
2341            Ok(_) => panic!("build() should fail without route_id"),
2342        };
2343        assert!(
2344            err.contains("route_id"),
2345            "error should mention route_id, got: {}",
2346            err
2347        );
2348    }
2349
2350    #[test]
2351    fn test_builder_empty_route_id_rejected() {
2352        let result = RouteBuilder::from("timer:tick").route_id("").build();
2353        let err = result.err().expect("empty route_id should be rejected");
2354        assert!(matches!(err, CamelError::RouteError(_)));
2355    }
2356
2357    #[test]
2358    fn test_builder_whitespace_route_id_rejected() {
2359        let result = RouteBuilder::from("timer:tick").route_id("   ").build();
2360        assert!(result.is_err());
2361    }
2362
2363    #[test]
2364    fn test_builder_auto_startup_false() {
2365        let definition = RouteBuilder::from("timer:tick")
2366            .route_id("test-route")
2367            .auto_startup(false)
2368            .build()
2369            .unwrap();
2370
2371        assert!(!definition.auto_startup());
2372    }
2373
2374    #[test]
2375    fn test_builder_startup_order_custom() {
2376        let definition = RouteBuilder::from("timer:tick")
2377            .route_id("test-route")
2378            .startup_order(50)
2379            .build()
2380            .unwrap();
2381
2382        assert_eq!(definition.startup_order(), 50);
2383    }
2384
2385    #[test]
2386    fn test_builder_defaults() {
2387        let definition = RouteBuilder::from("timer:tick")
2388            .route_id("test-route")
2389            .build()
2390            .unwrap();
2391
2392        assert_eq!(definition.route_id(), "test-route");
2393        assert!(definition.auto_startup());
2394        assert_eq!(definition.startup_order(), 1000);
2395    }
2396
2397    // ── Choice typestate tests ──────────────────────────────────────────────────
2398
2399    #[test]
2400    fn test_choice_builder_single_when() {
2401        let definition = RouteBuilder::from("timer:tick")
2402            .route_id("test-route")
2403            .choice()
2404            .when(|ex: &Exchange| ex.input.header("type").is_some())
2405            .to("mock:typed")
2406            .end_when()
2407            .end_choice()
2408            .build()
2409            .unwrap();
2410        assert_eq!(definition.steps().len(), 1);
2411        assert!(
2412            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2413            if whens.len() == 1 && otherwise.is_none())
2414        );
2415    }
2416
2417    #[test]
2418    fn test_choice_builder_when_otherwise() {
2419        let definition = RouteBuilder::from("timer:tick")
2420            .route_id("test-route")
2421            .choice()
2422            .when(|ex: &Exchange| ex.input.header("a").is_some())
2423            .to("mock:a")
2424            .end_when()
2425            .otherwise()
2426            .to("mock:fallback")
2427            .end_otherwise()
2428            .end_choice()
2429            .build()
2430            .unwrap();
2431        assert!(
2432            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2433            if whens.len() == 1 && otherwise.is_some())
2434        );
2435    }
2436
2437    #[test]
2438    fn test_choice_builder_multiple_whens() {
2439        let definition = RouteBuilder::from("timer:tick")
2440            .route_id("test-route")
2441            .choice()
2442            .when(|ex: &Exchange| ex.input.header("a").is_some())
2443            .to("mock:a")
2444            .end_when()
2445            .when(|ex: &Exchange| ex.input.header("b").is_some())
2446            .to("mock:b")
2447            .end_when()
2448            .end_choice()
2449            .build()
2450            .unwrap();
2451        assert!(
2452            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2453            if whens.len() == 2)
2454        );
2455    }
2456
2457    #[test]
2458    fn test_choice_step_after_choice() {
2459        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2460        let definition = RouteBuilder::from("timer:tick")
2461            .route_id("test-route")
2462            .choice()
2463            .when(|_ex: &Exchange| true)
2464            .to("mock:inner")
2465            .end_when()
2466            .end_choice()
2467            .to("mock:outer") // must be step[1], not inside choice
2468            .build()
2469            .unwrap();
2470        assert_eq!(definition.steps().len(), 2);
2471        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2472    }
2473
2474    // ── Throttle typestate tests ──────────────────────────────────────────────────
2475
2476    #[test]
2477    fn test_throttle_builder_typestate() {
2478        let definition = RouteBuilder::from("timer:tick")
2479            .route_id("test-route")
2480            .throttle(10, std::time::Duration::from_secs(1))
2481            .to("mock:result")
2482            .end_throttle()
2483            .build()
2484            .unwrap();
2485
2486        assert_eq!(definition.steps().len(), 1);
2487        assert!(matches!(
2488            &definition.steps()[0],
2489            BuilderStep::Throttle { .. }
2490        ));
2491    }
2492
2493    #[test]
2494    fn test_throttle_builder_with_strategy() {
2495        let definition = RouteBuilder::from("timer:tick")
2496            .route_id("test-route")
2497            .throttle(10, std::time::Duration::from_secs(1))
2498            .strategy(ThrottleStrategy::Reject)
2499            .to("mock:result")
2500            .end_throttle()
2501            .build()
2502            .unwrap();
2503
2504        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2505            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2506        } else {
2507            panic!("Expected Throttle step");
2508        }
2509    }
2510
2511    #[test]
2512    fn test_throttle_builder_steps_collected() {
2513        let definition = RouteBuilder::from("timer:tick")
2514            .route_id("test-route")
2515            .throttle(5, std::time::Duration::from_secs(1))
2516            .set_header("throttled", Value::Bool(true))
2517            .to("mock:throttled")
2518            .end_throttle()
2519            .build()
2520            .unwrap();
2521
2522        match &definition.steps()[0] {
2523            BuilderStep::Throttle { steps, .. } => {
2524                assert_eq!(steps.len(), 2); // SetHeader + To
2525            }
2526            other => panic!("Expected Throttle, got {:?}", other),
2527        }
2528    }
2529
2530    #[test]
2531    fn test_throttle_step_after_throttle() {
2532        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2533        let definition = RouteBuilder::from("timer:tick")
2534            .route_id("test-route")
2535            .throttle(10, std::time::Duration::from_secs(1))
2536            .to("mock:inner")
2537            .end_throttle()
2538            .to("mock:outer")
2539            .build()
2540            .unwrap();
2541
2542        assert_eq!(definition.steps().len(), 2);
2543        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2544    }
2545
2546    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2547
2548    #[test]
2549    fn test_load_balance_builder_typestate() {
2550        let definition = RouteBuilder::from("timer:tick")
2551            .route_id("test-route")
2552            .load_balance()
2553            .round_robin()
2554            .to("mock:a")
2555            .to("mock:b")
2556            .end_load_balance()
2557            .build()
2558            .unwrap();
2559
2560        assert_eq!(definition.steps().len(), 1);
2561        assert!(matches!(
2562            &definition.steps()[0],
2563            BuilderStep::LoadBalance { .. }
2564        ));
2565    }
2566
2567    #[test]
2568    fn test_load_balance_builder_with_strategy() {
2569        let definition = RouteBuilder::from("timer:tick")
2570            .route_id("test-route")
2571            .load_balance()
2572            .random()
2573            .to("mock:result")
2574            .end_load_balance()
2575            .build()
2576            .unwrap();
2577
2578        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2579            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2580        } else {
2581            panic!("Expected LoadBalance step");
2582        }
2583    }
2584
2585    #[test]
2586    fn test_load_balance_builder_steps_collected() {
2587        let definition = RouteBuilder::from("timer:tick")
2588            .route_id("test-route")
2589            .load_balance()
2590            .set_header("lb", Value::Bool(true))
2591            .to("mock:a")
2592            .end_load_balance()
2593            .build()
2594            .unwrap();
2595
2596        match &definition.steps()[0] {
2597            BuilderStep::LoadBalance { steps, .. } => {
2598                assert_eq!(steps.len(), 2); // SetHeader + To
2599            }
2600            other => panic!("Expected LoadBalance, got {:?}", other),
2601        }
2602    }
2603
2604    #[test]
2605    fn test_load_balance_step_after_load_balance() {
2606        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
2607        let definition = RouteBuilder::from("timer:tick")
2608            .route_id("test-route")
2609            .load_balance()
2610            .to("mock:inner")
2611            .end_load_balance()
2612            .to("mock:outer")
2613            .build()
2614            .unwrap();
2615
2616        assert_eq!(definition.steps().len(), 2);
2617        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2618    }
2619
2620    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2621
2622    #[test]
2623    fn test_dynamic_router_builder() {
2624        let definition = RouteBuilder::from("timer:tick")
2625            .route_id("test-route")
2626            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2627            .build()
2628            .unwrap();
2629
2630        assert_eq!(definition.steps().len(), 1);
2631        assert!(matches!(
2632            &definition.steps()[0],
2633            BuilderStep::DynamicRouter { .. }
2634        ));
2635    }
2636
2637    #[test]
2638    fn test_dynamic_router_builder_with_config() {
2639        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2640            .max_iterations(100)
2641            .cache_size(500);
2642
2643        let definition = RouteBuilder::from("timer:tick")
2644            .route_id("test-route")
2645            .dynamic_router_with_config(config)
2646            .build()
2647            .unwrap();
2648
2649        assert_eq!(definition.steps().len(), 1);
2650        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2651            assert_eq!(config.max_iterations, 100);
2652            assert_eq!(config.cache_size, 500);
2653        } else {
2654            panic!("Expected DynamicRouter step");
2655        }
2656    }
2657
2658    #[test]
2659    fn test_dynamic_router_step_after_router() {
2660        // Steps after dynamic_router() are added to the outer pipeline.
2661        let definition = RouteBuilder::from("timer:tick")
2662            .route_id("test-route")
2663            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2664            .to("mock:outer")
2665            .build()
2666            .unwrap();
2667
2668        assert_eq!(definition.steps().len(), 2);
2669        assert!(matches!(
2670            &definition.steps()[0],
2671            BuilderStep::DynamicRouter { .. }
2672        ));
2673        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2674    }
2675
2676    #[test]
2677    fn routing_slip_builder_creates_step() {
2678        use camel_api::RoutingSlipExpression;
2679
2680        let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2681
2682        let route = RouteBuilder::from("direct:start")
2683            .route_id("routing-slip-test")
2684            .routing_slip(expression)
2685            .build()
2686            .unwrap();
2687
2688        assert!(
2689            matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2690            "Expected RoutingSlip step"
2691        );
2692    }
2693
2694    #[test]
2695    fn routing_slip_with_config_builder_creates_step() {
2696        use camel_api::RoutingSlipConfig;
2697
2698        let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2699            .uri_delimiter("|")
2700            .cache_size(50)
2701            .ignore_invalid_endpoints(true);
2702
2703        let route = RouteBuilder::from("direct:start")
2704            .route_id("routing-slip-config-test")
2705            .routing_slip_with_config(config)
2706            .build()
2707            .unwrap();
2708
2709        if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2710            assert_eq!(config.uri_delimiter, "|");
2711            assert_eq!(config.cache_size, 50);
2712            assert!(config.ignore_invalid_endpoints);
2713        } else {
2714            panic!("Expected RoutingSlip step");
2715        }
2716    }
2717
2718    #[test]
2719    fn test_builder_marshal_adds_processor_step() {
2720        let definition = RouteBuilder::from("timer:tick")
2721            .route_id("test-route")
2722            .marshal("json")
2723            .build()
2724            .unwrap();
2725        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2726    }
2727
2728    #[test]
2729    fn test_builder_unmarshal_adds_processor_step() {
2730        let definition = RouteBuilder::from("timer:tick")
2731            .route_id("test-route")
2732            .unmarshal("json")
2733            .build()
2734            .unwrap();
2735        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2736    }
2737
2738    #[test]
2739    fn test_builder_stream_cache_adds_processor_step() {
2740        let definition = RouteBuilder::from("timer:tick")
2741            .route_id("test-route")
2742            .stream_cache(1024)
2743            .build()
2744            .unwrap();
2745        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2746    }
2747
2748    #[test]
2749    fn validate_adds_to_step_with_validator_uri() {
2750        let def = RouteBuilder::from("direct:in")
2751            .route_id("test")
2752            .validate("schemas/order.xsd")
2753            .build()
2754            .unwrap();
2755        let steps = def.steps();
2756        assert_eq!(steps.len(), 1);
2757        assert!(
2758            matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2759            "got: {:?}",
2760            steps[0]
2761        );
2762    }
2763
2764    #[test]
2765    #[should_panic(expected = "unknown data format: 'csv'")]
2766    fn test_builder_marshal_panics_on_unknown_format() {
2767        let _ = RouteBuilder::from("timer:tick")
2768            .route_id("test-route")
2769            .marshal("csv")
2770            .build();
2771    }
2772
2773    #[test]
2774    #[should_panic(expected = "unknown data format: 'csv'")]
2775    fn test_builder_unmarshal_panics_on_unknown_format() {
2776        let _ = RouteBuilder::from("timer:tick")
2777            .route_id("test-route")
2778            .unmarshal("csv")
2779            .build();
2780    }
2781
2782    #[test]
2783    fn test_builder_recipient_list_creates_step() {
2784        let route = RouteBuilder::from("direct:start")
2785            .route_id("recipient-list-test")
2786            .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
2787            .build()
2788            .unwrap();
2789
2790        assert!(matches!(
2791            &route.steps()[0],
2792            BuilderStep::RecipientList { .. }
2793        ));
2794    }
2795
2796    #[test]
2797    fn test_builder_recipient_list_with_config_creates_step() {
2798        let config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));
2799
2800        let route = RouteBuilder::from("direct:start")
2801            .route_id("recipient-list-config-test")
2802            .recipient_list_with_config(config)
2803            .build()
2804            .unwrap();
2805
2806        assert!(matches!(
2807            &route.steps()[0],
2808            BuilderStep::RecipientList { .. }
2809        ));
2810    }
2811
2812    #[test]
2813    fn test_builder_script_adds_script_step() {
2814        let route = RouteBuilder::from("direct:start")
2815            .route_id("script-test")
2816            .script("rhai", "headers[\"x\"] = \"y\"")
2817            .build()
2818            .unwrap();
2819
2820        assert!(matches!(
2821            &route.steps()[0],
2822            BuilderStep::Script { language, script }
2823            if language == "rhai" && script == "headers[\"x\"] = \"y\""
2824        ));
2825    }
2826
2827    #[test]
2828    fn test_builder_delay_and_delay_with_header_add_steps() {
2829        let route = RouteBuilder::from("direct:start")
2830            .route_id("delay-test")
2831            .delay(Duration::from_millis(250))
2832            .delay_with_header(Duration::from_millis(500), "x-delay")
2833            .build()
2834            .unwrap();
2835
2836        assert_eq!(route.steps().len(), 2);
2837        assert!(matches!(&route.steps()[0], BuilderStep::Delay { .. }));
2838        assert!(matches!(&route.steps()[1], BuilderStep::Delay { .. }));
2839    }
2840
2841    #[test]
2842    fn test_builder_log_and_stop_add_steps_in_order() {
2843        let route = RouteBuilder::from("direct:start")
2844            .route_id("log-stop-test")
2845            .log("hello", LogLevel::Info)
2846            .stop()
2847            .to("mock:after")
2848            .build()
2849            .unwrap();
2850
2851        assert_eq!(route.steps().len(), 3);
2852        assert!(matches!(
2853            &route.steps()[0],
2854            BuilderStep::Log { message, .. } if message == "hello"
2855        ));
2856        assert!(matches!(&route.steps()[1], BuilderStep::Stop));
2857        assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
2858    }
2859
2860    #[test]
2861    fn test_builder_stream_cache_default_adds_processor_step() {
2862        let route = RouteBuilder::from("direct:start")
2863            .route_id("stream-cache-default-test")
2864            .stream_cache_default()
2865            .build()
2866            .unwrap();
2867
2868        assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
2869    }
2870
2871    #[test]
2872    fn test_validate_preserves_existing_validator_prefix() {
2873        let route = RouteBuilder::from("direct:in")
2874            .route_id("validate-prefix-test")
2875            .validate("validator:schemas/order.xsd")
2876            .build()
2877            .unwrap();
2878
2879        assert!(matches!(
2880            &route.steps()[0],
2881            BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
2882        ));
2883    }
2884
2885    #[test]
2886    fn test_load_balance_builder_weighted_failover_parallel_config() {
2887        let route = RouteBuilder::from("direct:start")
2888            .route_id("lb-weighted-failover-parallel")
2889            .load_balance()
2890            .weighted(vec![
2891                ("direct:a".to_string(), 3),
2892                ("direct:b".to_string(), 1),
2893            ])
2894            .failover()
2895            .parallel(true)
2896            .to("mock:result")
2897            .end_load_balance()
2898            .build()
2899            .unwrap();
2900
2901        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
2902            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
2903            assert!(config.parallel);
2904        } else {
2905            panic!("Expected LoadBalance step");
2906        }
2907    }
2908
2909    #[test]
2910    fn test_multicast_builder_all_config_setters() {
2911        let route = RouteBuilder::from("direct:start")
2912            .route_id("multicast-config-test")
2913            .multicast()
2914            .parallel(true)
2915            .parallel_limit(4)
2916            .stop_on_exception(true)
2917            .timeout(Duration::from_millis(300))
2918            .aggregation(MulticastStrategy::Original)
2919            .to("mock:a")
2920            .end_multicast()
2921            .build()
2922            .unwrap();
2923
2924        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
2925            assert!(config.parallel);
2926            assert_eq!(config.parallel_limit, Some(4));
2927            assert!(config.stop_on_exception);
2928            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
2929            assert!(matches!(config.aggregation, MulticastStrategy::Original));
2930        } else {
2931            panic!("Expected Multicast step");
2932        }
2933    }
2934
2935    #[test]
2936    fn test_build_canonical_rejects_unsupported_processor_step() {
2937        let err = RouteBuilder::from("direct:start")
2938            .route_id("canonical-reject")
2939            .set_header("k", Value::String("v".into()))
2940            .build_canonical()
2941            .unwrap_err();
2942
2943        assert!(format!("{err}").contains("does not support step `processor`"));
2944    }
2945
2946    // ── LoadBalance strategy-specific tests ─────────────────────────────────────
2947
2948    #[test]
2949    fn test_load_balance_builder_weighted_strategy() {
2950        let route = RouteBuilder::from("direct:start")
2951            .route_id("lb-weighted")
2952            .load_balance()
2953            .weighted(vec![
2954                ("direct:a".to_string(), 5),
2955                ("direct:b".to_string(), 2),
2956                ("direct:c".to_string(), 1),
2957            ])
2958            .to("mock:result")
2959            .end_load_balance()
2960            .build()
2961            .unwrap();
2962
2963        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
2964            assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
2965        } else {
2966            panic!("Expected LoadBalance step");
2967        }
2968    }
2969
2970    #[test]
2971    fn test_load_balance_builder_failover_strategy() {
2972        let route = RouteBuilder::from("direct:start")
2973            .route_id("lb-failover")
2974            .load_balance()
2975            .failover()
2976            .to("mock:primary")
2977            .end_load_balance()
2978            .build()
2979            .unwrap();
2980
2981        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
2982            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
2983            assert!(!config.parallel);
2984        } else {
2985            panic!("Expected LoadBalance step");
2986        }
2987    }
2988
2989    #[test]
2990    fn test_load_balance_builder_parallel_false_explicit() {
2991        let route = RouteBuilder::from("direct:start")
2992            .route_id("lb-parallel-false")
2993            .load_balance()
2994            .round_robin()
2995            .parallel(false)
2996            .to("mock:result")
2997            .end_load_balance()
2998            .build()
2999            .unwrap();
3000
3001        if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3002            assert!(!config.parallel);
3003        } else {
3004            panic!("Expected LoadBalance step");
3005        }
3006    }
3007
3008    // ── FilterInSplitBuilder tests ──────────────────────────────────────────────
3009
3010    #[test]
3011    fn test_filter_in_split_builder_typestate() {
3012        use camel_api::splitter::{SplitterConfig, split_body_lines};
3013
3014        let definition = RouteBuilder::from("timer:test")
3015            .route_id("filter-in-split")
3016            .split(SplitterConfig::new(split_body_lines()))
3017            .filter(|_ex| true)
3018            .to("mock:filtered")
3019            .end_filter()
3020            .end_split()
3021            .build()
3022            .unwrap();
3023
3024        assert_eq!(definition.steps().len(), 1);
3025        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3026            assert_eq!(steps.len(), 1);
3027            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3028        } else {
3029            panic!("Expected Split step");
3030        }
3031    }
3032
3033    #[test]
3034    fn test_filter_in_split_builder_multiple_steps() {
3035        use camel_api::splitter::{SplitterConfig, split_body_lines};
3036
3037        let definition = RouteBuilder::from("timer:test")
3038            .route_id("filter-in-split-multi")
3039            .split(SplitterConfig::new(split_body_lines()))
3040            .to("mock:before-filter")
3041            .filter(|_ex| true)
3042            .to("mock:inside-filter")
3043            .end_filter()
3044            .to("mock:after-filter")
3045            .end_split()
3046            .build()
3047            .unwrap();
3048
3049        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3050            // To("before-filter") + Filter{...} + To("after-filter") = 3
3051            assert_eq!(steps.len(), 3);
3052        } else {
3053            panic!("Expected Split step");
3054        }
3055    }
3056
3057    // ── build_canonical tests ───────────────────────────────────────────────────
3058
3059    #[test]
3060    fn test_build_canonical_with_circuit_breaker() {
3061        use camel_api::circuit_breaker::CircuitBreakerConfig;
3062
3063        let spec = RouteBuilder::from("direct:start")
3064            .route_id("canonical-cb")
3065            .circuit_breaker(CircuitBreakerConfig::new().failure_threshold(10))
3066            .to("mock:result")
3067            .build_canonical()
3068            .unwrap();
3069
3070        let cb = spec.circuit_breaker.expect("circuit breaker should be set");
3071        assert_eq!(cb.failure_threshold, 10);
3072    }
3073
3074    #[test]
3075    fn test_build_canonical_rejects_custom_split_aggregation() {
3076        use camel_api::splitter::{SplitterConfig, split_body_lines};
3077
3078        let err = RouteBuilder::from("direct:start")
3079            .route_id("canonical-custom-split")
3080            .split(SplitterConfig::new(split_body_lines()).aggregation(
3081                camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
3082            ))
3083            .to("mock:frag")
3084            .end_split()
3085            .build_canonical()
3086            .unwrap_err();
3087
3088        // Split with closure-based expression is rejected in canonical v1.
3089        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3090    }
3091
3092    #[test]
3093    fn test_build_canonical_rejects_custom_aggregate_strategy() {
3094        let err = RouteBuilder::from("direct:start")
3095            .route_id("canonical-custom-agg")
3096            .aggregate(
3097                AggregatorConfig::correlate_by("key")
3098                    .complete_when_size(2)
3099                    .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
3100                    .build(),
3101            )
3102            .build_canonical()
3103            .unwrap_err();
3104
3105        assert!(format!("{err}").contains("custom aggregate strategy"));
3106    }
3107
3108    #[test]
3109    fn test_build_canonical_rejects_fn_correlation_strategy() {
3110        let err = RouteBuilder::from("direct:start")
3111            .route_id("canonical-fn-corr")
3112            .aggregate(AggregatorConfig {
3113                header_name: "key".to_string(),
3114                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3115                correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
3116                strategy: AggregationStrategy::CollectAll,
3117                max_buckets: None,
3118                bucket_ttl: None,
3119                force_completion_on_stop: false,
3120                discard_on_timeout: false,
3121            })
3122            .build_canonical()
3123            .unwrap_err();
3124
3125        assert!(format!("{err}").contains("Fn correlation strategy"));
3126    }
3127
3128    #[test]
3129    fn test_build_canonical_rejects_predicate_completion() {
3130        let err = RouteBuilder::from("direct:start")
3131            .route_id("canonical-pred-completion")
3132            .aggregate(AggregatorConfig {
3133                header_name: "key".to_string(),
3134                completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
3135                    |_| false,
3136                ))),
3137                correlation: CorrelationStrategy::HeaderName("key".to_string()),
3138                strategy: AggregationStrategy::CollectAll,
3139                max_buckets: None,
3140                bucket_ttl: None,
3141                force_completion_on_stop: false,
3142                discard_on_timeout: false,
3143            })
3144            .build_canonical()
3145            .unwrap_err();
3146
3147        assert!(format!("{err}").contains("predicate completion"));
3148    }
3149
3150    #[test]
3151    fn test_build_canonical_with_expression_correlation() {
3152        let spec = RouteBuilder::from("direct:start")
3153            .route_id("canonical-expr-corr")
3154            .aggregate(AggregatorConfig {
3155                header_name: "key".to_string(),
3156                completion: CompletionMode::Single(CompletionCondition::Size(1)),
3157                correlation: CorrelationStrategy::Expression {
3158                    expr: "header.key".to_string(),
3159                    language: "simple".to_string(),
3160                },
3161                strategy: AggregationStrategy::CollectAll,
3162                max_buckets: None,
3163                bucket_ttl: None,
3164                force_completion_on_stop: false,
3165                discard_on_timeout: false,
3166            })
3167            .build_canonical()
3168            .unwrap();
3169
3170        assert!(spec.steps.iter().any(|s| matches!(s, CanonicalStepSpec::Aggregate(a) if a.correlation_key == Some("header.key".to_string()))));
3171    }
3172
3173    #[test]
3174    fn test_build_canonical_split_rejected_with_closure_expression() {
3175        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
3176
3177        // Builder-based split uses closure expressions, which are not serializable.
3178        let err = RouteBuilder::from("direct:start")
3179            .route_id("canonical-split-last")
3180            .split(
3181                SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins),
3182            )
3183            .to("mock:frag")
3184            .end_split()
3185            .build_canonical()
3186            .unwrap_err();
3187
3188        assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3189    }
3190
3191    // ── OnExceptionBuilder full chain tests ─────────────────────────────────────
3192
3193    #[test]
3194    fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
3195        let definition = RouteBuilder::from("direct:start")
3196            .route_id("on-exception-full")
3197            .dead_letter_channel("log:dlc")
3198            .on_exception(|e| matches!(e, CamelError::Io(_)))
3199            .retry(5)
3200            .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
3201            .with_jitter(0.3)
3202            .handled_by("log:io-handler")
3203            .end_on_exception()
3204            .to("mock:out")
3205            .build()
3206            .unwrap();
3207
3208        let cfg = definition
3209            .error_handler_config()
3210            .expect("error handler should be set");
3211        assert_eq!(cfg.policies.len(), 1);
3212        let policy = &cfg.policies[0];
3213        let retry = policy.retry.as_ref().expect("retry should be set");
3214        assert_eq!(retry.max_attempts, 5);
3215        assert_eq!(retry.initial_delay, Duration::from_millis(10));
3216        assert_eq!(retry.multiplier, 2.0);
3217        assert_eq!(retry.max_delay, Duration::from_millis(500));
3218        assert!((retry.jitter_factor - 0.3).abs() < f64::EPSILON);
3219        assert_eq!(policy.handled_by.as_deref(), Some("log:io-handler"));
3220    }
3221
3222    #[test]
3223    fn test_on_exception_jitter_clamped_to_valid_range() {
3224        let definition = RouteBuilder::from("direct:start")
3225            .route_id("jitter-clamp")
3226            .on_exception(|_e| true)
3227            .retry(1)
3228            .with_jitter(5.0)
3229            .end_on_exception()
3230            .to("mock:out")
3231            .build()
3232            .unwrap();
3233
3234        let cfg = definition.error_handler_config().unwrap();
3235        let retry = cfg.policies[0].retry.as_ref().unwrap();
3236        assert!((retry.jitter_factor - 1.0).abs() < f64::EPSILON);
3237    }
3238
3239    // ── StepAccumulator: process_fn, convert_body_to, bean ──────────────────────
3240
3241    #[test]
3242    fn test_builder_process_fn_adds_processor_step() {
3243        use camel_api::BoxProcessorExt;
3244        let processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
3245        let definition = RouteBuilder::from("timer:tick")
3246            .route_id("process-fn-test")
3247            .process_fn(processor)
3248            .build()
3249            .unwrap();
3250
3251        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3252    }
3253
3254    #[test]
3255    fn test_builder_convert_body_to_adds_processor_step() {
3256        let definition = RouteBuilder::from("timer:tick")
3257            .route_id("convert-body-test")
3258            .convert_body_to(BodyType::Json)
3259            .build()
3260            .unwrap();
3261
3262        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3263    }
3264
3265    #[test]
3266    fn test_builder_bean_adds_bean_step() {
3267        let definition = RouteBuilder::from("timer:tick")
3268            .route_id("bean-test")
3269            .bean("myBean", "process")
3270            .build()
3271            .unwrap();
3272
3273        assert!(
3274            matches!(&definition.steps()[0], BuilderStep::Bean { name, method }
3275            if name == "myBean" && method == "process")
3276        );
3277    }
3278
3279    // ── Throttle strategy-specific tests ────────────────────────────────────────
3280
3281    #[test]
3282    fn test_throttle_builder_delay_strategy() {
3283        let definition = RouteBuilder::from("timer:tick")
3284            .route_id("throttle-delay")
3285            .throttle(10, Duration::from_secs(1))
3286            .strategy(ThrottleStrategy::Delay)
3287            .to("mock:result")
3288            .end_throttle()
3289            .build()
3290            .unwrap();
3291
3292        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3293            assert_eq!(config.strategy, ThrottleStrategy::Delay);
3294        } else {
3295            panic!("Expected Throttle step");
3296        }
3297    }
3298
3299    #[test]
3300    fn test_throttle_builder_drop_strategy() {
3301        let definition = RouteBuilder::from("timer:tick")
3302            .route_id("throttle-drop")
3303            .throttle(10, Duration::from_secs(1))
3304            .strategy(ThrottleStrategy::Drop)
3305            .to("mock:result")
3306            .end_throttle()
3307            .build()
3308            .unwrap();
3309
3310        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3311            assert_eq!(config.strategy, ThrottleStrategy::Drop);
3312        } else {
3313            panic!("Expected Throttle step");
3314        }
3315    }
3316
3317    // ── LoopInLoopBuilder with loop_while ───────────────────────────────────────
3318
3319    #[test]
3320    fn test_nested_loop_while_builder() {
3321        use camel_api::loop_eip::LoopMode;
3322
3323        let def = RouteBuilder::from("direct:start")
3324            .route_id("nested-loop-while")
3325            .loop_count(2)
3326            .to("mock:outer")
3327            .loop_while(|_ex| true)
3328            .to("mock:inner")
3329            .end_loop()
3330            .end_loop()
3331            .build()
3332            .unwrap();
3333
3334        assert_eq!(def.steps().len(), 1);
3335        if let BuilderStep::Loop { steps, .. } = &def.steps()[0] {
3336            assert_eq!(steps.len(), 2);
3337            if let BuilderStep::Loop { config, .. } = &steps[1] {
3338                assert!(matches!(config.mode, LoopMode::While(_)));
3339            } else {
3340                panic!("Expected inner Loop step");
3341            }
3342        } else {
3343            panic!("Expected outer Loop step");
3344        }
3345    }
3346
3347    // ── Choice with multiple whens + otherwise ──────────────────────────────────
3348
3349    #[test]
3350    fn test_choice_builder_multiple_whens_with_otherwise() {
3351        let definition = RouteBuilder::from("timer:tick")
3352            .route_id("choice-multi-otherwise")
3353            .choice()
3354            .when(|ex: &Exchange| ex.input.header("a").is_some())
3355            .to("mock:a")
3356            .end_when()
3357            .when(|ex: &Exchange| ex.input.header("b").is_some())
3358            .to("mock:b")
3359            .end_when()
3360            .when(|ex: &Exchange| ex.input.header("c").is_some())
3361            .to("mock:c")
3362            .end_when()
3363            .otherwise()
3364            .to("mock:fallback")
3365            .end_otherwise()
3366            .end_choice()
3367            .build()
3368            .unwrap();
3369
3370        if let BuilderStep::Choice { whens, otherwise } = &definition.steps()[0] {
3371            assert_eq!(whens.len(), 3);
3372            assert!(otherwise.is_some());
3373            assert_eq!(otherwise.as_ref().unwrap().len(), 1);
3374        } else {
3375            panic!("Expected Choice step");
3376        }
3377    }
3378
3379    // ── Multicast individual config tests ───────────────────────────────────────
3380
3381    #[test]
3382    fn test_multicast_builder_parallel_only() {
3383        let route = RouteBuilder::from("direct:start")
3384            .route_id("multicast-parallel")
3385            .multicast()
3386            .parallel(true)
3387            .to("mock:a")
3388            .end_multicast()
3389            .build()
3390            .unwrap();
3391
3392        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3393            assert!(config.parallel);
3394            assert_eq!(config.parallel_limit, None);
3395        } else {
3396            panic!("Expected Multicast step");
3397        }
3398    }
3399
3400    #[test]
3401    fn test_multicast_builder_timeout_only() {
3402        let route = RouteBuilder::from("direct:start")
3403            .route_id("multicast-timeout")
3404            .multicast()
3405            .timeout(Duration::from_secs(5))
3406            .to("mock:a")
3407            .end_multicast()
3408            .build()
3409            .unwrap();
3410
3411        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3412            assert_eq!(config.timeout, Some(Duration::from_secs(5)));
3413        } else {
3414            panic!("Expected Multicast step");
3415        }
3416    }
3417
3418    #[test]
3419    fn test_multicast_builder_aggregation_collect_all() {
3420        let route = RouteBuilder::from("direct:start")
3421            .route_id("multicast-collect")
3422            .multicast()
3423            .aggregation(MulticastStrategy::CollectAll)
3424            .to("mock:a")
3425            .end_multicast()
3426            .build()
3427            .unwrap();
3428
3429        if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3430            assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
3431        } else {
3432            panic!("Expected Multicast step");
3433        }
3434    }
3435
3436    // ── extract_completion_fields: Any mode with multiple conditions ────────────
3437
3438    #[test]
3439    fn test_build_canonical_aggregate_any_completion_mode() {
3440        let spec = RouteBuilder::from("direct:start")
3441            .route_id("canonical-any-completion")
3442            .aggregate(
3443                AggregatorConfig::correlate_by("key")
3444                    .complete_on_size_or_timeout(10, Duration::from_secs(30))
3445                    .build(),
3446            )
3447            .build_canonical()
3448            .unwrap();
3449
3450        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3451            assert_eq!(agg.completion_size, Some(10));
3452            assert_eq!(agg.completion_timeout_ms, Some(30_000));
3453        } else {
3454            panic!("Expected Aggregate step");
3455        }
3456    }
3457
3458    #[test]
3459    fn test_build_canonical_aggregate_timeout_completion() {
3460        let spec = RouteBuilder::from("direct:start")
3461            .route_id("canonical-timeout-completion")
3462            .aggregate(
3463                AggregatorConfig::correlate_by("key")
3464                    .complete_on_timeout(Duration::from_millis(500))
3465                    .build(),
3466            )
3467            .build_canonical()
3468            .unwrap();
3469
3470        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3471            assert_eq!(agg.completion_size, None);
3472            assert_eq!(agg.completion_timeout_ms, Some(500));
3473        } else {
3474            panic!("Expected Aggregate step");
3475        }
3476    }
3477
3478    // ── canonicalize_aggregate: discard_on_timeout and force_completion_on_stop ─
3479
3480    #[test]
3481    fn test_build_canonical_aggregate_discard_on_timeout() {
3482        use camel_api::aggregator::AggregatorConfig;
3483
3484        let spec = RouteBuilder::from("direct:start")
3485            .route_id("canonical-discard-timeout")
3486            .aggregate(
3487                AggregatorConfig::correlate_by("key")
3488                    .complete_when_size(1)
3489                    .discard_on_timeout(true)
3490                    .build(),
3491            )
3492            .build_canonical()
3493            .unwrap();
3494
3495        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3496            assert_eq!(agg.discard_on_timeout, Some(true));
3497        } else {
3498            panic!("Expected Aggregate step");
3499        }
3500    }
3501
3502    #[test]
3503    fn test_build_canonical_aggregate_force_completion_on_stop() {
3504        use camel_api::aggregator::AggregatorConfig;
3505
3506        let spec = RouteBuilder::from("direct:start")
3507            .route_id("canonical-force-stop")
3508            .aggregate(
3509                AggregatorConfig::correlate_by("key")
3510                    .complete_when_size(1)
3511                    .force_completion_on_stop(true)
3512                    .build(),
3513            )
3514            .build_canonical()
3515            .unwrap();
3516
3517        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3518            assert_eq!(agg.force_completion_on_stop, Some(true));
3519        } else {
3520            panic!("Expected Aggregate step");
3521        }
3522    }
3523
3524    // ── build_canonical: max_buckets and bucket_ttl ─────────────────────────────
3525
3526    #[test]
3527    fn test_build_canonical_aggregate_max_buckets_and_ttl() {
3528        use camel_api::aggregator::AggregatorConfig;
3529
3530        let spec = RouteBuilder::from("direct:start")
3531            .route_id("canonical-buckets-ttl")
3532            .aggregate(
3533                AggregatorConfig::correlate_by("key")
3534                    .complete_when_size(1)
3535                    .max_buckets(100)
3536                    .bucket_ttl(Duration::from_secs(60))
3537                    .build(),
3538            )
3539            .build_canonical()
3540            .unwrap();
3541
3542        if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3543            assert_eq!(agg.max_buckets, Some(100));
3544            assert_eq!(agg.bucket_ttl_ms, Some(60_000));
3545        } else {
3546            panic!("Expected Aggregate step");
3547        }
3548    }
3549
3550    // ── SplitBuilder with filter inside ─────────────────────────────────────────
3551
3552    #[test]
3553    fn test_split_builder_with_filter_inside() {
3554        use camel_api::splitter::{SplitterConfig, split_body_lines};
3555
3556        let definition = RouteBuilder::from("timer:test")
3557            .route_id("split-with-filter")
3558            .split(SplitterConfig::new(split_body_lines()))
3559            .filter(|_ex| true)
3560            .to("mock:filtered-frag")
3561            .end_filter()
3562            .end_split()
3563            .build()
3564            .unwrap();
3565
3566        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3567            assert_eq!(steps.len(), 1);
3568            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3569        } else {
3570            panic!("Expected Split step");
3571        }
3572    }
3573
3574    // ── WireTap additional tests ────────────────────────────────────────────────
3575
3576    #[test]
3577    fn test_wire_tap_multiple_taps() {
3578        let definition = RouteBuilder::from("timer:tick")
3579            .route_id("multi-wire-tap")
3580            .wire_tap("mock:tap1")
3581            .wire_tap("mock:tap2")
3582            .to("mock:result")
3583            .build()
3584            .unwrap();
3585
3586        assert_eq!(definition.steps().len(), 3);
3587        assert!(
3588            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap1")
3589        );
3590        assert!(
3591            matches!(&definition.steps()[1], BuilderStep::WireTap { uri } if uri == "mock:tap2")
3592        );
3593    }
3594
3595    // ── Error handler: explicit config after shorthand → Mixed mode ─────────────
3596
3597    #[test]
3598    fn test_builder_shorthand_then_explicit_mixed_mode() {
3599        let result = RouteBuilder::from("direct:start")
3600            .route_id("mixed-mode-2")
3601            .dead_letter_channel("log:dlc")
3602            .error_handler(ErrorHandlerConfig::log_only())
3603            .to("mock:out")
3604            .build();
3605
3606        let err = result.err().expect("mixed mode should fail");
3607        assert!(format!("{err}").contains("mixed error handler modes"));
3608    }
3609
3610    // ── build_canonical: empty from_uri error ───────────────────────────────────
3611
3612    #[test]
3613    fn test_build_canonical_empty_from_uri_errors() {
3614        let result = RouteBuilder::from("").route_id("test").build_canonical();
3615        assert!(result.is_err());
3616    }
3617
3618    #[test]
3619    fn test_build_canonical_missing_route_id_errors() {
3620        let result = RouteBuilder::from("direct:start").build_canonical();
3621        assert!(result.is_err());
3622        let err = result.unwrap_err().to_string();
3623        assert!(err.contains("route_id"));
3624    }
3625
3626    // ── SplitBuilder: aggregate inside split ────────────────────────────────────
3627
3628    #[test]
3629    fn test_split_builder_with_aggregate_inside() {
3630        use camel_api::aggregator::AggregatorConfig;
3631        use camel_api::splitter::{SplitterConfig, split_body_lines};
3632
3633        let definition = RouteBuilder::from("timer:test")
3634            .route_id("split-agg")
3635            .split(SplitterConfig::new(split_body_lines()))
3636            .aggregate(
3637                AggregatorConfig::correlate_by("frag-key")
3638                    .complete_when_size(3)
3639                    .build(),
3640            )
3641            .end_split()
3642            .build()
3643            .unwrap();
3644
3645        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3646            assert_eq!(steps.len(), 1);
3647            assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
3648        } else {
3649            panic!("Expected Split step");
3650        }
3651    }
3652
3653    // ── Throttle: steps collected inside throttle scope ─────────────────────────
3654
3655    #[test]
3656    fn test_throttle_builder_with_steps_inside() {
3657        let definition = RouteBuilder::from("timer:tick")
3658            .route_id("throttle-steps")
3659            .throttle(10, Duration::from_secs(1))
3660            .set_header("throttled", Value::Bool(true))
3661            .to("mock:throttled")
3662            .end_throttle()
3663            .build()
3664            .unwrap();
3665
3666        if let BuilderStep::Throttle { steps, .. } = &definition.steps()[0] {
3667            assert_eq!(steps.len(), 2);
3668        } else {
3669            panic!("Expected Throttle step");
3670        }
3671    }
3672
3673    // ── LoadBalance: steps collected inside scope ───────────────────────────────
3674
3675    #[test]
3676    fn test_load_balance_builder_with_steps_inside() {
3677        let definition = RouteBuilder::from("timer:tick")
3678            .route_id("lb-steps")
3679            .load_balance()
3680            .round_robin()
3681            .set_header("lb", Value::Bool(true))
3682            .to("mock:lb")
3683            .end_load_balance()
3684            .build()
3685            .unwrap();
3686
3687        if let BuilderStep::LoadBalance { steps, .. } = &definition.steps()[0] {
3688            assert_eq!(steps.len(), 2);
3689        } else {
3690            panic!("Expected LoadBalance step");
3691        }
3692    }
3693
3694    // ── Multicast: steps collected inside scope ─────────────────────────────────
3695
3696    #[test]
3697    fn test_multicast_builder_with_steps_inside() {
3698        let definition = RouteBuilder::from("timer:tick")
3699            .route_id("multicast-steps")
3700            .multicast()
3701            .set_header("mc", Value::Bool(true))
3702            .to("mock:multicast")
3703            .end_multicast()
3704            .build()
3705            .unwrap();
3706
3707        if let BuilderStep::Multicast { steps, .. } = &definition.steps()[0] {
3708            assert_eq!(steps.len(), 2);
3709        } else {
3710            panic!("Expected Multicast step");
3711        }
3712    }
3713
3714    // ── LoopBuilder: steps collected inside loop scope ──────────────────────────
3715
3716    #[test]
3717    fn test_loop_builder_with_steps_inside() {
3718        let definition = RouteBuilder::from("timer:tick")
3719            .route_id("loop-steps")
3720            .loop_count(3)
3721            .set_header("loop", Value::Bool(true))
3722            .to("mock:loop")
3723            .end_loop()
3724            .build()
3725            .unwrap();
3726
3727        if let BuilderStep::Loop { steps, .. } = &definition.steps()[0] {
3728            assert_eq!(steps.len(), 2);
3729        } else {
3730            panic!("Expected Loop step");
3731        }
3732    }
3733
3734    // ── canonical_step_name coverage for remaining variants ─────────────────────
3735
3736    #[test]
3737    fn test_build_canonical_rejects_loop_step() {
3738        let err = RouteBuilder::from("direct:start")
3739            .route_id("canonical-loop")
3740            .loop_count(3)
3741            .to("mock:loop")
3742            .end_loop()
3743            .build_canonical()
3744            .unwrap_err();
3745
3746        assert!(format!("{err}").contains("does not support step `loop`"));
3747    }
3748
3749    #[test]
3750    fn test_build_canonical_rejects_multicast_step() {
3751        let err = RouteBuilder::from("direct:start")
3752            .route_id("canonical-multicast")
3753            .multicast()
3754            .to("mock:a")
3755            .end_multicast()
3756            .build_canonical()
3757            .unwrap_err();
3758
3759        assert!(format!("{err}").contains("does not support step `multicast`"));
3760    }
3761
3762    #[test]
3763    fn test_build_canonical_rejects_throttle_step() {
3764        let err = RouteBuilder::from("direct:start")
3765            .route_id("canonical-throttle")
3766            .throttle(10, Duration::from_secs(1))
3767            .to("mock:result")
3768            .end_throttle()
3769            .build_canonical()
3770            .unwrap_err();
3771
3772        assert!(format!("{err}").contains("does not support step `throttle`"));
3773    }
3774
3775    #[test]
3776    fn test_build_canonical_rejects_load_balancer_step() {
3777        let err = RouteBuilder::from("direct:start")
3778            .route_id("canonical-lb")
3779            .load_balance()
3780            .round_robin()
3781            .to("mock:result")
3782            .end_load_balance()
3783            .build_canonical()
3784            .unwrap_err();
3785
3786        assert!(format!("{err}").contains("does not support step `load_balancer`"));
3787    }
3788
3789    #[test]
3790    fn test_build_canonical_rejects_bean_step() {
3791        let err = RouteBuilder::from("direct:start")
3792            .route_id("canonical-bean")
3793            .bean("myBean", "process")
3794            .build_canonical()
3795            .unwrap_err();
3796
3797        assert!(format!("{err}").contains("does not support step `bean`"));
3798    }
3799
3800    #[test]
3801    fn test_build_canonical_rejects_script_step() {
3802        let err = RouteBuilder::from("direct:start")
3803            .route_id("canonical-script")
3804            .script("rhai", "x = 1")
3805            .build_canonical()
3806            .unwrap_err();
3807
3808        assert!(format!("{err}").contains("does not support step `script`"));
3809    }
3810
3811    #[test]
3812    fn test_build_canonical_accepts_delay_step() {
3813        let spec = RouteBuilder::from("direct:start")
3814            .route_id("canonical-delay")
3815            .delay(Duration::from_millis(100))
3816            .build_canonical()
3817            .unwrap();
3818
3819        assert!(
3820            spec.steps.iter().any(
3821                |s| matches!(s, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100)
3822            )
3823        );
3824    }
3825
3826    #[test]
3827    fn test_build_canonical_accepts_wire_tap_step() {
3828        let spec = RouteBuilder::from("direct:start")
3829            .route_id("canonical-wiretap")
3830            .wire_tap("mock:tap")
3831            .build_canonical()
3832            .unwrap();
3833
3834        assert!(
3835            spec.steps
3836                .iter()
3837                .any(|s| matches!(s, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"))
3838        );
3839    }
3840
3841    #[test]
3842    fn test_build_canonical_rejects_dynamic_router_step() {
3843        let err = RouteBuilder::from("direct:start")
3844            .route_id("canonical-dyn-router")
3845            .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
3846            .build_canonical()
3847            .unwrap_err();
3848
3849        assert!(format!("{err}").contains("does not support step `dynamic_router`"));
3850    }
3851
3852    #[test]
3853    fn test_build_canonical_rejects_routing_slip_step() {
3854        let err = RouteBuilder::from("direct:start")
3855            .route_id("canonical-routing-slip")
3856            .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
3857            .build_canonical()
3858            .unwrap_err();
3859
3860        assert!(format!("{err}").contains("does not support step `routing_slip`"));
3861    }
3862
3863    #[test]
3864    fn test_build_canonical_rejects_recipient_list_step() {
3865        let err = RouteBuilder::from("direct:start")
3866            .route_id("canonical-recipient")
3867            .recipient_list(Arc::new(|_| "mock:a".to_string()))
3868            .build_canonical()
3869            .unwrap_err();
3870
3871        assert!(format!("{err}").contains("does not support step `recipient_list`"));
3872    }
3873
3874    // ── extract_completion_fields: Any mode with predicate → error ──────────────
3875
3876    #[test]
3877    fn test_build_canonical_rejects_any_mode_with_predicate() {
3878        let err = RouteBuilder::from("direct:start")
3879            .route_id("canonical-any-pred")
3880            .aggregate(AggregatorConfig {
3881                header_name: "key".to_string(),
3882                completion: CompletionMode::Any(vec![
3883                    CompletionCondition::Size(5),
3884                    CompletionCondition::Predicate(Arc::new(|_| false)),
3885                ]),
3886                correlation: CorrelationStrategy::HeaderName("key".to_string()),
3887                strategy: AggregationStrategy::CollectAll,
3888                max_buckets: None,
3889                bucket_ttl: None,
3890                force_completion_on_stop: false,
3891                discard_on_timeout: false,
3892            })
3893            .build_canonical()
3894            .unwrap_err();
3895
3896        assert!(format!("{err}").contains("predicate completion"));
3897    }
3898}