Skip to main content

camel_builder/
lib.rs

1use camel_api::aggregator::{
2    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
3};
4use camel_api::body::Body;
5use camel_api::body_converter::BodyType;
6use camel_api::circuit_breaker::CircuitBreakerConfig;
7use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
8use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
9use camel_api::load_balancer::LoadBalancerConfig;
10use camel_api::multicast::{MulticastConfig, MulticastStrategy};
11use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
12use camel_api::splitter::SplitterConfig;
13use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
14use camel_api::DelayConfig;
15use camel_api::{
16    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
17    ProcessorFn, Value,
18    runtime::{
19        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
20        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
21        CanonicalWhenSpec,
22    },
23};
24use camel_component_api::ConcurrencyModel;
25use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
26use camel_processor::{
27    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
28    UnmarshalService, builtin_data_format,
29};
30
31/// Shared step-accumulation methods for all builder types.
32///
33/// Implementors provide `steps_mut()` and get step-adding methods for free.
34/// `filter()` and other branching methods are NOT included — they return
35/// different types per builder and stay as per-builder methods.
36pub trait StepAccumulator: Sized {
37    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
38
39    fn to(mut self, endpoint: impl Into<String>) -> Self {
40        self.steps_mut().push(BuilderStep::To(endpoint.into()));
41        self
42    }
43
44    fn process<F, Fut>(mut self, f: F) -> Self
45    where
46        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
47        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
48    {
49        let svc = ProcessorFn::new(f);
50        self.steps_mut()
51            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
52        self
53    }
54
55    fn process_fn(mut self, processor: BoxProcessor) -> Self {
56        self.steps_mut().push(BuilderStep::Processor(processor));
57        self
58    }
59
60    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
61        let svc = SetHeader::new(IdentityProcessor, key, value);
62        self.steps_mut()
63            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
64        self
65    }
66
67    fn map_body<F>(mut self, mapper: F) -> Self
68    where
69        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
70    {
71        let svc = MapBody::new(IdentityProcessor, mapper);
72        self.steps_mut()
73            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
74        self
75    }
76
77    fn set_body<B>(mut self, body: B) -> Self
78    where
79        B: Into<Body> + Clone + Send + Sync + 'static,
80    {
81        let body: Body = body.into();
82        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
83        self.steps_mut()
84            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
85        self
86    }
87
88    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
89    ///
90    /// Transforms the message body using the given value. Semantically identical
91    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
92    fn transform<B>(self, body: B) -> Self
93    where
94        B: Into<Body> + Clone + Send + Sync + 'static,
95    {
96        self.set_body(body)
97    }
98
99    fn set_body_fn<F>(mut self, expr: F) -> Self
100    where
101        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
102    {
103        let svc = SetBody::new(IdentityProcessor, expr);
104        self.steps_mut()
105            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
106        self
107    }
108
109    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
110    where
111        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
112    {
113        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
114        self.steps_mut()
115            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
116        self
117    }
118
119    fn aggregate(mut self, config: AggregatorConfig) -> Self {
120        self.steps_mut().push(BuilderStep::Aggregate { config });
121        self
122    }
123
124    /// Stop processing this exchange immediately. No further steps in the
125    /// current pipeline will run.
126    ///
127    /// Can be used at any point in the route: directly on RouteBuilder,
128    /// inside `.filter()`, inside `.split()`, etc.
129    fn stop(mut self) -> Self {
130        self.steps_mut().push(BuilderStep::Stop);
131        self
132    }
133
134    fn delay(mut self, duration: std::time::Duration) -> Self {
135        self.steps_mut().push(BuilderStep::Delay {
136            config: DelayConfig::from_duration(duration),
137        });
138        self
139    }
140
141    fn delay_with_header(
142        mut self,
143        duration: std::time::Duration,
144        header: impl Into<String>,
145    ) -> Self {
146        self.steps_mut().push(BuilderStep::Delay {
147            config: DelayConfig::from_duration_with_header(duration, header),
148        });
149        self
150    }
151
152    /// Log a message at the specified level.
153    ///
154    /// The message will be logged when an exchange passes through this step.
155    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
156        self.steps_mut().push(BuilderStep::Log {
157            level,
158            message: message.into(),
159        });
160        self
161    }
162
163    /// Convert the message body to the target type.
164    ///
165    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
166    /// Returns `TypeConversionFailed` if conversion is not possible.
167    ///
168    /// # Example
169    /// ```ignore
170    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
171    ///      .convert_body_to(BodyType::Json)
172    ///      .to("direct:next")
173    /// ```
174    fn convert_body_to(mut self, target: BodyType) -> Self {
175        let svc = ConvertBodyTo::new(IdentityProcessor, target);
176        self.steps_mut()
177            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
178        self
179    }
180
181    /// Marshal the message body using the specified data format.
182    ///
183    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
184    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
185    ///
186    /// # Example
187    /// ```ignore
188    /// route.marshal("json").to("direct:next")
189    /// ```
190    fn marshal(mut self, format: impl Into<String>) -> Self {
191        let name = format.into();
192        let df =
193            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
194        let svc = MarshalService::new(IdentityProcessor, df);
195        self.steps_mut()
196            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
197        self
198    }
199
200    /// Unmarshal the message body using the specified data format.
201    ///
202    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
203    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
204    ///
205    /// # Example
206    /// ```ignore
207    /// route.unmarshal("json").to("direct:next")
208    /// ```
209    fn unmarshal(mut self, format: impl Into<String>) -> Self {
210        let name = format.into();
211        let df =
212            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
213        let svc = UnmarshalService::new(IdentityProcessor, df);
214        self.steps_mut()
215            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
216        self
217    }
218
219    /// Execute a script that can modify the exchange (headers, properties, body).
220    ///
221    /// The script has access to `headers`, `properties`, and `body` variables
222    /// and can modify them with assignment syntax: `headers["k"] = v`.
223    ///
224    /// # Example
225    /// ```ignore
226    /// // ignore: requires full CamelContext setup with registered language
227    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
228    /// ```
229    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
230        self.steps_mut().push(BuilderStep::Script {
231            language: language.into(),
232            script: script.into(),
233        });
234        self
235    }
236}
237
238/// A fluent builder for constructing routes.
239///
240/// # Example
241///
242/// ```ignore
243/// let definition = RouteBuilder::from("timer:tick?period=1000")
244///     .set_header("source", Value::String("timer".into()))
245///     .filter(|ex| ex.input.body.as_text().is_some())
246///     .to("log:info?showHeaders=true")
247///     .build()?;
248/// ```
249pub struct RouteBuilder {
250    from_uri: String,
251    steps: Vec<BuilderStep>,
252    error_handler: Option<ErrorHandlerConfig>,
253    error_handler_mode: ErrorHandlerMode,
254    circuit_breaker_config: Option<CircuitBreakerConfig>,
255    concurrency: Option<ConcurrencyModel>,
256    route_id: Option<String>,
257    auto_startup: Option<bool>,
258    startup_order: Option<i32>,
259}
260
261#[derive(Default)]
262enum ErrorHandlerMode {
263    #[default]
264    None,
265    ExplicitConfig,
266    Shorthand {
267        dlc_uri: Option<String>,
268        specs: Vec<OnExceptionSpec>,
269    },
270    Mixed,
271}
272
273#[derive(Clone)]
274struct OnExceptionSpec {
275    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
276    retry: Option<RedeliveryPolicy>,
277    handled_by: Option<String>,
278}
279
280impl RouteBuilder {
281    /// Start building a route from the given source endpoint URI.
282    pub fn from(endpoint: &str) -> Self {
283        Self {
284            from_uri: endpoint.to_string(),
285            steps: Vec::new(),
286            error_handler: None,
287            error_handler_mode: ErrorHandlerMode::None,
288            circuit_breaker_config: None,
289            concurrency: None,
290            route_id: None,
291            auto_startup: None,
292            startup_order: None,
293        }
294    }
295
296    /// Open a filter scope. Only exchanges matching `predicate` will be processed
297    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
298    /// and continue to steps after `.end_filter()`.
299    pub fn filter<F>(self, predicate: F) -> FilterBuilder
300    where
301        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
302    {
303        FilterBuilder {
304            parent: self,
305            predicate: std::sync::Arc::new(predicate),
306            steps: vec![],
307        }
308    }
309
310    /// Open a choice scope for content-based routing.
311    ///
312    /// Within the choice, you can define multiple `.when()` clauses and an
313    /// optional `.otherwise()` clause. The first matching `when` predicate
314    /// determines which sub-pipeline executes.
315    pub fn choice(self) -> ChoiceBuilder {
316        ChoiceBuilder {
317            parent: self,
318            whens: vec![],
319            _otherwise: None,
320        }
321    }
322
323    /// Add a WireTap step that sends a clone of the exchange to the given
324    /// endpoint URI (fire-and-forget). The original exchange continues
325    /// downstream unchanged.
326    pub fn wire_tap(mut self, endpoint: &str) -> Self {
327        self.steps.push(BuilderStep::WireTap {
328            uri: endpoint.to_string(),
329        });
330        self
331    }
332
333    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
334    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
335        self.error_handler_mode = match self.error_handler_mode {
336            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
337                ErrorHandlerMode::ExplicitConfig
338            }
339            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
340        };
341        self.error_handler = Some(config);
342        self
343    }
344
345    /// Set a dead letter channel URI for shorthand error handler mode.
346    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
347        let uri = uri.into();
348        self.error_handler_mode = match self.error_handler_mode {
349            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
350                dlc_uri: Some(uri),
351                specs: Vec::new(),
352            },
353            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
354                dlc_uri: Some(uri),
355                specs,
356            },
357            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
358        };
359        self
360    }
361
362    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
363    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
364    where
365        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
366    {
367        self.error_handler_mode = match self.error_handler_mode {
368            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
369                dlc_uri: None,
370                specs: Vec::new(),
371            },
372            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
373            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
374        };
375
376        OnExceptionBuilder {
377            parent: self,
378            policy: OnExceptionSpec {
379                matches: std::sync::Arc::new(matches),
380                retry: None,
381                handled_by: None,
382            },
383        }
384    }
385
386    /// Set a circuit breaker for this route.
387    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
388        self.circuit_breaker_config = Some(config);
389        self
390    }
391
392    /// Override the consumer's default concurrency model.
393    ///
394    /// When set, the pipeline spawns a task per exchange, processing them
395    /// concurrently. `max` limits the number of simultaneously active
396    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
397    ///
398    /// # Example
399    /// ```ignore
400    /// RouteBuilder::from("http://0.0.0.0:8080/api")
401    ///     .concurrent(16)  // max 16 in-flight pipeline executions
402    ///     .process(handle_request)
403    ///     .build()
404    /// ```
405    pub fn concurrent(mut self, max: usize) -> Self {
406        let max = if max == 0 { None } else { Some(max) };
407        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
408        self
409    }
410
411    /// Force sequential processing, overriding a concurrent-capable consumer.
412    ///
413    /// Useful for HTTP routes that mutate shared state and need ordering
414    /// guarantees.
415    pub fn sequential(mut self) -> Self {
416        self.concurrency = Some(ConcurrencyModel::Sequential);
417        self
418    }
419
420    /// Set the route ID for this route.
421    ///
422    /// If not set, the route will be assigned an auto-generated ID.
423    pub fn route_id(mut self, id: impl Into<String>) -> Self {
424        self.route_id = Some(id.into());
425        self
426    }
427
428    /// Set whether this route should automatically start when the context starts.
429    ///
430    /// Default is `true`.
431    pub fn auto_startup(mut self, auto: bool) -> Self {
432        self.auto_startup = Some(auto);
433        self
434    }
435
436    /// Set the startup order for this route.
437    ///
438    /// Routes with lower values start first. Default is 1000.
439    pub fn startup_order(mut self, order: i32) -> Self {
440        self.startup_order = Some(order);
441        self
442    }
443
444    /// Begin a Splitter sub-pipeline. Steps added after this call (until
445    /// `.end_split()`) will be executed per-fragment.
446    ///
447    /// Returns a `SplitBuilder` — you cannot call `.build()` until
448    /// `.end_split()` closes the split scope (enforced by the type system).
449    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
450        SplitBuilder {
451            parent: self,
452            config,
453            steps: Vec::new(),
454        }
455    }
456
457    /// Begin a Multicast sub-pipeline. Steps added after this call (until
458    /// `.end_multicast()`) will each receive a copy of the exchange.
459    ///
460    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
461    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
462    pub fn multicast(self) -> MulticastBuilder {
463        MulticastBuilder {
464            parent: self,
465            steps: Vec::new(),
466            config: MulticastConfig::new(),
467        }
468    }
469
470    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
471    /// `max_requests` per `period`. Steps inside the throttle scope are only
472    /// executed when the rate limit allows.
473    ///
474    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
475    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
476    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
477        ThrottleBuilder {
478            parent: self,
479            config: ThrottlerConfig::new(max_requests, period),
480            steps: Vec::new(),
481        }
482    }
483
484    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
485    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
486    ///
487    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
488    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
489    pub fn load_balance(self) -> LoadBalancerBuilder {
490        LoadBalancerBuilder {
491            parent: self,
492            config: LoadBalancerConfig::round_robin(),
493            steps: Vec::new(),
494        }
495    }
496
497    /// Add a dynamic router step that routes exchanges dynamically based on
498    /// expression evaluation at runtime.
499    ///
500    /// The expression receives the exchange and returns `Some(uri)` to route to
501    /// the next endpoint, or `None` to stop routing.
502    ///
503    /// # Example
504    /// ```ignore
505    /// RouteBuilder::from("timer:tick")
506    ///     .route_id("test-route")
507    ///     .dynamic_router(|ex| {
508    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
509    ///     })
510    ///     .build()
511    /// ```
512    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
513        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
514    }
515
516    /// Add a dynamic router step with full configuration.
517    ///
518    /// Allows customization of URI delimiter, cache size, timeout, and other options.
519    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
520        self.steps.push(BuilderStep::DynamicRouter { config });
521        self
522    }
523
524    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
525        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
526    }
527
528    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
529        self.steps.push(BuilderStep::RoutingSlip { config });
530        self
531    }
532
533    /// Consume the builder and produce a [`RouteDefinition`].
534    pub fn build(self) -> Result<RouteDefinition, CamelError> {
535        if self.from_uri.is_empty() {
536            return Err(CamelError::RouteError(
537                "route must have a 'from' URI".to_string(),
538            ));
539        }
540        let route_id = self.route_id.ok_or_else(|| {
541            CamelError::RouteError(
542                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
543                    .to_string(),
544            )
545        })?;
546        let resolved_error_handler = match self.error_handler_mode {
547            ErrorHandlerMode::None => self.error_handler,
548            ErrorHandlerMode::ExplicitConfig => self.error_handler,
549            ErrorHandlerMode::Mixed => {
550                return Err(CamelError::RouteError(
551                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
552                ));
553            }
554            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
555                let mut config = if let Some(uri) = dlc_uri {
556                    ErrorHandlerConfig::dead_letter_channel(uri)
557                } else {
558                    ErrorHandlerConfig::log_only()
559                };
560
561                for spec in specs {
562                    let matcher = spec.matches.clone();
563                    let mut builder = config.on_exception(move |e| matcher(e));
564
565                    if let Some(retry) = spec.retry {
566                        builder = builder.retry(retry.max_attempts).with_backoff(
567                            retry.initial_delay,
568                            retry.multiplier,
569                            retry.max_delay,
570                        );
571                        if retry.jitter_factor > 0.0 {
572                            builder = builder.with_jitter(retry.jitter_factor);
573                        }
574                    }
575
576                    if let Some(uri) = spec.handled_by {
577                        builder = builder.handled_by(uri);
578                    }
579
580                    config = builder.build();
581                }
582
583                Some(config)
584            }
585        };
586
587        let definition = RouteDefinition::new(self.from_uri, self.steps);
588        let definition = if let Some(eh) = resolved_error_handler {
589            definition.with_error_handler(eh)
590        } else {
591            definition
592        };
593        let definition = if let Some(cb) = self.circuit_breaker_config {
594            definition.with_circuit_breaker(cb)
595        } else {
596            definition
597        };
598        let definition = if let Some(concurrency) = self.concurrency {
599            definition.with_concurrency(concurrency)
600        } else {
601            definition
602        };
603        let definition = definition.with_route_id(route_id);
604        let definition = if let Some(auto) = self.auto_startup {
605            definition.with_auto_startup(auto)
606        } else {
607            definition
608        };
609        let definition = if let Some(order) = self.startup_order {
610            definition.with_startup_order(order)
611        } else {
612            definition
613        };
614        Ok(definition)
615    }
616
617    /// Compile this builder route into canonical v1 spec.
618    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
619        if self.from_uri.is_empty() {
620            return Err(CamelError::RouteError(
621                "route must have a 'from' URI".to_string(),
622            ));
623        }
624        let route_id = self.route_id.ok_or_else(|| {
625            CamelError::RouteError(
626                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
627                    .to_string(),
628            )
629        })?;
630
631        let steps = canonicalize_steps(self.steps)?;
632        let circuit_breaker = self
633            .circuit_breaker_config
634            .map(canonicalize_circuit_breaker);
635
636        let spec = CanonicalRouteSpec {
637            route_id,
638            from: self.from_uri,
639            steps,
640            circuit_breaker,
641            version: camel_api::CANONICAL_CONTRACT_VERSION,
642        };
643        spec.validate_contract()?;
644        Ok(spec)
645    }
646}
647
648pub struct OnExceptionBuilder {
649    parent: RouteBuilder,
650    policy: OnExceptionSpec,
651}
652
653impl OnExceptionBuilder {
654    pub fn retry(mut self, max_attempts: u32) -> Self {
655        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
656        self
657    }
658
659    pub fn with_backoff(
660        mut self,
661        initial: std::time::Duration,
662        multiplier: f64,
663        max: std::time::Duration,
664    ) -> Self {
665        if let Some(ref mut retry) = self.policy.retry {
666            retry.initial_delay = initial;
667            retry.multiplier = multiplier;
668            retry.max_delay = max;
669        }
670        self
671    }
672
673    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
674        if let Some(ref mut retry) = self.policy.retry {
675            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
676        }
677        self
678    }
679
680    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
681        self.policy.handled_by = Some(uri.into());
682        self
683    }
684
685    pub fn end_on_exception(mut self) -> RouteBuilder {
686        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
687            specs.push(self.policy);
688        }
689        self.parent
690    }
691}
692
693fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
694    let mut canonical = Vec::with_capacity(steps.len());
695    for step in steps {
696        canonical.push(canonicalize_step(step)?);
697    }
698    Ok(canonical)
699}
700
701fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
702    match step {
703        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
704        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
705        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
706        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
707        BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
708            delay_ms: config.delay_ms,
709            dynamic_header: config.dynamic_header,
710        }),
711        BuilderStep::DeclarativeScript { expression } => {
712            Ok(CanonicalStepSpec::Script { expression })
713        }
714        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
715            predicate,
716            steps: canonicalize_steps(steps)?,
717        }),
718        BuilderStep::DeclarativeChoice { whens, otherwise } => {
719            let mut canonical_whens = Vec::with_capacity(whens.len());
720            for DeclarativeWhenStep { predicate, steps } in whens {
721                canonical_whens.push(CanonicalWhenSpec {
722                    predicate,
723                    steps: canonicalize_steps(steps)?,
724                });
725            }
726            let otherwise = match otherwise {
727                Some(steps) => Some(canonicalize_steps(steps)?),
728                None => None,
729            };
730            Ok(CanonicalStepSpec::Choice {
731                whens: canonical_whens,
732                otherwise,
733            })
734        }
735        BuilderStep::DeclarativeSplit {
736            expression,
737            aggregation,
738            parallel,
739            parallel_limit,
740            stop_on_exception,
741            steps,
742        } => Ok(CanonicalStepSpec::Split {
743            expression: CanonicalSplitExpressionSpec::Language(expression),
744            aggregation: canonicalize_split_aggregation(aggregation)?,
745            parallel,
746            parallel_limit,
747            stop_on_exception,
748            steps: canonicalize_steps(steps)?,
749        }),
750        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
751            config: canonicalize_aggregate(config)?,
752        }),
753        other => {
754            let step_name = canonical_step_name(&other);
755            let detail = camel_api::canonical_contract_rejection_reason(step_name)
756                .unwrap_or("not included in canonical v1");
757            Err(CamelError::RouteError(format!(
758                "canonical v1 does not support step `{step_name}`: {detail}"
759            )))
760        }
761    }
762}
763
764fn canonicalize_split_aggregation(
765    strategy: camel_api::splitter::AggregationStrategy,
766) -> Result<CanonicalSplitAggregationSpec, CamelError> {
767    match strategy {
768        camel_api::splitter::AggregationStrategy::LastWins => {
769            Ok(CanonicalSplitAggregationSpec::LastWins)
770        }
771        camel_api::splitter::AggregationStrategy::CollectAll => {
772            Ok(CanonicalSplitAggregationSpec::CollectAll)
773        }
774        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
775            "canonical v1 does not support custom split aggregation".to_string(),
776        )),
777        camel_api::splitter::AggregationStrategy::Original => {
778            Ok(CanonicalSplitAggregationSpec::Original)
779        }
780    }
781}
782
783fn extract_completion_fields(
784    mode: &CompletionMode,
785) -> Result<(Option<usize>, Option<u64>), CamelError> {
786    match mode {
787        CompletionMode::Single(cond) => match cond {
788            CompletionCondition::Size(n) => Ok((Some(*n), None)),
789            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
790            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
791                "canonical v1 does not support aggregate predicate completion".to_string(),
792            )),
793        },
794        CompletionMode::Any(conds) => {
795            let mut size = None;
796            let mut timeout_ms = None;
797            for cond in conds {
798                match cond {
799                    CompletionCondition::Size(n) => size = Some(*n),
800                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
801                    CompletionCondition::Predicate(_) => {
802                        return Err(CamelError::RouteError(
803                            "canonical v1 does not support aggregate predicate completion"
804                                .to_string(),
805                        ));
806                    }
807                }
808            }
809            Ok((size, timeout_ms))
810        }
811    }
812}
813
814fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
815    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
816
817    let header = match &config.correlation {
818        CorrelationStrategy::HeaderName(h) => h.clone(),
819        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
820        CorrelationStrategy::Fn(_) => {
821            return Err(CamelError::RouteError(
822                "canonical v1 does not support Fn correlation strategy".to_string(),
823            ));
824        }
825    };
826
827    let correlation_key = match &config.correlation {
828        CorrelationStrategy::HeaderName(_) => None,
829        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
830        CorrelationStrategy::Fn(_) => unreachable!(),
831    };
832
833    let strategy = match config.strategy {
834        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
835        AggregationStrategy::Custom(_) => {
836            return Err(CamelError::RouteError(
837                "canonical v1 does not support custom aggregate strategy".to_string(),
838            ));
839        }
840    };
841    let bucket_ttl_ms = config
842        .bucket_ttl
843        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
844
845    Ok(CanonicalAggregateSpec {
846        header,
847        completion_size,
848        completion_timeout_ms,
849        correlation_key,
850        force_completion_on_stop: if config.force_completion_on_stop {
851            Some(true)
852        } else {
853            None
854        },
855        discard_on_timeout: if config.discard_on_timeout {
856            Some(true)
857        } else {
858            None
859        },
860        strategy,
861        max_buckets: config.max_buckets,
862        bucket_ttl_ms,
863    })
864}
865
866fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
867    CanonicalCircuitBreakerSpec {
868        failure_threshold: config.failure_threshold,
869        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
870    }
871}
872
873fn canonical_step_name(step: &BuilderStep) -> &'static str {
874    match step {
875        BuilderStep::Processor(_) => "processor",
876        BuilderStep::To(_) => "to",
877        BuilderStep::Stop => "stop",
878        BuilderStep::Log { .. } => "log",
879        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
880        BuilderStep::DeclarativeSetBody { .. } => "set_body",
881        BuilderStep::DeclarativeFilter { .. } => "filter",
882        BuilderStep::DeclarativeChoice { .. } => "choice",
883        BuilderStep::DeclarativeScript { .. } => "script",
884        BuilderStep::DeclarativeSplit { .. } => "split",
885        BuilderStep::Split { .. } => "split",
886        BuilderStep::Aggregate { .. } => "aggregate",
887        BuilderStep::Filter { .. } => "filter",
888        BuilderStep::Choice { .. } => "choice",
889        BuilderStep::WireTap { .. } => "wire_tap",
890        BuilderStep::Delay { .. } => "delay",
891        BuilderStep::Multicast { .. } => "multicast",
892        BuilderStep::DeclarativeLog { .. } => "log",
893        BuilderStep::Bean { .. } => "bean",
894        BuilderStep::Script { .. } => "script",
895        BuilderStep::Throttle { .. } => "throttle",
896        BuilderStep::LoadBalance { .. } => "load_balancer",
897        BuilderStep::DynamicRouter { .. } => "dynamic_router",
898        BuilderStep::RoutingSlip { .. } => "routing_slip",
899        BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
900        BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
901    }
902}
903
904impl StepAccumulator for RouteBuilder {
905    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
906        &mut self.steps
907    }
908}
909
910/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
911///
912/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
913/// but NOT `.build()` and NOT `.split()` (no nested splits).
914///
915/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
916/// and returns the parent `RouteBuilder`.
917pub struct SplitBuilder {
918    parent: RouteBuilder,
919    config: SplitterConfig,
920    steps: Vec<BuilderStep>,
921}
922
923impl SplitBuilder {
924    /// Open a filter scope within the split sub-pipeline.
925    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
926    where
927        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
928    {
929        FilterInSplitBuilder {
930            parent: self,
931            predicate: std::sync::Arc::new(predicate),
932            steps: vec![],
933        }
934    }
935
936    /// Close the split scope. Packages the accumulated sub-steps into a
937    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
938    pub fn end_split(mut self) -> RouteBuilder {
939        let split_step = BuilderStep::Split {
940            config: self.config,
941            steps: self.steps,
942        };
943        self.parent.steps.push(split_step);
944        self.parent
945    }
946}
947
948impl StepAccumulator for SplitBuilder {
949    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
950        &mut self.steps
951    }
952}
953
954/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
955pub struct FilterBuilder {
956    parent: RouteBuilder,
957    predicate: FilterPredicate,
958    steps: Vec<BuilderStep>,
959}
960
961impl FilterBuilder {
962    /// Close the filter scope. Packages the accumulated sub-steps into a
963    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
964    pub fn end_filter(mut self) -> RouteBuilder {
965        let step = BuilderStep::Filter {
966            predicate: self.predicate,
967            steps: self.steps,
968        };
969        self.parent.steps.push(step);
970        self.parent
971    }
972}
973
974impl StepAccumulator for FilterBuilder {
975    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
976        &mut self.steps
977    }
978}
979
980/// Builder for a filter scope nested inside a `.split()` block.
981pub struct FilterInSplitBuilder {
982    parent: SplitBuilder,
983    predicate: FilterPredicate,
984    steps: Vec<BuilderStep>,
985}
986
987impl FilterInSplitBuilder {
988    /// Close the filter scope and return the parent `SplitBuilder`.
989    pub fn end_filter(mut self) -> SplitBuilder {
990        let step = BuilderStep::Filter {
991            predicate: self.predicate,
992            steps: self.steps,
993        };
994        self.parent.steps.push(step);
995        self.parent
996    }
997}
998
999impl StepAccumulator for FilterInSplitBuilder {
1000    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1001        &mut self.steps
1002    }
1003}
1004
1005// ── Choice/When/Otherwise builders ─────────────────────────────────────────
1006
1007/// Builder for a `.choice()` ... `.end_choice()` block.
1008///
1009/// Accumulates `when` clauses and an optional `otherwise` clause.
1010/// Cannot call `.build()` until `.end_choice()` is called.
1011pub struct ChoiceBuilder {
1012    parent: RouteBuilder,
1013    whens: Vec<WhenStep>,
1014    _otherwise: Option<Vec<BuilderStep>>,
1015}
1016
1017impl ChoiceBuilder {
1018    /// Open a `when` clause. Only exchanges matching `predicate` will be
1019    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
1020    pub fn when<F>(self, predicate: F) -> WhenBuilder
1021    where
1022        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1023    {
1024        WhenBuilder {
1025            parent: self,
1026            predicate: std::sync::Arc::new(predicate),
1027            steps: vec![],
1028        }
1029    }
1030
1031    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1032    ///
1033    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1034    pub fn otherwise(self) -> OtherwiseBuilder {
1035        OtherwiseBuilder {
1036            parent: self,
1037            steps: vec![],
1038        }
1039    }
1040
1041    /// Close the choice scope. Packages all accumulated `when` clauses and
1042    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1043    /// parent `RouteBuilder`.
1044    pub fn end_choice(mut self) -> RouteBuilder {
1045        let step = BuilderStep::Choice {
1046            whens: self.whens,
1047            otherwise: self._otherwise,
1048        };
1049        self.parent.steps.push(step);
1050        self.parent
1051    }
1052}
1053
1054/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1055pub struct WhenBuilder {
1056    parent: ChoiceBuilder,
1057    predicate: camel_api::FilterPredicate,
1058    steps: Vec<BuilderStep>,
1059}
1060
1061impl WhenBuilder {
1062    /// Close the when scope. Packages the accumulated sub-steps into a
1063    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1064    pub fn end_when(mut self) -> ChoiceBuilder {
1065        self.parent.whens.push(WhenStep {
1066            predicate: self.predicate,
1067            steps: self.steps,
1068        });
1069        self.parent
1070    }
1071}
1072
1073impl StepAccumulator for WhenBuilder {
1074    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1075        &mut self.steps
1076    }
1077}
1078
1079/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1080pub struct OtherwiseBuilder {
1081    parent: ChoiceBuilder,
1082    steps: Vec<BuilderStep>,
1083}
1084
1085impl OtherwiseBuilder {
1086    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1087    pub fn end_otherwise(self) -> ChoiceBuilder {
1088        let OtherwiseBuilder { mut parent, steps } = self;
1089        parent._otherwise = Some(steps);
1090        parent
1091    }
1092}
1093
1094impl StepAccumulator for OtherwiseBuilder {
1095    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1096        &mut self.steps
1097    }
1098}
1099
1100/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1101///
1102/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1103/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1104///
1105/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1106/// and returns the parent `RouteBuilder`.
1107pub struct MulticastBuilder {
1108    parent: RouteBuilder,
1109    steps: Vec<BuilderStep>,
1110    config: MulticastConfig,
1111}
1112
1113impl MulticastBuilder {
1114    pub fn parallel(mut self, parallel: bool) -> Self {
1115        self.config = self.config.parallel(parallel);
1116        self
1117    }
1118
1119    pub fn parallel_limit(mut self, limit: usize) -> Self {
1120        self.config = self.config.parallel_limit(limit);
1121        self
1122    }
1123
1124    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1125        self.config = self.config.stop_on_exception(stop);
1126        self
1127    }
1128
1129    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1130        self.config = self.config.timeout(duration);
1131        self
1132    }
1133
1134    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1135        self.config = self.config.aggregation(strategy);
1136        self
1137    }
1138
1139    pub fn end_multicast(mut self) -> RouteBuilder {
1140        let step = BuilderStep::Multicast {
1141            steps: self.steps,
1142            config: self.config,
1143        };
1144        self.parent.steps.push(step);
1145        self.parent
1146    }
1147}
1148
1149impl StepAccumulator for MulticastBuilder {
1150    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1151        &mut self.steps
1152    }
1153}
1154
1155/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1156///
1157/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1158/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1159///
1160/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1161/// and returns the parent `RouteBuilder`.
1162pub struct ThrottleBuilder {
1163    parent: RouteBuilder,
1164    config: ThrottlerConfig,
1165    steps: Vec<BuilderStep>,
1166}
1167
1168impl ThrottleBuilder {
1169    /// Set the throttle strategy. Default is `Delay`.
1170    ///
1171    /// - `Delay`: Queue messages until capacity available
1172    /// - `Reject`: Return error immediately when throttled
1173    /// - `Drop`: Silently discard excess messages
1174    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1175        self.config = self.config.strategy(strategy);
1176        self
1177    }
1178
1179    /// Close the throttle scope. Packages the accumulated sub-steps into a
1180    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1181    pub fn end_throttle(mut self) -> RouteBuilder {
1182        let step = BuilderStep::Throttle {
1183            config: self.config,
1184            steps: self.steps,
1185        };
1186        self.parent.steps.push(step);
1187        self.parent
1188    }
1189}
1190
1191impl StepAccumulator for ThrottleBuilder {
1192    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1193        &mut self.steps
1194    }
1195}
1196
1197/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1198///
1199/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1200/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1201///
1202/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1203/// and returns the parent `RouteBuilder`.
1204pub struct LoadBalancerBuilder {
1205    parent: RouteBuilder,
1206    config: LoadBalancerConfig,
1207    steps: Vec<BuilderStep>,
1208}
1209
1210impl LoadBalancerBuilder {
1211    /// Set the load balance strategy to round-robin (default).
1212    pub fn round_robin(mut self) -> Self {
1213        self.config = LoadBalancerConfig::round_robin();
1214        self
1215    }
1216
1217    /// Set the load balance strategy to random selection.
1218    pub fn random(mut self) -> Self {
1219        self.config = LoadBalancerConfig::random();
1220        self
1221    }
1222
1223    /// Set the load balance strategy to weighted selection.
1224    ///
1225    /// Each endpoint is assigned a weight that determines its probability
1226    /// of being selected.
1227    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1228        self.config = LoadBalancerConfig::weighted(weights);
1229        self
1230    }
1231
1232    /// Set the load balance strategy to failover.
1233    ///
1234    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1235    /// is tried.
1236    pub fn failover(mut self) -> Self {
1237        self.config = LoadBalancerConfig::failover();
1238        self
1239    }
1240
1241    /// Enable or disable parallel execution of endpoints.
1242    ///
1243    /// When enabled, all endpoints receive the exchange simultaneously.
1244    /// When disabled (default), only one endpoint is selected per exchange.
1245    pub fn parallel(mut self, parallel: bool) -> Self {
1246        self.config = self.config.parallel(parallel);
1247        self
1248    }
1249
1250    /// Close the load balance scope. Packages the accumulated sub-steps into a
1251    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1252    pub fn end_load_balance(mut self) -> RouteBuilder {
1253        let step = BuilderStep::LoadBalance {
1254            config: self.config,
1255            steps: self.steps,
1256        };
1257        self.parent.steps.push(step);
1258        self.parent
1259    }
1260}
1261
1262impl StepAccumulator for LoadBalancerBuilder {
1263    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1264        &mut self.steps
1265    }
1266}
1267
1268// ---------------------------------------------------------------------------
1269// Tests
1270// ---------------------------------------------------------------------------
1271
1272#[cfg(test)]
1273mod tests {
1274    use super::*;
1275    use camel_api::error_handler::ErrorHandlerConfig;
1276    use camel_api::load_balancer::LoadBalanceStrategy;
1277    use camel_api::{Exchange, Message};
1278    use camel_core::route::BuilderStep;
1279    use std::sync::Arc;
1280    use std::time::Duration;
1281    use tower::{Service, ServiceExt};
1282
1283    #[test]
1284    fn test_builder_from_creates_definition() {
1285        let definition = RouteBuilder::from("timer:tick")
1286            .route_id("test-route")
1287            .build()
1288            .unwrap();
1289        assert_eq!(definition.from_uri(), "timer:tick");
1290    }
1291
1292    #[test]
1293    fn test_builder_empty_from_uri_errors() {
1294        let result = RouteBuilder::from("").route_id("test-route").build();
1295        assert!(result.is_err());
1296    }
1297
1298    #[test]
1299    fn test_builder_to_adds_step() {
1300        let definition = RouteBuilder::from("timer:tick")
1301            .route_id("test-route")
1302            .to("log:info")
1303            .build()
1304            .unwrap();
1305
1306        assert_eq!(definition.from_uri(), "timer:tick");
1307        // We can verify steps were added by checking the structure
1308        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1309    }
1310
1311    #[test]
1312    fn test_builder_filter_adds_filter_step() {
1313        let definition = RouteBuilder::from("timer:tick")
1314            .route_id("test-route")
1315            .filter(|_ex| true)
1316            .to("mock:result")
1317            .end_filter()
1318            .build()
1319            .unwrap();
1320
1321        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1322    }
1323
1324    #[test]
1325    fn test_builder_set_header_adds_processor_step() {
1326        let definition = RouteBuilder::from("timer:tick")
1327            .route_id("test-route")
1328            .set_header("key", Value::String("value".into()))
1329            .build()
1330            .unwrap();
1331
1332        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1333    }
1334
1335    #[test]
1336    fn test_builder_map_body_adds_processor_step() {
1337        let definition = RouteBuilder::from("timer:tick")
1338            .route_id("test-route")
1339            .map_body(|body| body)
1340            .build()
1341            .unwrap();
1342
1343        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1344    }
1345
1346    #[test]
1347    fn test_builder_process_adds_processor_step() {
1348        let definition = RouteBuilder::from("timer:tick")
1349            .route_id("test-route")
1350            .process(|ex| async move { Ok(ex) })
1351            .build()
1352            .unwrap();
1353
1354        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1355    }
1356
1357    #[test]
1358    fn test_builder_chain_multiple_steps() {
1359        let definition = RouteBuilder::from("timer:tick")
1360            .route_id("test-route")
1361            .set_header("source", Value::String("timer".into()))
1362            .filter(|ex| ex.input.header("source").is_some())
1363            .to("log:info")
1364            .end_filter()
1365            .to("mock:result")
1366            .build()
1367            .unwrap();
1368
1369        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1370        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1371        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1372        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1373    }
1374
1375    // -----------------------------------------------------------------------
1376    // Processor behavior tests — exercise the real Tower services directly
1377    // -----------------------------------------------------------------------
1378
1379    #[tokio::test]
1380    async fn test_set_header_processor_works() {
1381        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1382        let exchange = Exchange::new(Message::new("test"));
1383        let result = svc.call(exchange).await.unwrap();
1384        assert_eq!(
1385            result.input.header("greeting"),
1386            Some(&Value::String("hello".into()))
1387        );
1388    }
1389
1390    #[tokio::test]
1391    async fn test_filter_processor_passes() {
1392        use camel_api::BoxProcessorExt;
1393        use camel_processor::FilterService;
1394
1395        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1396        let mut svc =
1397            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1398        let exchange = Exchange::new(Message::new("pass"));
1399        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1400        assert_eq!(result.input.body.as_text(), Some("pass"));
1401    }
1402
1403    #[tokio::test]
1404    async fn test_filter_processor_blocks() {
1405        use camel_api::BoxProcessorExt;
1406        use camel_processor::FilterService;
1407
1408        let sub = BoxProcessor::from_fn(|_ex| {
1409            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1410        });
1411        let mut svc =
1412            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1413        let exchange = Exchange::new(Message::new("reject"));
1414        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1415        assert_eq!(result.input.body.as_text(), Some("reject"));
1416    }
1417
1418    #[tokio::test]
1419    async fn test_map_body_processor_works() {
1420        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1421            if let Some(text) = body.as_text() {
1422                Body::Text(text.to_uppercase())
1423            } else {
1424                body
1425            }
1426        });
1427        let exchange = Exchange::new(Message::new("hello"));
1428        let result = mapper.oneshot(exchange).await.unwrap();
1429        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1430    }
1431
1432    #[tokio::test]
1433    async fn test_process_custom_processor_works() {
1434        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1435            ex.set_property("custom", Value::Bool(true));
1436            Ok(ex)
1437        });
1438        let exchange = Exchange::new(Message::default());
1439        let result = processor.oneshot(exchange).await.unwrap();
1440        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1441    }
1442
1443    // -----------------------------------------------------------------------
1444    // Sequential pipeline test
1445    // -----------------------------------------------------------------------
1446
1447    #[tokio::test]
1448    async fn test_compose_pipeline_runs_steps_in_order() {
1449        use camel_core::route::compose_pipeline;
1450
1451        let processors = vec![
1452            BoxProcessor::new(SetHeader::new(
1453                IdentityProcessor,
1454                "step",
1455                Value::String("one".into()),
1456            )),
1457            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1458                if let Some(text) = body.as_text() {
1459                    Body::Text(format!("{}-processed", text))
1460                } else {
1461                    body
1462                }
1463            })),
1464        ];
1465
1466        let pipeline = compose_pipeline(processors);
1467        let exchange = Exchange::new(Message::new("hello"));
1468        let result = pipeline.oneshot(exchange).await.unwrap();
1469
1470        assert_eq!(
1471            result.input.header("step"),
1472            Some(&Value::String("one".into()))
1473        );
1474        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1475    }
1476
1477    #[tokio::test]
1478    async fn test_compose_pipeline_empty_is_identity() {
1479        use camel_core::route::compose_pipeline;
1480
1481        let pipeline = compose_pipeline(vec![]);
1482        let exchange = Exchange::new(Message::new("unchanged"));
1483        let result = pipeline.oneshot(exchange).await.unwrap();
1484        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1485    }
1486
1487    // -----------------------------------------------------------------------
1488    // Circuit breaker builder tests
1489    // -----------------------------------------------------------------------
1490
1491    #[test]
1492    fn test_builder_circuit_breaker_sets_config() {
1493        use camel_api::circuit_breaker::CircuitBreakerConfig;
1494
1495        let config = CircuitBreakerConfig::new().failure_threshold(5);
1496        let definition = RouteBuilder::from("timer:tick")
1497            .route_id("test-route")
1498            .circuit_breaker(config)
1499            .build()
1500            .unwrap();
1501
1502        let cb = definition
1503            .circuit_breaker_config()
1504            .expect("circuit breaker should be set");
1505        assert_eq!(cb.failure_threshold, 5);
1506    }
1507
1508    #[test]
1509    fn test_builder_circuit_breaker_with_error_handler() {
1510        use camel_api::circuit_breaker::CircuitBreakerConfig;
1511        use camel_api::error_handler::ErrorHandlerConfig;
1512
1513        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1514        let eh_config = ErrorHandlerConfig::log_only();
1515
1516        let definition = RouteBuilder::from("timer:tick")
1517            .route_id("test-route")
1518            .to("log:info")
1519            .circuit_breaker(cb_config)
1520            .error_handler(eh_config)
1521            .build()
1522            .unwrap();
1523
1524        assert!(
1525            definition.circuit_breaker_config().is_some(),
1526            "circuit breaker config should be set"
1527        );
1528        // Route definition was built successfully with both configs.
1529    }
1530
1531    #[test]
1532    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1533        let definition = RouteBuilder::from("direct:start")
1534            .route_id("test-route")
1535            .dead_letter_channel("log:dlc")
1536            .on_exception(|e| matches!(e, CamelError::Io(_)))
1537            .retry(3)
1538            .handled_by("log:io")
1539            .end_on_exception()
1540            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1541            .retry(1)
1542            .end_on_exception()
1543            .to("mock:out")
1544            .build()
1545            .expect("route should build");
1546
1547        let cfg = definition
1548            .error_handler_config()
1549            .expect("error handler should be set");
1550        assert_eq!(cfg.policies.len(), 2);
1551        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1552        assert_eq!(
1553            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1554            Some(3)
1555        );
1556        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1557        assert_eq!(
1558            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1559            Some(1)
1560        );
1561    }
1562
1563    #[test]
1564    fn test_builder_on_exception_mixed_mode_rejected() {
1565        let result = RouteBuilder::from("direct:start")
1566            .route_id("test-route")
1567            .error_handler(ErrorHandlerConfig::log_only())
1568            .on_exception(|_e| true)
1569            .end_on_exception()
1570            .to("mock:out")
1571            .build();
1572
1573        let err = result.err().expect("mixed mode should fail with an error");
1574
1575        assert!(
1576            format!("{err}").contains("mixed error handler modes"),
1577            "unexpected error: {err}"
1578        );
1579    }
1580
1581    #[test]
1582    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1583        let definition = RouteBuilder::from("direct:start")
1584            .route_id("test-route")
1585            .on_exception(|_e| true)
1586            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1587            .with_jitter(0.5)
1588            .end_on_exception()
1589            .to("mock:out")
1590            .build()
1591            .expect("route should build");
1592
1593        let cfg = definition
1594            .error_handler_config()
1595            .expect("error handler should be set");
1596        assert_eq!(cfg.policies.len(), 1);
1597        assert!(cfg.policies[0].retry.is_none());
1598    }
1599
1600    #[test]
1601    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1602        let definition = RouteBuilder::from("direct:start")
1603            .route_id("test-route")
1604            .dead_letter_channel("log:dlc")
1605            .to("mock:out")
1606            .build()
1607            .expect("route should build");
1608
1609        let cfg = definition
1610            .error_handler_config()
1611            .expect("error handler should be set");
1612        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1613        assert!(cfg.policies.is_empty());
1614    }
1615
1616    #[test]
1617    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1618        let definition = RouteBuilder::from("direct:start")
1619            .route_id("test-route")
1620            .dead_letter_channel("log:first")
1621            .on_exception(|e| matches!(e, CamelError::Io(_)))
1622            .retry(2)
1623            .end_on_exception()
1624            .dead_letter_channel("log:second")
1625            .to("mock:out")
1626            .build()
1627            .expect("route should build");
1628
1629        let cfg = definition
1630            .error_handler_config()
1631            .expect("error handler should be set");
1632        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1633        assert_eq!(cfg.policies.len(), 1);
1634        assert_eq!(
1635            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1636            Some(2)
1637        );
1638    }
1639
1640    #[test]
1641    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1642        let definition = RouteBuilder::from("direct:start")
1643            .route_id("test-route")
1644            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1645            .retry(1)
1646            .end_on_exception()
1647            .to("mock:out")
1648            .build()
1649            .expect("route should build");
1650
1651        let cfg = definition
1652            .error_handler_config()
1653            .expect("error handler should be set");
1654        assert!(cfg.dlc_uri.is_none());
1655        assert_eq!(cfg.policies.len(), 1);
1656    }
1657
1658    #[test]
1659    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1660        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1661        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1662
1663        let definition = RouteBuilder::from("direct:start")
1664            .route_id("test-route")
1665            .error_handler(first)
1666            .error_handler(second)
1667            .to("mock:out")
1668            .build()
1669            .expect("route should build");
1670
1671        let cfg = definition
1672            .error_handler_config()
1673            .expect("error handler should be set");
1674        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1675    }
1676
1677    // --- Splitter builder tests ---
1678
1679    #[test]
1680    fn test_split_builder_typestate() {
1681        use camel_api::splitter::{SplitterConfig, split_body_lines};
1682
1683        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
1684        let definition = RouteBuilder::from("timer:test?period=1000")
1685            .route_id("test-route")
1686            .split(SplitterConfig::new(split_body_lines()))
1687            .to("mock:per-fragment")
1688            .end_split()
1689            .to("mock:final")
1690            .build()
1691            .unwrap();
1692
1693        // Should have 2 top-level steps: Split + To("mock:final")
1694        assert_eq!(definition.steps().len(), 2);
1695    }
1696
1697    #[test]
1698    fn test_split_builder_steps_collected() {
1699        use camel_api::splitter::{SplitterConfig, split_body_lines};
1700
1701        let definition = RouteBuilder::from("timer:test?period=1000")
1702            .route_id("test-route")
1703            .split(SplitterConfig::new(split_body_lines()))
1704            .set_header("fragment", Value::String("yes".into()))
1705            .to("mock:per-fragment")
1706            .end_split()
1707            .build()
1708            .unwrap();
1709
1710        // Should have 1 top-level step: Split (containing 2 sub-steps)
1711        assert_eq!(definition.steps().len(), 1);
1712        match &definition.steps()[0] {
1713            BuilderStep::Split { steps, .. } => {
1714                assert_eq!(steps.len(), 2); // SetHeader + To
1715            }
1716            other => panic!("Expected Split, got {:?}", other),
1717        }
1718    }
1719
1720    #[test]
1721    fn test_split_builder_config_propagated() {
1722        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1723
1724        let definition = RouteBuilder::from("timer:test?period=1000")
1725            .route_id("test-route")
1726            .split(
1727                SplitterConfig::new(split_body_lines())
1728                    .parallel(true)
1729                    .parallel_limit(4)
1730                    .aggregation(AggregationStrategy::CollectAll),
1731            )
1732            .to("mock:per-fragment")
1733            .end_split()
1734            .build()
1735            .unwrap();
1736
1737        match &definition.steps()[0] {
1738            BuilderStep::Split { config, .. } => {
1739                assert!(config.parallel);
1740                assert_eq!(config.parallel_limit, Some(4));
1741                assert!(matches!(
1742                    config.aggregation,
1743                    AggregationStrategy::CollectAll
1744                ));
1745            }
1746            other => panic!("Expected Split, got {:?}", other),
1747        }
1748    }
1749
1750    #[test]
1751    fn test_aggregate_builder_adds_step() {
1752        use camel_api::aggregator::AggregatorConfig;
1753        use camel_core::route::BuilderStep;
1754
1755        let definition = RouteBuilder::from("timer:tick")
1756            .route_id("test-route")
1757            .aggregate(
1758                AggregatorConfig::correlate_by("key")
1759                    .complete_when_size(2)
1760                    .build(),
1761            )
1762            .build()
1763            .unwrap();
1764
1765        assert_eq!(definition.steps().len(), 1);
1766        assert!(matches!(
1767            definition.steps()[0],
1768            BuilderStep::Aggregate { .. }
1769        ));
1770    }
1771
1772    #[test]
1773    fn test_aggregate_in_split_builder() {
1774        use camel_api::aggregator::AggregatorConfig;
1775        use camel_api::splitter::{SplitterConfig, split_body_lines};
1776        use camel_core::route::BuilderStep;
1777
1778        let definition = RouteBuilder::from("timer:tick")
1779            .route_id("test-route")
1780            .split(SplitterConfig::new(split_body_lines()))
1781            .aggregate(
1782                AggregatorConfig::correlate_by("key")
1783                    .complete_when_size(1)
1784                    .build(),
1785            )
1786            .end_split()
1787            .build()
1788            .unwrap();
1789
1790        assert_eq!(definition.steps().len(), 1);
1791        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
1792            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
1793        } else {
1794            panic!("expected Split step");
1795        }
1796    }
1797
1798    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
1799
1800    #[test]
1801    fn test_builder_set_body_static_adds_processor() {
1802        let definition = RouteBuilder::from("timer:tick")
1803            .route_id("test-route")
1804            .set_body("fixed")
1805            .build()
1806            .unwrap();
1807        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1808    }
1809
1810    #[test]
1811    fn test_builder_set_body_fn_adds_processor() {
1812        let definition = RouteBuilder::from("timer:tick")
1813            .route_id("test-route")
1814            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
1815            .build()
1816            .unwrap();
1817        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1818    }
1819
1820    #[test]
1821    fn transform_alias_produces_same_as_set_body() {
1822        let route_transform = RouteBuilder::from("timer:tick")
1823            .route_id("test-route")
1824            .transform("hello")
1825            .build()
1826            .unwrap();
1827
1828        let route_set_body = RouteBuilder::from("timer:tick")
1829            .route_id("test-route")
1830            .set_body("hello")
1831            .build()
1832            .unwrap();
1833
1834        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
1835    }
1836
1837    #[test]
1838    fn test_builder_set_header_fn_adds_processor() {
1839        let definition = RouteBuilder::from("timer:tick")
1840            .route_id("test-route")
1841            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
1842            .build()
1843            .unwrap();
1844        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1845    }
1846
1847    #[tokio::test]
1848    async fn test_set_body_static_processor_works() {
1849        use camel_core::route::compose_pipeline;
1850        let def = RouteBuilder::from("t:t")
1851            .route_id("test-route")
1852            .set_body("replaced")
1853            .build()
1854            .unwrap();
1855        let pipeline = compose_pipeline(
1856            def.steps()
1857                .iter()
1858                .filter_map(|s| {
1859                    if let BuilderStep::Processor(p) = s {
1860                        Some(p.clone())
1861                    } else {
1862                        None
1863                    }
1864                })
1865                .collect(),
1866        );
1867        let exchange = Exchange::new(Message::new("original"));
1868        let result = pipeline.oneshot(exchange).await.unwrap();
1869        assert_eq!(result.input.body.as_text(), Some("replaced"));
1870    }
1871
1872    #[tokio::test]
1873    async fn test_set_body_fn_processor_works() {
1874        use camel_core::route::compose_pipeline;
1875        let def = RouteBuilder::from("t:t")
1876            .route_id("test-route")
1877            .set_body_fn(|ex: &Exchange| {
1878                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
1879            })
1880            .build()
1881            .unwrap();
1882        let pipeline = compose_pipeline(
1883            def.steps()
1884                .iter()
1885                .filter_map(|s| {
1886                    if let BuilderStep::Processor(p) = s {
1887                        Some(p.clone())
1888                    } else {
1889                        None
1890                    }
1891                })
1892                .collect(),
1893        );
1894        let exchange = Exchange::new(Message::new("hello"));
1895        let result = pipeline.oneshot(exchange).await.unwrap();
1896        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1897    }
1898
1899    #[tokio::test]
1900    async fn test_set_header_fn_processor_works() {
1901        use camel_core::route::compose_pipeline;
1902        let def = RouteBuilder::from("t:t")
1903            .route_id("test-route")
1904            .set_header_fn("echo", |ex: &Exchange| {
1905                ex.input
1906                    .body
1907                    .as_text()
1908                    .map(|t| Value::String(t.into()))
1909                    .unwrap_or(Value::Null)
1910            })
1911            .build()
1912            .unwrap();
1913        let pipeline = compose_pipeline(
1914            def.steps()
1915                .iter()
1916                .filter_map(|s| {
1917                    if let BuilderStep::Processor(p) = s {
1918                        Some(p.clone())
1919                    } else {
1920                        None
1921                    }
1922                })
1923                .collect(),
1924        );
1925        let exchange = Exchange::new(Message::new("ping"));
1926        let result = pipeline.oneshot(exchange).await.unwrap();
1927        assert_eq!(
1928            result.input.header("echo"),
1929            Some(&Value::String("ping".into()))
1930        );
1931    }
1932
1933    // ── FilterBuilder typestate tests ─────────────────────────────────────
1934
1935    #[test]
1936    fn test_filter_builder_typestate() {
1937        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1938            .route_id("test-route")
1939            .filter(|_ex| true)
1940            .to("mock:inner")
1941            .end_filter()
1942            .to("mock:outer")
1943            .build();
1944        assert!(result.is_ok());
1945    }
1946
1947    #[test]
1948    fn test_filter_builder_steps_collected() {
1949        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1950            .route_id("test-route")
1951            .filter(|_ex| true)
1952            .to("mock:inner")
1953            .end_filter()
1954            .build()
1955            .unwrap();
1956
1957        assert_eq!(definition.steps().len(), 1);
1958        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1959    }
1960
1961    #[test]
1962    fn test_wire_tap_builder_adds_step() {
1963        let definition = RouteBuilder::from("timer:tick")
1964            .route_id("test-route")
1965            .wire_tap("mock:tap")
1966            .to("mock:result")
1967            .build()
1968            .unwrap();
1969
1970        assert_eq!(definition.steps().len(), 2);
1971        assert!(
1972            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
1973        );
1974        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
1975    }
1976
1977    // ── MulticastBuilder typestate tests ─────────────────────────────────────
1978
1979    #[test]
1980    fn test_multicast_builder_typestate() {
1981        let definition = RouteBuilder::from("timer:tick")
1982            .route_id("test-route")
1983            .multicast()
1984            .to("direct:a")
1985            .to("direct:b")
1986            .end_multicast()
1987            .to("mock:result")
1988            .build()
1989            .unwrap();
1990
1991        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
1992    }
1993
1994    #[test]
1995    fn test_multicast_builder_steps_collected() {
1996        let definition = RouteBuilder::from("timer:tick")
1997            .route_id("test-route")
1998            .multicast()
1999            .to("direct:a")
2000            .to("direct:b")
2001            .end_multicast()
2002            .build()
2003            .unwrap();
2004
2005        match &definition.steps()[0] {
2006            BuilderStep::Multicast { steps, .. } => {
2007                assert_eq!(steps.len(), 2);
2008            }
2009            other => panic!("Expected Multicast, got {:?}", other),
2010        }
2011    }
2012
2013    // ── Concurrency builder tests ─────────────────────────────────────
2014
2015    #[test]
2016    fn test_builder_concurrent_sets_concurrency() {
2017        use camel_component_api::ConcurrencyModel;
2018
2019        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2020            .route_id("test-route")
2021            .concurrent(16)
2022            .to("log:info")
2023            .build()
2024            .unwrap();
2025
2026        assert_eq!(
2027            definition.concurrency_override(),
2028            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2029        );
2030    }
2031
2032    #[test]
2033    fn test_builder_concurrent_zero_means_unbounded() {
2034        use camel_component_api::ConcurrencyModel;
2035
2036        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2037            .route_id("test-route")
2038            .concurrent(0)
2039            .to("log:info")
2040            .build()
2041            .unwrap();
2042
2043        assert_eq!(
2044            definition.concurrency_override(),
2045            Some(&ConcurrencyModel::Concurrent { max: None })
2046        );
2047    }
2048
2049    #[test]
2050    fn test_builder_sequential_sets_concurrency() {
2051        use camel_component_api::ConcurrencyModel;
2052
2053        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2054            .route_id("test-route")
2055            .sequential()
2056            .to("log:info")
2057            .build()
2058            .unwrap();
2059
2060        assert_eq!(
2061            definition.concurrency_override(),
2062            Some(&ConcurrencyModel::Sequential)
2063        );
2064    }
2065
2066    #[test]
2067    fn test_builder_default_concurrency_is_none() {
2068        let definition = RouteBuilder::from("timer:tick")
2069            .route_id("test-route")
2070            .to("log:info")
2071            .build()
2072            .unwrap();
2073
2074        assert_eq!(definition.concurrency_override(), None);
2075    }
2076
2077    // ── Route lifecycle builder tests ─────────────────────────────────────
2078
2079    #[test]
2080    fn test_builder_route_id_sets_id() {
2081        let definition = RouteBuilder::from("timer:tick")
2082            .route_id("my-route")
2083            .build()
2084            .unwrap();
2085
2086        assert_eq!(definition.route_id(), "my-route");
2087    }
2088
2089    #[test]
2090    fn test_build_without_route_id_fails() {
2091        let result = RouteBuilder::from("timer:tick?period=1000")
2092            .to("log:info")
2093            .build();
2094        let err = match result {
2095            Err(e) => e.to_string(),
2096            Ok(_) => panic!("build() should fail without route_id"),
2097        };
2098        assert!(
2099            err.contains("route_id"),
2100            "error should mention route_id, got: {}",
2101            err
2102        );
2103    }
2104
2105    #[test]
2106    fn test_builder_auto_startup_false() {
2107        let definition = RouteBuilder::from("timer:tick")
2108            .route_id("test-route")
2109            .auto_startup(false)
2110            .build()
2111            .unwrap();
2112
2113        assert!(!definition.auto_startup());
2114    }
2115
2116    #[test]
2117    fn test_builder_startup_order_custom() {
2118        let definition = RouteBuilder::from("timer:tick")
2119            .route_id("test-route")
2120            .startup_order(50)
2121            .build()
2122            .unwrap();
2123
2124        assert_eq!(definition.startup_order(), 50);
2125    }
2126
2127    #[test]
2128    fn test_builder_defaults() {
2129        let definition = RouteBuilder::from("timer:tick")
2130            .route_id("test-route")
2131            .build()
2132            .unwrap();
2133
2134        assert_eq!(definition.route_id(), "test-route");
2135        assert!(definition.auto_startup());
2136        assert_eq!(definition.startup_order(), 1000);
2137    }
2138
2139    // ── Choice typestate tests ──────────────────────────────────────────────────
2140
2141    #[test]
2142    fn test_choice_builder_single_when() {
2143        let definition = RouteBuilder::from("timer:tick")
2144            .route_id("test-route")
2145            .choice()
2146            .when(|ex: &Exchange| ex.input.header("type").is_some())
2147            .to("mock:typed")
2148            .end_when()
2149            .end_choice()
2150            .build()
2151            .unwrap();
2152        assert_eq!(definition.steps().len(), 1);
2153        assert!(
2154            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2155            if whens.len() == 1 && otherwise.is_none())
2156        );
2157    }
2158
2159    #[test]
2160    fn test_choice_builder_when_otherwise() {
2161        let definition = RouteBuilder::from("timer:tick")
2162            .route_id("test-route")
2163            .choice()
2164            .when(|ex: &Exchange| ex.input.header("a").is_some())
2165            .to("mock:a")
2166            .end_when()
2167            .otherwise()
2168            .to("mock:fallback")
2169            .end_otherwise()
2170            .end_choice()
2171            .build()
2172            .unwrap();
2173        assert!(
2174            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2175            if whens.len() == 1 && otherwise.is_some())
2176        );
2177    }
2178
2179    #[test]
2180    fn test_choice_builder_multiple_whens() {
2181        let definition = RouteBuilder::from("timer:tick")
2182            .route_id("test-route")
2183            .choice()
2184            .when(|ex: &Exchange| ex.input.header("a").is_some())
2185            .to("mock:a")
2186            .end_when()
2187            .when(|ex: &Exchange| ex.input.header("b").is_some())
2188            .to("mock:b")
2189            .end_when()
2190            .end_choice()
2191            .build()
2192            .unwrap();
2193        assert!(
2194            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2195            if whens.len() == 2)
2196        );
2197    }
2198
2199    #[test]
2200    fn test_choice_step_after_choice() {
2201        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2202        let definition = RouteBuilder::from("timer:tick")
2203            .route_id("test-route")
2204            .choice()
2205            .when(|_ex: &Exchange| true)
2206            .to("mock:inner")
2207            .end_when()
2208            .end_choice()
2209            .to("mock:outer") // must be step[1], not inside choice
2210            .build()
2211            .unwrap();
2212        assert_eq!(definition.steps().len(), 2);
2213        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2214    }
2215
2216    // ── Throttle typestate tests ──────────────────────────────────────────────────
2217
2218    #[test]
2219    fn test_throttle_builder_typestate() {
2220        let definition = RouteBuilder::from("timer:tick")
2221            .route_id("test-route")
2222            .throttle(10, std::time::Duration::from_secs(1))
2223            .to("mock:result")
2224            .end_throttle()
2225            .build()
2226            .unwrap();
2227
2228        assert_eq!(definition.steps().len(), 1);
2229        assert!(matches!(
2230            &definition.steps()[0],
2231            BuilderStep::Throttle { .. }
2232        ));
2233    }
2234
2235    #[test]
2236    fn test_throttle_builder_with_strategy() {
2237        let definition = RouteBuilder::from("timer:tick")
2238            .route_id("test-route")
2239            .throttle(10, std::time::Duration::from_secs(1))
2240            .strategy(ThrottleStrategy::Reject)
2241            .to("mock:result")
2242            .end_throttle()
2243            .build()
2244            .unwrap();
2245
2246        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2247            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2248        } else {
2249            panic!("Expected Throttle step");
2250        }
2251    }
2252
2253    #[test]
2254    fn test_throttle_builder_steps_collected() {
2255        let definition = RouteBuilder::from("timer:tick")
2256            .route_id("test-route")
2257            .throttle(5, std::time::Duration::from_secs(1))
2258            .set_header("throttled", Value::Bool(true))
2259            .to("mock:throttled")
2260            .end_throttle()
2261            .build()
2262            .unwrap();
2263
2264        match &definition.steps()[0] {
2265            BuilderStep::Throttle { steps, .. } => {
2266                assert_eq!(steps.len(), 2); // SetHeader + To
2267            }
2268            other => panic!("Expected Throttle, got {:?}", other),
2269        }
2270    }
2271
2272    #[test]
2273    fn test_throttle_step_after_throttle() {
2274        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2275        let definition = RouteBuilder::from("timer:tick")
2276            .route_id("test-route")
2277            .throttle(10, std::time::Duration::from_secs(1))
2278            .to("mock:inner")
2279            .end_throttle()
2280            .to("mock:outer")
2281            .build()
2282            .unwrap();
2283
2284        assert_eq!(definition.steps().len(), 2);
2285        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2286    }
2287
2288    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2289
2290    #[test]
2291    fn test_load_balance_builder_typestate() {
2292        let definition = RouteBuilder::from("timer:tick")
2293            .route_id("test-route")
2294            .load_balance()
2295            .round_robin()
2296            .to("mock:a")
2297            .to("mock:b")
2298            .end_load_balance()
2299            .build()
2300            .unwrap();
2301
2302        assert_eq!(definition.steps().len(), 1);
2303        assert!(matches!(
2304            &definition.steps()[0],
2305            BuilderStep::LoadBalance { .. }
2306        ));
2307    }
2308
2309    #[test]
2310    fn test_load_balance_builder_with_strategy() {
2311        let definition = RouteBuilder::from("timer:tick")
2312            .route_id("test-route")
2313            .load_balance()
2314            .random()
2315            .to("mock:result")
2316            .end_load_balance()
2317            .build()
2318            .unwrap();
2319
2320        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2321            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2322        } else {
2323            panic!("Expected LoadBalance step");
2324        }
2325    }
2326
2327    #[test]
2328    fn test_load_balance_builder_steps_collected() {
2329        let definition = RouteBuilder::from("timer:tick")
2330            .route_id("test-route")
2331            .load_balance()
2332            .set_header("lb", Value::Bool(true))
2333            .to("mock:a")
2334            .end_load_balance()
2335            .build()
2336            .unwrap();
2337
2338        match &definition.steps()[0] {
2339            BuilderStep::LoadBalance { steps, .. } => {
2340                assert_eq!(steps.len(), 2); // SetHeader + To
2341            }
2342            other => panic!("Expected LoadBalance, got {:?}", other),
2343        }
2344    }
2345
2346    #[test]
2347    fn test_load_balance_step_after_load_balance() {
2348        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
2349        let definition = RouteBuilder::from("timer:tick")
2350            .route_id("test-route")
2351            .load_balance()
2352            .to("mock:inner")
2353            .end_load_balance()
2354            .to("mock:outer")
2355            .build()
2356            .unwrap();
2357
2358        assert_eq!(definition.steps().len(), 2);
2359        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2360    }
2361
2362    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2363
2364    #[test]
2365    fn test_dynamic_router_builder() {
2366        let definition = RouteBuilder::from("timer:tick")
2367            .route_id("test-route")
2368            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2369            .build()
2370            .unwrap();
2371
2372        assert_eq!(definition.steps().len(), 1);
2373        assert!(matches!(
2374            &definition.steps()[0],
2375            BuilderStep::DynamicRouter { .. }
2376        ));
2377    }
2378
2379    #[test]
2380    fn test_dynamic_router_builder_with_config() {
2381        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2382            .max_iterations(100)
2383            .cache_size(500);
2384
2385        let definition = RouteBuilder::from("timer:tick")
2386            .route_id("test-route")
2387            .dynamic_router_with_config(config)
2388            .build()
2389            .unwrap();
2390
2391        assert_eq!(definition.steps().len(), 1);
2392        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2393            assert_eq!(config.max_iterations, 100);
2394            assert_eq!(config.cache_size, 500);
2395        } else {
2396            panic!("Expected DynamicRouter step");
2397        }
2398    }
2399
2400    #[test]
2401    fn test_dynamic_router_step_after_router() {
2402        // Steps after dynamic_router() are added to the outer pipeline.
2403        let definition = RouteBuilder::from("timer:tick")
2404            .route_id("test-route")
2405            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2406            .to("mock:outer")
2407            .build()
2408            .unwrap();
2409
2410        assert_eq!(definition.steps().len(), 2);
2411        assert!(matches!(
2412            &definition.steps()[0],
2413            BuilderStep::DynamicRouter { .. }
2414        ));
2415        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2416    }
2417
2418    #[test]
2419    fn routing_slip_builder_creates_step() {
2420        use camel_api::RoutingSlipExpression;
2421
2422        let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2423
2424        let route = RouteBuilder::from("direct:start")
2425            .route_id("routing-slip-test")
2426            .routing_slip(expression)
2427            .build()
2428            .unwrap();
2429
2430        assert!(
2431            matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2432            "Expected RoutingSlip step"
2433        );
2434    }
2435
2436    #[test]
2437    fn routing_slip_with_config_builder_creates_step() {
2438        use camel_api::RoutingSlipConfig;
2439
2440        let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2441            .uri_delimiter("|")
2442            .cache_size(50)
2443            .ignore_invalid_endpoints(true);
2444
2445        let route = RouteBuilder::from("direct:start")
2446            .route_id("routing-slip-config-test")
2447            .routing_slip_with_config(config)
2448            .build()
2449            .unwrap();
2450
2451        if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2452            assert_eq!(config.uri_delimiter, "|");
2453            assert_eq!(config.cache_size, 50);
2454            assert!(config.ignore_invalid_endpoints);
2455        } else {
2456            panic!("Expected RoutingSlip step");
2457        }
2458    }
2459
2460    #[test]
2461    fn test_builder_marshal_adds_processor_step() {
2462        let definition = RouteBuilder::from("timer:tick")
2463            .route_id("test-route")
2464            .marshal("json")
2465            .build()
2466            .unwrap();
2467        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2468    }
2469
2470    #[test]
2471    fn test_builder_unmarshal_adds_processor_step() {
2472        let definition = RouteBuilder::from("timer:tick")
2473            .route_id("test-route")
2474            .unmarshal("json")
2475            .build()
2476            .unwrap();
2477        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2478    }
2479
2480    #[test]
2481    #[should_panic(expected = "unknown data format: 'csv'")]
2482    fn test_builder_marshal_panics_on_unknown_format() {
2483        let _ = RouteBuilder::from("timer:tick")
2484            .route_id("test-route")
2485            .marshal("csv")
2486            .build();
2487    }
2488
2489    #[test]
2490    #[should_panic(expected = "unknown data format: 'csv'")]
2491    fn test_builder_unmarshal_panics_on_unknown_format() {
2492        let _ = RouteBuilder::from("timer:tick")
2493            .route_id("test-route")
2494            .unmarshal("csv")
2495            .build();
2496    }
2497}