Skip to main content

camel_builder/
lib.rs

1use camel_api::aggregator::{
2    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
3};
4use camel_api::body::Body;
5use camel_api::body_converter::BodyType;
6use camel_api::circuit_breaker::CircuitBreakerConfig;
7use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
8use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
9use camel_api::load_balancer::LoadBalancerConfig;
10use camel_api::multicast::{MulticastConfig, MulticastStrategy};
11use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
12use camel_api::splitter::SplitterConfig;
13use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
14use camel_api::{
15    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
16    ProcessorFn, Value,
17    runtime::{
18        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
19        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
20        CanonicalWhenSpec,
21    },
22};
23use camel_component::ConcurrencyModel;
24use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
25use camel_processor::{
26    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
27    UnmarshalService, builtin_data_format,
28};
29
30/// Shared step-accumulation methods for all builder types.
31///
32/// Implementors provide `steps_mut()` and get step-adding methods for free.
33/// `filter()` and other branching methods are NOT included — they return
34/// different types per builder and stay as per-builder methods.
35pub trait StepAccumulator: Sized {
36    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
37
38    fn to(mut self, endpoint: impl Into<String>) -> Self {
39        self.steps_mut().push(BuilderStep::To(endpoint.into()));
40        self
41    }
42
43    fn process<F, Fut>(mut self, f: F) -> Self
44    where
45        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
46        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
47    {
48        let svc = ProcessorFn::new(f);
49        self.steps_mut()
50            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
51        self
52    }
53
54    fn process_fn(mut self, processor: BoxProcessor) -> Self {
55        self.steps_mut().push(BuilderStep::Processor(processor));
56        self
57    }
58
59    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
60        let svc = SetHeader::new(IdentityProcessor, key, value);
61        self.steps_mut()
62            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
63        self
64    }
65
66    fn map_body<F>(mut self, mapper: F) -> Self
67    where
68        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
69    {
70        let svc = MapBody::new(IdentityProcessor, mapper);
71        self.steps_mut()
72            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
73        self
74    }
75
76    fn set_body<B>(mut self, body: B) -> Self
77    where
78        B: Into<Body> + Clone + Send + Sync + 'static,
79    {
80        let body: Body = body.into();
81        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
82        self.steps_mut()
83            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
84        self
85    }
86
87    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
88    ///
89    /// Transforms the message body using the given value. Semantically identical
90    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
91    fn transform<B>(self, body: B) -> Self
92    where
93        B: Into<Body> + Clone + Send + Sync + 'static,
94    {
95        self.set_body(body)
96    }
97
98    fn set_body_fn<F>(mut self, expr: F) -> Self
99    where
100        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
101    {
102        let svc = SetBody::new(IdentityProcessor, expr);
103        self.steps_mut()
104            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
105        self
106    }
107
108    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
109    where
110        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
111    {
112        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
113        self.steps_mut()
114            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
115        self
116    }
117
118    fn aggregate(mut self, config: AggregatorConfig) -> Self {
119        self.steps_mut().push(BuilderStep::Aggregate { config });
120        self
121    }
122
123    /// Stop processing this exchange immediately. No further steps in the
124    /// current pipeline will run.
125    ///
126    /// Can be used at any point in the route: directly on RouteBuilder,
127    /// inside `.filter()`, inside `.split()`, etc.
128    fn stop(mut self) -> Self {
129        self.steps_mut().push(BuilderStep::Stop);
130        self
131    }
132
133    /// Log a message at the specified level.
134    ///
135    /// The message will be logged when an exchange passes through this step.
136    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
137        self.steps_mut().push(BuilderStep::Log {
138            level,
139            message: message.into(),
140        });
141        self
142    }
143
144    /// Convert the message body to the target type.
145    ///
146    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
147    /// Returns `TypeConversionFailed` if conversion is not possible.
148    ///
149    /// # Example
150    /// ```ignore
151    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
152    ///      .convert_body_to(BodyType::Json)
153    ///      .to("direct:next")
154    /// ```
155    fn convert_body_to(mut self, target: BodyType) -> Self {
156        let svc = ConvertBodyTo::new(IdentityProcessor, target);
157        self.steps_mut()
158            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
159        self
160    }
161
162    /// Marshal the message body using the specified data format.
163    ///
164    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
165    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
166    ///
167    /// # Example
168    /// ```ignore
169    /// route.marshal("json").to("direct:next")
170    /// ```
171    fn marshal(mut self, format: impl Into<String>) -> Self {
172        let name = format.into();
173        let df =
174            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
175        let svc = MarshalService::new(IdentityProcessor, df);
176        self.steps_mut()
177            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
178        self
179    }
180
181    /// Unmarshal the message body using the specified data format.
182    ///
183    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
184    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
185    ///
186    /// # Example
187    /// ```ignore
188    /// route.unmarshal("json").to("direct:next")
189    /// ```
190    fn unmarshal(mut self, format: impl Into<String>) -> Self {
191        let name = format.into();
192        let df =
193            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
194        let svc = UnmarshalService::new(IdentityProcessor, df);
195        self.steps_mut()
196            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
197        self
198    }
199
200    /// Execute a script that can modify the exchange (headers, properties, body).
201    ///
202    /// The script has access to `headers`, `properties`, and `body` variables
203    /// and can modify them with assignment syntax: `headers["k"] = v`.
204    ///
205    /// # Example
206    /// ```ignore
207    /// // ignore: requires full CamelContext setup with registered language
208    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
209    /// ```
210    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
211        self.steps_mut().push(BuilderStep::Script {
212            language: language.into(),
213            script: script.into(),
214        });
215        self
216    }
217}
218
219/// A fluent builder for constructing routes.
220///
221/// # Example
222///
223/// ```ignore
224/// let definition = RouteBuilder::from("timer:tick?period=1000")
225///     .set_header("source", Value::String("timer".into()))
226///     .filter(|ex| ex.input.body.as_text().is_some())
227///     .to("log:info?showHeaders=true")
228///     .build()?;
229/// ```
230pub struct RouteBuilder {
231    from_uri: String,
232    steps: Vec<BuilderStep>,
233    error_handler: Option<ErrorHandlerConfig>,
234    error_handler_mode: ErrorHandlerMode,
235    circuit_breaker_config: Option<CircuitBreakerConfig>,
236    concurrency: Option<ConcurrencyModel>,
237    route_id: Option<String>,
238    auto_startup: Option<bool>,
239    startup_order: Option<i32>,
240}
241
242#[derive(Default)]
243enum ErrorHandlerMode {
244    #[default]
245    None,
246    ExplicitConfig,
247    Shorthand {
248        dlc_uri: Option<String>,
249        specs: Vec<OnExceptionSpec>,
250    },
251    Mixed,
252}
253
254#[derive(Clone)]
255struct OnExceptionSpec {
256    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
257    retry: Option<RedeliveryPolicy>,
258    handled_by: Option<String>,
259}
260
261impl RouteBuilder {
262    /// Start building a route from the given source endpoint URI.
263    pub fn from(endpoint: &str) -> Self {
264        Self {
265            from_uri: endpoint.to_string(),
266            steps: Vec::new(),
267            error_handler: None,
268            error_handler_mode: ErrorHandlerMode::None,
269            circuit_breaker_config: None,
270            concurrency: None,
271            route_id: None,
272            auto_startup: None,
273            startup_order: None,
274        }
275    }
276
277    /// Open a filter scope. Only exchanges matching `predicate` will be processed
278    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
279    /// and continue to steps after `.end_filter()`.
280    pub fn filter<F>(self, predicate: F) -> FilterBuilder
281    where
282        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
283    {
284        FilterBuilder {
285            parent: self,
286            predicate: std::sync::Arc::new(predicate),
287            steps: vec![],
288        }
289    }
290
291    /// Open a choice scope for content-based routing.
292    ///
293    /// Within the choice, you can define multiple `.when()` clauses and an
294    /// optional `.otherwise()` clause. The first matching `when` predicate
295    /// determines which sub-pipeline executes.
296    pub fn choice(self) -> ChoiceBuilder {
297        ChoiceBuilder {
298            parent: self,
299            whens: vec![],
300            _otherwise: None,
301        }
302    }
303
304    /// Add a WireTap step that sends a clone of the exchange to the given
305    /// endpoint URI (fire-and-forget). The original exchange continues
306    /// downstream unchanged.
307    pub fn wire_tap(mut self, endpoint: &str) -> Self {
308        self.steps.push(BuilderStep::WireTap {
309            uri: endpoint.to_string(),
310        });
311        self
312    }
313
314    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
315    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
316        self.error_handler_mode = match self.error_handler_mode {
317            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
318                ErrorHandlerMode::ExplicitConfig
319            }
320            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
321        };
322        self.error_handler = Some(config);
323        self
324    }
325
326    /// Set a dead letter channel URI for shorthand error handler mode.
327    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
328        let uri = uri.into();
329        self.error_handler_mode = match self.error_handler_mode {
330            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
331                dlc_uri: Some(uri),
332                specs: Vec::new(),
333            },
334            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
335                dlc_uri: Some(uri),
336                specs,
337            },
338            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
339        };
340        self
341    }
342
343    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
344    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
345    where
346        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
347    {
348        self.error_handler_mode = match self.error_handler_mode {
349            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
350                dlc_uri: None,
351                specs: Vec::new(),
352            },
353            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
354            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
355        };
356
357        OnExceptionBuilder {
358            parent: self,
359            policy: OnExceptionSpec {
360                matches: std::sync::Arc::new(matches),
361                retry: None,
362                handled_by: None,
363            },
364        }
365    }
366
367    /// Set a circuit breaker for this route.
368    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
369        self.circuit_breaker_config = Some(config);
370        self
371    }
372
373    /// Override the consumer's default concurrency model.
374    ///
375    /// When set, the pipeline spawns a task per exchange, processing them
376    /// concurrently. `max` limits the number of simultaneously active
377    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
378    ///
379    /// # Example
380    /// ```ignore
381    /// RouteBuilder::from("http://0.0.0.0:8080/api")
382    ///     .concurrent(16)  // max 16 in-flight pipeline executions
383    ///     .process(handle_request)
384    ///     .build()
385    /// ```
386    pub fn concurrent(mut self, max: usize) -> Self {
387        let max = if max == 0 { None } else { Some(max) };
388        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
389        self
390    }
391
392    /// Force sequential processing, overriding a concurrent-capable consumer.
393    ///
394    /// Useful for HTTP routes that mutate shared state and need ordering
395    /// guarantees.
396    pub fn sequential(mut self) -> Self {
397        self.concurrency = Some(ConcurrencyModel::Sequential);
398        self
399    }
400
401    /// Set the route ID for this route.
402    ///
403    /// If not set, the route will be assigned an auto-generated ID.
404    pub fn route_id(mut self, id: impl Into<String>) -> Self {
405        self.route_id = Some(id.into());
406        self
407    }
408
409    /// Set whether this route should automatically start when the context starts.
410    ///
411    /// Default is `true`.
412    pub fn auto_startup(mut self, auto: bool) -> Self {
413        self.auto_startup = Some(auto);
414        self
415    }
416
417    /// Set the startup order for this route.
418    ///
419    /// Routes with lower values start first. Default is 1000.
420    pub fn startup_order(mut self, order: i32) -> Self {
421        self.startup_order = Some(order);
422        self
423    }
424
425    /// Begin a Splitter sub-pipeline. Steps added after this call (until
426    /// `.end_split()`) will be executed per-fragment.
427    ///
428    /// Returns a `SplitBuilder` — you cannot call `.build()` until
429    /// `.end_split()` closes the split scope (enforced by the type system).
430    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
431        SplitBuilder {
432            parent: self,
433            config,
434            steps: Vec::new(),
435        }
436    }
437
438    /// Begin a Multicast sub-pipeline. Steps added after this call (until
439    /// `.end_multicast()`) will each receive a copy of the exchange.
440    ///
441    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
442    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
443    pub fn multicast(self) -> MulticastBuilder {
444        MulticastBuilder {
445            parent: self,
446            steps: Vec::new(),
447            config: MulticastConfig::new(),
448        }
449    }
450
451    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
452    /// `max_requests` per `period`. Steps inside the throttle scope are only
453    /// executed when the rate limit allows.
454    ///
455    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
456    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
457    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
458        ThrottleBuilder {
459            parent: self,
460            config: ThrottlerConfig::new(max_requests, period),
461            steps: Vec::new(),
462        }
463    }
464
465    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
466    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
467    ///
468    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
469    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
470    pub fn load_balance(self) -> LoadBalancerBuilder {
471        LoadBalancerBuilder {
472            parent: self,
473            config: LoadBalancerConfig::round_robin(),
474            steps: Vec::new(),
475        }
476    }
477
478    /// Add a dynamic router step that routes exchanges dynamically based on
479    /// expression evaluation at runtime.
480    ///
481    /// The expression receives the exchange and returns `Some(uri)` to route to
482    /// the next endpoint, or `None` to stop routing.
483    ///
484    /// # Example
485    /// ```ignore
486    /// RouteBuilder::from("timer:tick")
487    ///     .route_id("test-route")
488    ///     .dynamic_router(|ex| {
489    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
490    ///     })
491    ///     .build()
492    /// ```
493    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
494        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
495    }
496
497    /// Add a dynamic router step with full configuration.
498    ///
499    /// Allows customization of URI delimiter, cache size, timeout, and other options.
500    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
501        self.steps.push(BuilderStep::DynamicRouter { config });
502        self
503    }
504
505    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
506        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
507    }
508
509    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
510        self.steps.push(BuilderStep::RoutingSlip { config });
511        self
512    }
513
514    /// Consume the builder and produce a [`RouteDefinition`].
515    pub fn build(self) -> Result<RouteDefinition, CamelError> {
516        if self.from_uri.is_empty() {
517            return Err(CamelError::RouteError(
518                "route must have a 'from' URI".to_string(),
519            ));
520        }
521        let route_id = self.route_id.ok_or_else(|| {
522            CamelError::RouteError(
523                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
524                    .to_string(),
525            )
526        })?;
527        let resolved_error_handler = match self.error_handler_mode {
528            ErrorHandlerMode::None => self.error_handler,
529            ErrorHandlerMode::ExplicitConfig => self.error_handler,
530            ErrorHandlerMode::Mixed => {
531                return Err(CamelError::RouteError(
532                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
533                ));
534            }
535            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
536                let mut config = if let Some(uri) = dlc_uri {
537                    ErrorHandlerConfig::dead_letter_channel(uri)
538                } else {
539                    ErrorHandlerConfig::log_only()
540                };
541
542                for spec in specs {
543                    let matcher = spec.matches.clone();
544                    let mut builder = config.on_exception(move |e| matcher(e));
545
546                    if let Some(retry) = spec.retry {
547                        builder = builder.retry(retry.max_attempts).with_backoff(
548                            retry.initial_delay,
549                            retry.multiplier,
550                            retry.max_delay,
551                        );
552                        if retry.jitter_factor > 0.0 {
553                            builder = builder.with_jitter(retry.jitter_factor);
554                        }
555                    }
556
557                    if let Some(uri) = spec.handled_by {
558                        builder = builder.handled_by(uri);
559                    }
560
561                    config = builder.build();
562                }
563
564                Some(config)
565            }
566        };
567
568        let definition = RouteDefinition::new(self.from_uri, self.steps);
569        let definition = if let Some(eh) = resolved_error_handler {
570            definition.with_error_handler(eh)
571        } else {
572            definition
573        };
574        let definition = if let Some(cb) = self.circuit_breaker_config {
575            definition.with_circuit_breaker(cb)
576        } else {
577            definition
578        };
579        let definition = if let Some(concurrency) = self.concurrency {
580            definition.with_concurrency(concurrency)
581        } else {
582            definition
583        };
584        let definition = definition.with_route_id(route_id);
585        let definition = if let Some(auto) = self.auto_startup {
586            definition.with_auto_startup(auto)
587        } else {
588            definition
589        };
590        let definition = if let Some(order) = self.startup_order {
591            definition.with_startup_order(order)
592        } else {
593            definition
594        };
595        Ok(definition)
596    }
597
598    /// Compile this builder route into canonical v1 spec.
599    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
600        if self.from_uri.is_empty() {
601            return Err(CamelError::RouteError(
602                "route must have a 'from' URI".to_string(),
603            ));
604        }
605        let route_id = self.route_id.ok_or_else(|| {
606            CamelError::RouteError(
607                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
608                    .to_string(),
609            )
610        })?;
611
612        let steps = canonicalize_steps(self.steps)?;
613        let circuit_breaker = self
614            .circuit_breaker_config
615            .map(canonicalize_circuit_breaker);
616
617        let spec = CanonicalRouteSpec {
618            route_id,
619            from: self.from_uri,
620            steps,
621            circuit_breaker,
622            version: camel_api::CANONICAL_CONTRACT_VERSION,
623        };
624        spec.validate_contract()?;
625        Ok(spec)
626    }
627}
628
629pub struct OnExceptionBuilder {
630    parent: RouteBuilder,
631    policy: OnExceptionSpec,
632}
633
634impl OnExceptionBuilder {
635    pub fn retry(mut self, max_attempts: u32) -> Self {
636        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
637        self
638    }
639
640    pub fn with_backoff(
641        mut self,
642        initial: std::time::Duration,
643        multiplier: f64,
644        max: std::time::Duration,
645    ) -> Self {
646        if let Some(ref mut retry) = self.policy.retry {
647            retry.initial_delay = initial;
648            retry.multiplier = multiplier;
649            retry.max_delay = max;
650        }
651        self
652    }
653
654    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
655        if let Some(ref mut retry) = self.policy.retry {
656            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
657        }
658        self
659    }
660
661    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
662        self.policy.handled_by = Some(uri.into());
663        self
664    }
665
666    pub fn end_on_exception(mut self) -> RouteBuilder {
667        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
668            specs.push(self.policy);
669        }
670        self.parent
671    }
672}
673
674fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
675    let mut canonical = Vec::with_capacity(steps.len());
676    for step in steps {
677        canonical.push(canonicalize_step(step)?);
678    }
679    Ok(canonical)
680}
681
682fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
683    match step {
684        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
685        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
686        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
687        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
688        BuilderStep::DeclarativeScript { expression } => {
689            Ok(CanonicalStepSpec::Script { expression })
690        }
691        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
692            predicate,
693            steps: canonicalize_steps(steps)?,
694        }),
695        BuilderStep::DeclarativeChoice { whens, otherwise } => {
696            let mut canonical_whens = Vec::with_capacity(whens.len());
697            for DeclarativeWhenStep { predicate, steps } in whens {
698                canonical_whens.push(CanonicalWhenSpec {
699                    predicate,
700                    steps: canonicalize_steps(steps)?,
701                });
702            }
703            let otherwise = match otherwise {
704                Some(steps) => Some(canonicalize_steps(steps)?),
705                None => None,
706            };
707            Ok(CanonicalStepSpec::Choice {
708                whens: canonical_whens,
709                otherwise,
710            })
711        }
712        BuilderStep::DeclarativeSplit {
713            expression,
714            aggregation,
715            parallel,
716            parallel_limit,
717            stop_on_exception,
718            steps,
719        } => Ok(CanonicalStepSpec::Split {
720            expression: CanonicalSplitExpressionSpec::Language(expression),
721            aggregation: canonicalize_split_aggregation(aggregation)?,
722            parallel,
723            parallel_limit,
724            stop_on_exception,
725            steps: canonicalize_steps(steps)?,
726        }),
727        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
728            config: canonicalize_aggregate(config)?,
729        }),
730        other => {
731            let step_name = canonical_step_name(&other);
732            let detail = camel_api::canonical_contract_rejection_reason(step_name)
733                .unwrap_or("not included in canonical v1");
734            Err(CamelError::RouteError(format!(
735                "canonical v1 does not support step `{step_name}`: {detail}"
736            )))
737        }
738    }
739}
740
741fn canonicalize_split_aggregation(
742    strategy: camel_api::splitter::AggregationStrategy,
743) -> Result<CanonicalSplitAggregationSpec, CamelError> {
744    match strategy {
745        camel_api::splitter::AggregationStrategy::LastWins => {
746            Ok(CanonicalSplitAggregationSpec::LastWins)
747        }
748        camel_api::splitter::AggregationStrategy::CollectAll => {
749            Ok(CanonicalSplitAggregationSpec::CollectAll)
750        }
751        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
752            "canonical v1 does not support custom split aggregation".to_string(),
753        )),
754        camel_api::splitter::AggregationStrategy::Original => {
755            Ok(CanonicalSplitAggregationSpec::Original)
756        }
757    }
758}
759
760fn extract_completion_fields(
761    mode: &CompletionMode,
762) -> Result<(Option<usize>, Option<u64>), CamelError> {
763    match mode {
764        CompletionMode::Single(cond) => match cond {
765            CompletionCondition::Size(n) => Ok((Some(*n), None)),
766            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
767            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
768                "canonical v1 does not support aggregate predicate completion".to_string(),
769            )),
770        },
771        CompletionMode::Any(conds) => {
772            let mut size = None;
773            let mut timeout_ms = None;
774            for cond in conds {
775                match cond {
776                    CompletionCondition::Size(n) => size = Some(*n),
777                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
778                    CompletionCondition::Predicate(_) => {
779                        return Err(CamelError::RouteError(
780                            "canonical v1 does not support aggregate predicate completion"
781                                .to_string(),
782                        ));
783                    }
784                }
785            }
786            Ok((size, timeout_ms))
787        }
788    }
789}
790
791fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
792    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
793
794    let header = match &config.correlation {
795        CorrelationStrategy::HeaderName(h) => h.clone(),
796        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
797        CorrelationStrategy::Fn(_) => {
798            return Err(CamelError::RouteError(
799                "canonical v1 does not support Fn correlation strategy".to_string(),
800            ));
801        }
802    };
803
804    let correlation_key = match &config.correlation {
805        CorrelationStrategy::HeaderName(_) => None,
806        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
807        CorrelationStrategy::Fn(_) => unreachable!(),
808    };
809
810    let strategy = match config.strategy {
811        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
812        AggregationStrategy::Custom(_) => {
813            return Err(CamelError::RouteError(
814                "canonical v1 does not support custom aggregate strategy".to_string(),
815            ));
816        }
817    };
818    let bucket_ttl_ms = config
819        .bucket_ttl
820        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
821
822    Ok(CanonicalAggregateSpec {
823        header,
824        completion_size,
825        completion_timeout_ms,
826        correlation_key,
827        force_completion_on_stop: if config.force_completion_on_stop {
828            Some(true)
829        } else {
830            None
831        },
832        discard_on_timeout: if config.discard_on_timeout {
833            Some(true)
834        } else {
835            None
836        },
837        strategy,
838        max_buckets: config.max_buckets,
839        bucket_ttl_ms,
840    })
841}
842
843fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
844    CanonicalCircuitBreakerSpec {
845        failure_threshold: config.failure_threshold,
846        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
847    }
848}
849
850fn canonical_step_name(step: &BuilderStep) -> &'static str {
851    match step {
852        BuilderStep::Processor(_) => "processor",
853        BuilderStep::To(_) => "to",
854        BuilderStep::Stop => "stop",
855        BuilderStep::Log { .. } => "log",
856        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
857        BuilderStep::DeclarativeSetBody { .. } => "set_body",
858        BuilderStep::DeclarativeFilter { .. } => "filter",
859        BuilderStep::DeclarativeChoice { .. } => "choice",
860        BuilderStep::DeclarativeScript { .. } => "script",
861        BuilderStep::DeclarativeSplit { .. } => "split",
862        BuilderStep::Split { .. } => "split",
863        BuilderStep::Aggregate { .. } => "aggregate",
864        BuilderStep::Filter { .. } => "filter",
865        BuilderStep::Choice { .. } => "choice",
866        BuilderStep::WireTap { .. } => "wire_tap",
867        BuilderStep::Multicast { .. } => "multicast",
868        BuilderStep::DeclarativeLog { .. } => "log",
869        BuilderStep::Bean { .. } => "bean",
870        BuilderStep::Script { .. } => "script",
871        BuilderStep::Throttle { .. } => "throttle",
872        BuilderStep::LoadBalance { .. } => "load_balancer",
873        BuilderStep::DynamicRouter { .. } => "dynamic_router",
874        BuilderStep::RoutingSlip { .. } => "routing_slip",
875    }
876}
877
878impl StepAccumulator for RouteBuilder {
879    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
880        &mut self.steps
881    }
882}
883
884/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
885///
886/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
887/// but NOT `.build()` and NOT `.split()` (no nested splits).
888///
889/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
890/// and returns the parent `RouteBuilder`.
891pub struct SplitBuilder {
892    parent: RouteBuilder,
893    config: SplitterConfig,
894    steps: Vec<BuilderStep>,
895}
896
897impl SplitBuilder {
898    /// Open a filter scope within the split sub-pipeline.
899    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
900    where
901        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
902    {
903        FilterInSplitBuilder {
904            parent: self,
905            predicate: std::sync::Arc::new(predicate),
906            steps: vec![],
907        }
908    }
909
910    /// Close the split scope. Packages the accumulated sub-steps into a
911    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
912    pub fn end_split(mut self) -> RouteBuilder {
913        let split_step = BuilderStep::Split {
914            config: self.config,
915            steps: self.steps,
916        };
917        self.parent.steps.push(split_step);
918        self.parent
919    }
920}
921
922impl StepAccumulator for SplitBuilder {
923    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
924        &mut self.steps
925    }
926}
927
928/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
929pub struct FilterBuilder {
930    parent: RouteBuilder,
931    predicate: FilterPredicate,
932    steps: Vec<BuilderStep>,
933}
934
935impl FilterBuilder {
936    /// Close the filter scope. Packages the accumulated sub-steps into a
937    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
938    pub fn end_filter(mut self) -> RouteBuilder {
939        let step = BuilderStep::Filter {
940            predicate: self.predicate,
941            steps: self.steps,
942        };
943        self.parent.steps.push(step);
944        self.parent
945    }
946}
947
948impl StepAccumulator for FilterBuilder {
949    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
950        &mut self.steps
951    }
952}
953
954/// Builder for a filter scope nested inside a `.split()` block.
955pub struct FilterInSplitBuilder {
956    parent: SplitBuilder,
957    predicate: FilterPredicate,
958    steps: Vec<BuilderStep>,
959}
960
961impl FilterInSplitBuilder {
962    /// Close the filter scope and return the parent `SplitBuilder`.
963    pub fn end_filter(mut self) -> SplitBuilder {
964        let step = BuilderStep::Filter {
965            predicate: self.predicate,
966            steps: self.steps,
967        };
968        self.parent.steps.push(step);
969        self.parent
970    }
971}
972
973impl StepAccumulator for FilterInSplitBuilder {
974    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
975        &mut self.steps
976    }
977}
978
979// ── Choice/When/Otherwise builders ─────────────────────────────────────────
980
981/// Builder for a `.choice()` ... `.end_choice()` block.
982///
983/// Accumulates `when` clauses and an optional `otherwise` clause.
984/// Cannot call `.build()` until `.end_choice()` is called.
985pub struct ChoiceBuilder {
986    parent: RouteBuilder,
987    whens: Vec<WhenStep>,
988    _otherwise: Option<Vec<BuilderStep>>,
989}
990
991impl ChoiceBuilder {
992    /// Open a `when` clause. Only exchanges matching `predicate` will be
993    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
994    pub fn when<F>(self, predicate: F) -> WhenBuilder
995    where
996        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
997    {
998        WhenBuilder {
999            parent: self,
1000            predicate: std::sync::Arc::new(predicate),
1001            steps: vec![],
1002        }
1003    }
1004
1005    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1006    ///
1007    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1008    pub fn otherwise(self) -> OtherwiseBuilder {
1009        OtherwiseBuilder {
1010            parent: self,
1011            steps: vec![],
1012        }
1013    }
1014
1015    /// Close the choice scope. Packages all accumulated `when` clauses and
1016    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1017    /// parent `RouteBuilder`.
1018    pub fn end_choice(mut self) -> RouteBuilder {
1019        let step = BuilderStep::Choice {
1020            whens: self.whens,
1021            otherwise: self._otherwise,
1022        };
1023        self.parent.steps.push(step);
1024        self.parent
1025    }
1026}
1027
1028/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1029pub struct WhenBuilder {
1030    parent: ChoiceBuilder,
1031    predicate: camel_api::FilterPredicate,
1032    steps: Vec<BuilderStep>,
1033}
1034
1035impl WhenBuilder {
1036    /// Close the when scope. Packages the accumulated sub-steps into a
1037    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1038    pub fn end_when(mut self) -> ChoiceBuilder {
1039        self.parent.whens.push(WhenStep {
1040            predicate: self.predicate,
1041            steps: self.steps,
1042        });
1043        self.parent
1044    }
1045}
1046
1047impl StepAccumulator for WhenBuilder {
1048    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1049        &mut self.steps
1050    }
1051}
1052
1053/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1054pub struct OtherwiseBuilder {
1055    parent: ChoiceBuilder,
1056    steps: Vec<BuilderStep>,
1057}
1058
1059impl OtherwiseBuilder {
1060    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1061    pub fn end_otherwise(self) -> ChoiceBuilder {
1062        let OtherwiseBuilder { mut parent, steps } = self;
1063        parent._otherwise = Some(steps);
1064        parent
1065    }
1066}
1067
1068impl StepAccumulator for OtherwiseBuilder {
1069    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1070        &mut self.steps
1071    }
1072}
1073
1074/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1075///
1076/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1077/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1078///
1079/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1080/// and returns the parent `RouteBuilder`.
1081pub struct MulticastBuilder {
1082    parent: RouteBuilder,
1083    steps: Vec<BuilderStep>,
1084    config: MulticastConfig,
1085}
1086
1087impl MulticastBuilder {
1088    pub fn parallel(mut self, parallel: bool) -> Self {
1089        self.config = self.config.parallel(parallel);
1090        self
1091    }
1092
1093    pub fn parallel_limit(mut self, limit: usize) -> Self {
1094        self.config = self.config.parallel_limit(limit);
1095        self
1096    }
1097
1098    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1099        self.config = self.config.stop_on_exception(stop);
1100        self
1101    }
1102
1103    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1104        self.config = self.config.timeout(duration);
1105        self
1106    }
1107
1108    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1109        self.config = self.config.aggregation(strategy);
1110        self
1111    }
1112
1113    pub fn end_multicast(mut self) -> RouteBuilder {
1114        let step = BuilderStep::Multicast {
1115            steps: self.steps,
1116            config: self.config,
1117        };
1118        self.parent.steps.push(step);
1119        self.parent
1120    }
1121}
1122
1123impl StepAccumulator for MulticastBuilder {
1124    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1125        &mut self.steps
1126    }
1127}
1128
1129/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1130///
1131/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1132/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1133///
1134/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1135/// and returns the parent `RouteBuilder`.
1136pub struct ThrottleBuilder {
1137    parent: RouteBuilder,
1138    config: ThrottlerConfig,
1139    steps: Vec<BuilderStep>,
1140}
1141
1142impl ThrottleBuilder {
1143    /// Set the throttle strategy. Default is `Delay`.
1144    ///
1145    /// - `Delay`: Queue messages until capacity available
1146    /// - `Reject`: Return error immediately when throttled
1147    /// - `Drop`: Silently discard excess messages
1148    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1149        self.config = self.config.strategy(strategy);
1150        self
1151    }
1152
1153    /// Close the throttle scope. Packages the accumulated sub-steps into a
1154    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1155    pub fn end_throttle(mut self) -> RouteBuilder {
1156        let step = BuilderStep::Throttle {
1157            config: self.config,
1158            steps: self.steps,
1159        };
1160        self.parent.steps.push(step);
1161        self.parent
1162    }
1163}
1164
1165impl StepAccumulator for ThrottleBuilder {
1166    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1167        &mut self.steps
1168    }
1169}
1170
1171/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1172///
1173/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1174/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1175///
1176/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1177/// and returns the parent `RouteBuilder`.
1178pub struct LoadBalancerBuilder {
1179    parent: RouteBuilder,
1180    config: LoadBalancerConfig,
1181    steps: Vec<BuilderStep>,
1182}
1183
1184impl LoadBalancerBuilder {
1185    /// Set the load balance strategy to round-robin (default).
1186    pub fn round_robin(mut self) -> Self {
1187        self.config = LoadBalancerConfig::round_robin();
1188        self
1189    }
1190
1191    /// Set the load balance strategy to random selection.
1192    pub fn random(mut self) -> Self {
1193        self.config = LoadBalancerConfig::random();
1194        self
1195    }
1196
1197    /// Set the load balance strategy to weighted selection.
1198    ///
1199    /// Each endpoint is assigned a weight that determines its probability
1200    /// of being selected.
1201    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1202        self.config = LoadBalancerConfig::weighted(weights);
1203        self
1204    }
1205
1206    /// Set the load balance strategy to failover.
1207    ///
1208    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1209    /// is tried.
1210    pub fn failover(mut self) -> Self {
1211        self.config = LoadBalancerConfig::failover();
1212        self
1213    }
1214
1215    /// Enable or disable parallel execution of endpoints.
1216    ///
1217    /// When enabled, all endpoints receive the exchange simultaneously.
1218    /// When disabled (default), only one endpoint is selected per exchange.
1219    pub fn parallel(mut self, parallel: bool) -> Self {
1220        self.config = self.config.parallel(parallel);
1221        self
1222    }
1223
1224    /// Close the load balance scope. Packages the accumulated sub-steps into a
1225    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1226    pub fn end_load_balance(mut self) -> RouteBuilder {
1227        let step = BuilderStep::LoadBalance {
1228            config: self.config,
1229            steps: self.steps,
1230        };
1231        self.parent.steps.push(step);
1232        self.parent
1233    }
1234}
1235
1236impl StepAccumulator for LoadBalancerBuilder {
1237    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1238        &mut self.steps
1239    }
1240}
1241
1242// ---------------------------------------------------------------------------
1243// Tests
1244// ---------------------------------------------------------------------------
1245
1246#[cfg(test)]
1247mod tests {
1248    use super::*;
1249    use camel_api::error_handler::ErrorHandlerConfig;
1250    use camel_api::load_balancer::LoadBalanceStrategy;
1251    use camel_api::{Exchange, Message};
1252    use camel_core::route::BuilderStep;
1253    use std::sync::Arc;
1254    use std::time::Duration;
1255    use tower::{Service, ServiceExt};
1256
1257    #[test]
1258    fn test_builder_from_creates_definition() {
1259        let definition = RouteBuilder::from("timer:tick")
1260            .route_id("test-route")
1261            .build()
1262            .unwrap();
1263        assert_eq!(definition.from_uri(), "timer:tick");
1264    }
1265
1266    #[test]
1267    fn test_builder_empty_from_uri_errors() {
1268        let result = RouteBuilder::from("").route_id("test-route").build();
1269        assert!(result.is_err());
1270    }
1271
1272    #[test]
1273    fn test_builder_to_adds_step() {
1274        let definition = RouteBuilder::from("timer:tick")
1275            .route_id("test-route")
1276            .to("log:info")
1277            .build()
1278            .unwrap();
1279
1280        assert_eq!(definition.from_uri(), "timer:tick");
1281        // We can verify steps were added by checking the structure
1282        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1283    }
1284
1285    #[test]
1286    fn test_builder_filter_adds_filter_step() {
1287        let definition = RouteBuilder::from("timer:tick")
1288            .route_id("test-route")
1289            .filter(|_ex| true)
1290            .to("mock:result")
1291            .end_filter()
1292            .build()
1293            .unwrap();
1294
1295        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1296    }
1297
1298    #[test]
1299    fn test_builder_set_header_adds_processor_step() {
1300        let definition = RouteBuilder::from("timer:tick")
1301            .route_id("test-route")
1302            .set_header("key", Value::String("value".into()))
1303            .build()
1304            .unwrap();
1305
1306        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1307    }
1308
1309    #[test]
1310    fn test_builder_map_body_adds_processor_step() {
1311        let definition = RouteBuilder::from("timer:tick")
1312            .route_id("test-route")
1313            .map_body(|body| body)
1314            .build()
1315            .unwrap();
1316
1317        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1318    }
1319
1320    #[test]
1321    fn test_builder_process_adds_processor_step() {
1322        let definition = RouteBuilder::from("timer:tick")
1323            .route_id("test-route")
1324            .process(|ex| async move { Ok(ex) })
1325            .build()
1326            .unwrap();
1327
1328        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1329    }
1330
1331    #[test]
1332    fn test_builder_chain_multiple_steps() {
1333        let definition = RouteBuilder::from("timer:tick")
1334            .route_id("test-route")
1335            .set_header("source", Value::String("timer".into()))
1336            .filter(|ex| ex.input.header("source").is_some())
1337            .to("log:info")
1338            .end_filter()
1339            .to("mock:result")
1340            .build()
1341            .unwrap();
1342
1343        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1344        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1345        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1346        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1347    }
1348
1349    // -----------------------------------------------------------------------
1350    // Processor behavior tests — exercise the real Tower services directly
1351    // -----------------------------------------------------------------------
1352
1353    #[tokio::test]
1354    async fn test_set_header_processor_works() {
1355        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1356        let exchange = Exchange::new(Message::new("test"));
1357        let result = svc.call(exchange).await.unwrap();
1358        assert_eq!(
1359            result.input.header("greeting"),
1360            Some(&Value::String("hello".into()))
1361        );
1362    }
1363
1364    #[tokio::test]
1365    async fn test_filter_processor_passes() {
1366        use camel_api::BoxProcessorExt;
1367        use camel_processor::FilterService;
1368
1369        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1370        let mut svc =
1371            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1372        let exchange = Exchange::new(Message::new("pass"));
1373        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1374        assert_eq!(result.input.body.as_text(), Some("pass"));
1375    }
1376
1377    #[tokio::test]
1378    async fn test_filter_processor_blocks() {
1379        use camel_api::BoxProcessorExt;
1380        use camel_processor::FilterService;
1381
1382        let sub = BoxProcessor::from_fn(|_ex| {
1383            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1384        });
1385        let mut svc =
1386            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1387        let exchange = Exchange::new(Message::new("reject"));
1388        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1389        assert_eq!(result.input.body.as_text(), Some("reject"));
1390    }
1391
1392    #[tokio::test]
1393    async fn test_map_body_processor_works() {
1394        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1395            if let Some(text) = body.as_text() {
1396                Body::Text(text.to_uppercase())
1397            } else {
1398                body
1399            }
1400        });
1401        let exchange = Exchange::new(Message::new("hello"));
1402        let result = mapper.oneshot(exchange).await.unwrap();
1403        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1404    }
1405
1406    #[tokio::test]
1407    async fn test_process_custom_processor_works() {
1408        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1409            ex.set_property("custom", Value::Bool(true));
1410            Ok(ex)
1411        });
1412        let exchange = Exchange::new(Message::default());
1413        let result = processor.oneshot(exchange).await.unwrap();
1414        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1415    }
1416
1417    // -----------------------------------------------------------------------
1418    // Sequential pipeline test
1419    // -----------------------------------------------------------------------
1420
1421    #[tokio::test]
1422    async fn test_compose_pipeline_runs_steps_in_order() {
1423        use camel_core::route::compose_pipeline;
1424
1425        let processors = vec![
1426            BoxProcessor::new(SetHeader::new(
1427                IdentityProcessor,
1428                "step",
1429                Value::String("one".into()),
1430            )),
1431            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1432                if let Some(text) = body.as_text() {
1433                    Body::Text(format!("{}-processed", text))
1434                } else {
1435                    body
1436                }
1437            })),
1438        ];
1439
1440        let pipeline = compose_pipeline(processors);
1441        let exchange = Exchange::new(Message::new("hello"));
1442        let result = pipeline.oneshot(exchange).await.unwrap();
1443
1444        assert_eq!(
1445            result.input.header("step"),
1446            Some(&Value::String("one".into()))
1447        );
1448        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1449    }
1450
1451    #[tokio::test]
1452    async fn test_compose_pipeline_empty_is_identity() {
1453        use camel_core::route::compose_pipeline;
1454
1455        let pipeline = compose_pipeline(vec![]);
1456        let exchange = Exchange::new(Message::new("unchanged"));
1457        let result = pipeline.oneshot(exchange).await.unwrap();
1458        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1459    }
1460
1461    // -----------------------------------------------------------------------
1462    // Circuit breaker builder tests
1463    // -----------------------------------------------------------------------
1464
1465    #[test]
1466    fn test_builder_circuit_breaker_sets_config() {
1467        use camel_api::circuit_breaker::CircuitBreakerConfig;
1468
1469        let config = CircuitBreakerConfig::new().failure_threshold(5);
1470        let definition = RouteBuilder::from("timer:tick")
1471            .route_id("test-route")
1472            .circuit_breaker(config)
1473            .build()
1474            .unwrap();
1475
1476        let cb = definition
1477            .circuit_breaker_config()
1478            .expect("circuit breaker should be set");
1479        assert_eq!(cb.failure_threshold, 5);
1480    }
1481
1482    #[test]
1483    fn test_builder_circuit_breaker_with_error_handler() {
1484        use camel_api::circuit_breaker::CircuitBreakerConfig;
1485        use camel_api::error_handler::ErrorHandlerConfig;
1486
1487        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1488        let eh_config = ErrorHandlerConfig::log_only();
1489
1490        let definition = RouteBuilder::from("timer:tick")
1491            .route_id("test-route")
1492            .to("log:info")
1493            .circuit_breaker(cb_config)
1494            .error_handler(eh_config)
1495            .build()
1496            .unwrap();
1497
1498        assert!(
1499            definition.circuit_breaker_config().is_some(),
1500            "circuit breaker config should be set"
1501        );
1502        // Route definition was built successfully with both configs.
1503    }
1504
1505    #[test]
1506    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1507        let definition = RouteBuilder::from("direct:start")
1508            .route_id("test-route")
1509            .dead_letter_channel("log:dlc")
1510            .on_exception(|e| matches!(e, CamelError::Io(_)))
1511            .retry(3)
1512            .handled_by("log:io")
1513            .end_on_exception()
1514            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1515            .retry(1)
1516            .end_on_exception()
1517            .to("mock:out")
1518            .build()
1519            .expect("route should build");
1520
1521        let cfg = definition
1522            .error_handler_config()
1523            .expect("error handler should be set");
1524        assert_eq!(cfg.policies.len(), 2);
1525        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1526        assert_eq!(
1527            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1528            Some(3)
1529        );
1530        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1531        assert_eq!(
1532            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1533            Some(1)
1534        );
1535    }
1536
1537    #[test]
1538    fn test_builder_on_exception_mixed_mode_rejected() {
1539        let result = RouteBuilder::from("direct:start")
1540            .route_id("test-route")
1541            .error_handler(ErrorHandlerConfig::log_only())
1542            .on_exception(|_e| true)
1543            .end_on_exception()
1544            .to("mock:out")
1545            .build();
1546
1547        let err = result.err().expect("mixed mode should fail with an error");
1548
1549        assert!(
1550            format!("{err}").contains("mixed error handler modes"),
1551            "unexpected error: {err}"
1552        );
1553    }
1554
1555    #[test]
1556    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1557        let definition = RouteBuilder::from("direct:start")
1558            .route_id("test-route")
1559            .on_exception(|_e| true)
1560            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1561            .with_jitter(0.5)
1562            .end_on_exception()
1563            .to("mock:out")
1564            .build()
1565            .expect("route should build");
1566
1567        let cfg = definition
1568            .error_handler_config()
1569            .expect("error handler should be set");
1570        assert_eq!(cfg.policies.len(), 1);
1571        assert!(cfg.policies[0].retry.is_none());
1572    }
1573
1574    #[test]
1575    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1576        let definition = RouteBuilder::from("direct:start")
1577            .route_id("test-route")
1578            .dead_letter_channel("log:dlc")
1579            .to("mock:out")
1580            .build()
1581            .expect("route should build");
1582
1583        let cfg = definition
1584            .error_handler_config()
1585            .expect("error handler should be set");
1586        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1587        assert!(cfg.policies.is_empty());
1588    }
1589
1590    #[test]
1591    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1592        let definition = RouteBuilder::from("direct:start")
1593            .route_id("test-route")
1594            .dead_letter_channel("log:first")
1595            .on_exception(|e| matches!(e, CamelError::Io(_)))
1596            .retry(2)
1597            .end_on_exception()
1598            .dead_letter_channel("log:second")
1599            .to("mock:out")
1600            .build()
1601            .expect("route should build");
1602
1603        let cfg = definition
1604            .error_handler_config()
1605            .expect("error handler should be set");
1606        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1607        assert_eq!(cfg.policies.len(), 1);
1608        assert_eq!(
1609            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1610            Some(2)
1611        );
1612    }
1613
1614    #[test]
1615    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1616        let definition = RouteBuilder::from("direct:start")
1617            .route_id("test-route")
1618            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1619            .retry(1)
1620            .end_on_exception()
1621            .to("mock:out")
1622            .build()
1623            .expect("route should build");
1624
1625        let cfg = definition
1626            .error_handler_config()
1627            .expect("error handler should be set");
1628        assert!(cfg.dlc_uri.is_none());
1629        assert_eq!(cfg.policies.len(), 1);
1630    }
1631
1632    #[test]
1633    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1634        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1635        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1636
1637        let definition = RouteBuilder::from("direct:start")
1638            .route_id("test-route")
1639            .error_handler(first)
1640            .error_handler(second)
1641            .to("mock:out")
1642            .build()
1643            .expect("route should build");
1644
1645        let cfg = definition
1646            .error_handler_config()
1647            .expect("error handler should be set");
1648        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1649    }
1650
1651    // --- Splitter builder tests ---
1652
1653    #[test]
1654    fn test_split_builder_typestate() {
1655        use camel_api::splitter::{SplitterConfig, split_body_lines};
1656
1657        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
1658        let definition = RouteBuilder::from("timer:test?period=1000")
1659            .route_id("test-route")
1660            .split(SplitterConfig::new(split_body_lines()))
1661            .to("mock:per-fragment")
1662            .end_split()
1663            .to("mock:final")
1664            .build()
1665            .unwrap();
1666
1667        // Should have 2 top-level steps: Split + To("mock:final")
1668        assert_eq!(definition.steps().len(), 2);
1669    }
1670
1671    #[test]
1672    fn test_split_builder_steps_collected() {
1673        use camel_api::splitter::{SplitterConfig, split_body_lines};
1674
1675        let definition = RouteBuilder::from("timer:test?period=1000")
1676            .route_id("test-route")
1677            .split(SplitterConfig::new(split_body_lines()))
1678            .set_header("fragment", Value::String("yes".into()))
1679            .to("mock:per-fragment")
1680            .end_split()
1681            .build()
1682            .unwrap();
1683
1684        // Should have 1 top-level step: Split (containing 2 sub-steps)
1685        assert_eq!(definition.steps().len(), 1);
1686        match &definition.steps()[0] {
1687            BuilderStep::Split { steps, .. } => {
1688                assert_eq!(steps.len(), 2); // SetHeader + To
1689            }
1690            other => panic!("Expected Split, got {:?}", other),
1691        }
1692    }
1693
1694    #[test]
1695    fn test_split_builder_config_propagated() {
1696        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1697
1698        let definition = RouteBuilder::from("timer:test?period=1000")
1699            .route_id("test-route")
1700            .split(
1701                SplitterConfig::new(split_body_lines())
1702                    .parallel(true)
1703                    .parallel_limit(4)
1704                    .aggregation(AggregationStrategy::CollectAll),
1705            )
1706            .to("mock:per-fragment")
1707            .end_split()
1708            .build()
1709            .unwrap();
1710
1711        match &definition.steps()[0] {
1712            BuilderStep::Split { config, .. } => {
1713                assert!(config.parallel);
1714                assert_eq!(config.parallel_limit, Some(4));
1715                assert!(matches!(
1716                    config.aggregation,
1717                    AggregationStrategy::CollectAll
1718                ));
1719            }
1720            other => panic!("Expected Split, got {:?}", other),
1721        }
1722    }
1723
1724    #[test]
1725    fn test_aggregate_builder_adds_step() {
1726        use camel_api::aggregator::AggregatorConfig;
1727        use camel_core::route::BuilderStep;
1728
1729        let definition = RouteBuilder::from("timer:tick")
1730            .route_id("test-route")
1731            .aggregate(
1732                AggregatorConfig::correlate_by("key")
1733                    .complete_when_size(2)
1734                    .build(),
1735            )
1736            .build()
1737            .unwrap();
1738
1739        assert_eq!(definition.steps().len(), 1);
1740        assert!(matches!(
1741            definition.steps()[0],
1742            BuilderStep::Aggregate { .. }
1743        ));
1744    }
1745
1746    #[test]
1747    fn test_aggregate_in_split_builder() {
1748        use camel_api::aggregator::AggregatorConfig;
1749        use camel_api::splitter::{SplitterConfig, split_body_lines};
1750        use camel_core::route::BuilderStep;
1751
1752        let definition = RouteBuilder::from("timer:tick")
1753            .route_id("test-route")
1754            .split(SplitterConfig::new(split_body_lines()))
1755            .aggregate(
1756                AggregatorConfig::correlate_by("key")
1757                    .complete_when_size(1)
1758                    .build(),
1759            )
1760            .end_split()
1761            .build()
1762            .unwrap();
1763
1764        assert_eq!(definition.steps().len(), 1);
1765        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
1766            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
1767        } else {
1768            panic!("expected Split step");
1769        }
1770    }
1771
1772    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
1773
1774    #[test]
1775    fn test_builder_set_body_static_adds_processor() {
1776        let definition = RouteBuilder::from("timer:tick")
1777            .route_id("test-route")
1778            .set_body("fixed")
1779            .build()
1780            .unwrap();
1781        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1782    }
1783
1784    #[test]
1785    fn test_builder_set_body_fn_adds_processor() {
1786        let definition = RouteBuilder::from("timer:tick")
1787            .route_id("test-route")
1788            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
1789            .build()
1790            .unwrap();
1791        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1792    }
1793
1794    #[test]
1795    fn transform_alias_produces_same_as_set_body() {
1796        let route_transform = RouteBuilder::from("timer:tick")
1797            .route_id("test-route")
1798            .transform("hello")
1799            .build()
1800            .unwrap();
1801
1802        let route_set_body = RouteBuilder::from("timer:tick")
1803            .route_id("test-route")
1804            .set_body("hello")
1805            .build()
1806            .unwrap();
1807
1808        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
1809    }
1810
1811    #[test]
1812    fn test_builder_set_header_fn_adds_processor() {
1813        let definition = RouteBuilder::from("timer:tick")
1814            .route_id("test-route")
1815            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
1816            .build()
1817            .unwrap();
1818        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1819    }
1820
1821    #[tokio::test]
1822    async fn test_set_body_static_processor_works() {
1823        use camel_core::route::compose_pipeline;
1824        let def = RouteBuilder::from("t:t")
1825            .route_id("test-route")
1826            .set_body("replaced")
1827            .build()
1828            .unwrap();
1829        let pipeline = compose_pipeline(
1830            def.steps()
1831                .iter()
1832                .filter_map(|s| {
1833                    if let BuilderStep::Processor(p) = s {
1834                        Some(p.clone())
1835                    } else {
1836                        None
1837                    }
1838                })
1839                .collect(),
1840        );
1841        let exchange = Exchange::new(Message::new("original"));
1842        let result = pipeline.oneshot(exchange).await.unwrap();
1843        assert_eq!(result.input.body.as_text(), Some("replaced"));
1844    }
1845
1846    #[tokio::test]
1847    async fn test_set_body_fn_processor_works() {
1848        use camel_core::route::compose_pipeline;
1849        let def = RouteBuilder::from("t:t")
1850            .route_id("test-route")
1851            .set_body_fn(|ex: &Exchange| {
1852                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
1853            })
1854            .build()
1855            .unwrap();
1856        let pipeline = compose_pipeline(
1857            def.steps()
1858                .iter()
1859                .filter_map(|s| {
1860                    if let BuilderStep::Processor(p) = s {
1861                        Some(p.clone())
1862                    } else {
1863                        None
1864                    }
1865                })
1866                .collect(),
1867        );
1868        let exchange = Exchange::new(Message::new("hello"));
1869        let result = pipeline.oneshot(exchange).await.unwrap();
1870        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1871    }
1872
1873    #[tokio::test]
1874    async fn test_set_header_fn_processor_works() {
1875        use camel_core::route::compose_pipeline;
1876        let def = RouteBuilder::from("t:t")
1877            .route_id("test-route")
1878            .set_header_fn("echo", |ex: &Exchange| {
1879                ex.input
1880                    .body
1881                    .as_text()
1882                    .map(|t| Value::String(t.into()))
1883                    .unwrap_or(Value::Null)
1884            })
1885            .build()
1886            .unwrap();
1887        let pipeline = compose_pipeline(
1888            def.steps()
1889                .iter()
1890                .filter_map(|s| {
1891                    if let BuilderStep::Processor(p) = s {
1892                        Some(p.clone())
1893                    } else {
1894                        None
1895                    }
1896                })
1897                .collect(),
1898        );
1899        let exchange = Exchange::new(Message::new("ping"));
1900        let result = pipeline.oneshot(exchange).await.unwrap();
1901        assert_eq!(
1902            result.input.header("echo"),
1903            Some(&Value::String("ping".into()))
1904        );
1905    }
1906
1907    // ── FilterBuilder typestate tests ─────────────────────────────────────
1908
1909    #[test]
1910    fn test_filter_builder_typestate() {
1911        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1912            .route_id("test-route")
1913            .filter(|_ex| true)
1914            .to("mock:inner")
1915            .end_filter()
1916            .to("mock:outer")
1917            .build();
1918        assert!(result.is_ok());
1919    }
1920
1921    #[test]
1922    fn test_filter_builder_steps_collected() {
1923        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1924            .route_id("test-route")
1925            .filter(|_ex| true)
1926            .to("mock:inner")
1927            .end_filter()
1928            .build()
1929            .unwrap();
1930
1931        assert_eq!(definition.steps().len(), 1);
1932        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1933    }
1934
1935    #[test]
1936    fn test_wire_tap_builder_adds_step() {
1937        let definition = RouteBuilder::from("timer:tick")
1938            .route_id("test-route")
1939            .wire_tap("mock:tap")
1940            .to("mock:result")
1941            .build()
1942            .unwrap();
1943
1944        assert_eq!(definition.steps().len(), 2);
1945        assert!(
1946            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
1947        );
1948        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
1949    }
1950
1951    // ── MulticastBuilder typestate tests ─────────────────────────────────────
1952
1953    #[test]
1954    fn test_multicast_builder_typestate() {
1955        let definition = RouteBuilder::from("timer:tick")
1956            .route_id("test-route")
1957            .multicast()
1958            .to("direct:a")
1959            .to("direct:b")
1960            .end_multicast()
1961            .to("mock:result")
1962            .build()
1963            .unwrap();
1964
1965        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
1966    }
1967
1968    #[test]
1969    fn test_multicast_builder_steps_collected() {
1970        let definition = RouteBuilder::from("timer:tick")
1971            .route_id("test-route")
1972            .multicast()
1973            .to("direct:a")
1974            .to("direct:b")
1975            .end_multicast()
1976            .build()
1977            .unwrap();
1978
1979        match &definition.steps()[0] {
1980            BuilderStep::Multicast { steps, .. } => {
1981                assert_eq!(steps.len(), 2);
1982            }
1983            other => panic!("Expected Multicast, got {:?}", other),
1984        }
1985    }
1986
1987    // ── Concurrency builder tests ─────────────────────────────────────
1988
1989    #[test]
1990    fn test_builder_concurrent_sets_concurrency() {
1991        use camel_component::ConcurrencyModel;
1992
1993        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1994            .route_id("test-route")
1995            .concurrent(16)
1996            .to("log:info")
1997            .build()
1998            .unwrap();
1999
2000        assert_eq!(
2001            definition.concurrency_override(),
2002            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2003        );
2004    }
2005
2006    #[test]
2007    fn test_builder_concurrent_zero_means_unbounded() {
2008        use camel_component::ConcurrencyModel;
2009
2010        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2011            .route_id("test-route")
2012            .concurrent(0)
2013            .to("log:info")
2014            .build()
2015            .unwrap();
2016
2017        assert_eq!(
2018            definition.concurrency_override(),
2019            Some(&ConcurrencyModel::Concurrent { max: None })
2020        );
2021    }
2022
2023    #[test]
2024    fn test_builder_sequential_sets_concurrency() {
2025        use camel_component::ConcurrencyModel;
2026
2027        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2028            .route_id("test-route")
2029            .sequential()
2030            .to("log:info")
2031            .build()
2032            .unwrap();
2033
2034        assert_eq!(
2035            definition.concurrency_override(),
2036            Some(&ConcurrencyModel::Sequential)
2037        );
2038    }
2039
2040    #[test]
2041    fn test_builder_default_concurrency_is_none() {
2042        let definition = RouteBuilder::from("timer:tick")
2043            .route_id("test-route")
2044            .to("log:info")
2045            .build()
2046            .unwrap();
2047
2048        assert_eq!(definition.concurrency_override(), None);
2049    }
2050
2051    // ── Route lifecycle builder tests ─────────────────────────────────────
2052
2053    #[test]
2054    fn test_builder_route_id_sets_id() {
2055        let definition = RouteBuilder::from("timer:tick")
2056            .route_id("my-route")
2057            .build()
2058            .unwrap();
2059
2060        assert_eq!(definition.route_id(), "my-route");
2061    }
2062
2063    #[test]
2064    fn test_build_without_route_id_fails() {
2065        let result = RouteBuilder::from("timer:tick?period=1000")
2066            .to("log:info")
2067            .build();
2068        let err = match result {
2069            Err(e) => e.to_string(),
2070            Ok(_) => panic!("build() should fail without route_id"),
2071        };
2072        assert!(
2073            err.contains("route_id"),
2074            "error should mention route_id, got: {}",
2075            err
2076        );
2077    }
2078
2079    #[test]
2080    fn test_builder_auto_startup_false() {
2081        let definition = RouteBuilder::from("timer:tick")
2082            .route_id("test-route")
2083            .auto_startup(false)
2084            .build()
2085            .unwrap();
2086
2087        assert!(!definition.auto_startup());
2088    }
2089
2090    #[test]
2091    fn test_builder_startup_order_custom() {
2092        let definition = RouteBuilder::from("timer:tick")
2093            .route_id("test-route")
2094            .startup_order(50)
2095            .build()
2096            .unwrap();
2097
2098        assert_eq!(definition.startup_order(), 50);
2099    }
2100
2101    #[test]
2102    fn test_builder_defaults() {
2103        let definition = RouteBuilder::from("timer:tick")
2104            .route_id("test-route")
2105            .build()
2106            .unwrap();
2107
2108        assert_eq!(definition.route_id(), "test-route");
2109        assert!(definition.auto_startup());
2110        assert_eq!(definition.startup_order(), 1000);
2111    }
2112
2113    // ── Choice typestate tests ──────────────────────────────────────────────────
2114
2115    #[test]
2116    fn test_choice_builder_single_when() {
2117        let definition = RouteBuilder::from("timer:tick")
2118            .route_id("test-route")
2119            .choice()
2120            .when(|ex: &Exchange| ex.input.header("type").is_some())
2121            .to("mock:typed")
2122            .end_when()
2123            .end_choice()
2124            .build()
2125            .unwrap();
2126        assert_eq!(definition.steps().len(), 1);
2127        assert!(
2128            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2129            if whens.len() == 1 && otherwise.is_none())
2130        );
2131    }
2132
2133    #[test]
2134    fn test_choice_builder_when_otherwise() {
2135        let definition = RouteBuilder::from("timer:tick")
2136            .route_id("test-route")
2137            .choice()
2138            .when(|ex: &Exchange| ex.input.header("a").is_some())
2139            .to("mock:a")
2140            .end_when()
2141            .otherwise()
2142            .to("mock:fallback")
2143            .end_otherwise()
2144            .end_choice()
2145            .build()
2146            .unwrap();
2147        assert!(
2148            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2149            if whens.len() == 1 && otherwise.is_some())
2150        );
2151    }
2152
2153    #[test]
2154    fn test_choice_builder_multiple_whens() {
2155        let definition = RouteBuilder::from("timer:tick")
2156            .route_id("test-route")
2157            .choice()
2158            .when(|ex: &Exchange| ex.input.header("a").is_some())
2159            .to("mock:a")
2160            .end_when()
2161            .when(|ex: &Exchange| ex.input.header("b").is_some())
2162            .to("mock:b")
2163            .end_when()
2164            .end_choice()
2165            .build()
2166            .unwrap();
2167        assert!(
2168            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2169            if whens.len() == 2)
2170        );
2171    }
2172
2173    #[test]
2174    fn test_choice_step_after_choice() {
2175        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2176        let definition = RouteBuilder::from("timer:tick")
2177            .route_id("test-route")
2178            .choice()
2179            .when(|_ex: &Exchange| true)
2180            .to("mock:inner")
2181            .end_when()
2182            .end_choice()
2183            .to("mock:outer") // must be step[1], not inside choice
2184            .build()
2185            .unwrap();
2186        assert_eq!(definition.steps().len(), 2);
2187        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2188    }
2189
2190    // ── Throttle typestate tests ──────────────────────────────────────────────────
2191
2192    #[test]
2193    fn test_throttle_builder_typestate() {
2194        let definition = RouteBuilder::from("timer:tick")
2195            .route_id("test-route")
2196            .throttle(10, std::time::Duration::from_secs(1))
2197            .to("mock:result")
2198            .end_throttle()
2199            .build()
2200            .unwrap();
2201
2202        assert_eq!(definition.steps().len(), 1);
2203        assert!(matches!(
2204            &definition.steps()[0],
2205            BuilderStep::Throttle { .. }
2206        ));
2207    }
2208
2209    #[test]
2210    fn test_throttle_builder_with_strategy() {
2211        let definition = RouteBuilder::from("timer:tick")
2212            .route_id("test-route")
2213            .throttle(10, std::time::Duration::from_secs(1))
2214            .strategy(ThrottleStrategy::Reject)
2215            .to("mock:result")
2216            .end_throttle()
2217            .build()
2218            .unwrap();
2219
2220        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2221            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2222        } else {
2223            panic!("Expected Throttle step");
2224        }
2225    }
2226
2227    #[test]
2228    fn test_throttle_builder_steps_collected() {
2229        let definition = RouteBuilder::from("timer:tick")
2230            .route_id("test-route")
2231            .throttle(5, std::time::Duration::from_secs(1))
2232            .set_header("throttled", Value::Bool(true))
2233            .to("mock:throttled")
2234            .end_throttle()
2235            .build()
2236            .unwrap();
2237
2238        match &definition.steps()[0] {
2239            BuilderStep::Throttle { steps, .. } => {
2240                assert_eq!(steps.len(), 2); // SetHeader + To
2241            }
2242            other => panic!("Expected Throttle, got {:?}", other),
2243        }
2244    }
2245
2246    #[test]
2247    fn test_throttle_step_after_throttle() {
2248        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2249        let definition = RouteBuilder::from("timer:tick")
2250            .route_id("test-route")
2251            .throttle(10, std::time::Duration::from_secs(1))
2252            .to("mock:inner")
2253            .end_throttle()
2254            .to("mock:outer")
2255            .build()
2256            .unwrap();
2257
2258        assert_eq!(definition.steps().len(), 2);
2259        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2260    }
2261
2262    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2263
2264    #[test]
2265    fn test_load_balance_builder_typestate() {
2266        let definition = RouteBuilder::from("timer:tick")
2267            .route_id("test-route")
2268            .load_balance()
2269            .round_robin()
2270            .to("mock:a")
2271            .to("mock:b")
2272            .end_load_balance()
2273            .build()
2274            .unwrap();
2275
2276        assert_eq!(definition.steps().len(), 1);
2277        assert!(matches!(
2278            &definition.steps()[0],
2279            BuilderStep::LoadBalance { .. }
2280        ));
2281    }
2282
2283    #[test]
2284    fn test_load_balance_builder_with_strategy() {
2285        let definition = RouteBuilder::from("timer:tick")
2286            .route_id("test-route")
2287            .load_balance()
2288            .random()
2289            .to("mock:result")
2290            .end_load_balance()
2291            .build()
2292            .unwrap();
2293
2294        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2295            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2296        } else {
2297            panic!("Expected LoadBalance step");
2298        }
2299    }
2300
2301    #[test]
2302    fn test_load_balance_builder_steps_collected() {
2303        let definition = RouteBuilder::from("timer:tick")
2304            .route_id("test-route")
2305            .load_balance()
2306            .set_header("lb", Value::Bool(true))
2307            .to("mock:a")
2308            .end_load_balance()
2309            .build()
2310            .unwrap();
2311
2312        match &definition.steps()[0] {
2313            BuilderStep::LoadBalance { steps, .. } => {
2314                assert_eq!(steps.len(), 2); // SetHeader + To
2315            }
2316            other => panic!("Expected LoadBalance, got {:?}", other),
2317        }
2318    }
2319
2320    #[test]
2321    fn test_load_balance_step_after_load_balance() {
2322        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
2323        let definition = RouteBuilder::from("timer:tick")
2324            .route_id("test-route")
2325            .load_balance()
2326            .to("mock:inner")
2327            .end_load_balance()
2328            .to("mock:outer")
2329            .build()
2330            .unwrap();
2331
2332        assert_eq!(definition.steps().len(), 2);
2333        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2334    }
2335
2336    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2337
2338    #[test]
2339    fn test_dynamic_router_builder() {
2340        let definition = RouteBuilder::from("timer:tick")
2341            .route_id("test-route")
2342            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2343            .build()
2344            .unwrap();
2345
2346        assert_eq!(definition.steps().len(), 1);
2347        assert!(matches!(
2348            &definition.steps()[0],
2349            BuilderStep::DynamicRouter { .. }
2350        ));
2351    }
2352
2353    #[test]
2354    fn test_dynamic_router_builder_with_config() {
2355        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2356            .max_iterations(100)
2357            .cache_size(500);
2358
2359        let definition = RouteBuilder::from("timer:tick")
2360            .route_id("test-route")
2361            .dynamic_router_with_config(config)
2362            .build()
2363            .unwrap();
2364
2365        assert_eq!(definition.steps().len(), 1);
2366        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2367            assert_eq!(config.max_iterations, 100);
2368            assert_eq!(config.cache_size, 500);
2369        } else {
2370            panic!("Expected DynamicRouter step");
2371        }
2372    }
2373
2374    #[test]
2375    fn test_dynamic_router_step_after_router() {
2376        // Steps after dynamic_router() are added to the outer pipeline.
2377        let definition = RouteBuilder::from("timer:tick")
2378            .route_id("test-route")
2379            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2380            .to("mock:outer")
2381            .build()
2382            .unwrap();
2383
2384        assert_eq!(definition.steps().len(), 2);
2385        assert!(matches!(
2386            &definition.steps()[0],
2387            BuilderStep::DynamicRouter { .. }
2388        ));
2389        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2390    }
2391
2392    #[test]
2393    fn routing_slip_builder_creates_step() {
2394        use camel_api::RoutingSlipExpression;
2395
2396        let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2397
2398        let route = RouteBuilder::from("direct:start")
2399            .route_id("routing-slip-test")
2400            .routing_slip(expression)
2401            .build()
2402            .unwrap();
2403
2404        assert!(
2405            matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2406            "Expected RoutingSlip step"
2407        );
2408    }
2409
2410    #[test]
2411    fn routing_slip_with_config_builder_creates_step() {
2412        use camel_api::RoutingSlipConfig;
2413
2414        let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2415            .uri_delimiter("|")
2416            .cache_size(50)
2417            .ignore_invalid_endpoints(true);
2418
2419        let route = RouteBuilder::from("direct:start")
2420            .route_id("routing-slip-config-test")
2421            .routing_slip_with_config(config)
2422            .build()
2423            .unwrap();
2424
2425        if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2426            assert_eq!(config.uri_delimiter, "|");
2427            assert_eq!(config.cache_size, 50);
2428            assert!(config.ignore_invalid_endpoints);
2429        } else {
2430            panic!("Expected RoutingSlip step");
2431        }
2432    }
2433
2434    #[test]
2435    fn test_builder_marshal_adds_processor_step() {
2436        let definition = RouteBuilder::from("timer:tick")
2437            .route_id("test-route")
2438            .marshal("json")
2439            .build()
2440            .unwrap();
2441        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2442    }
2443
2444    #[test]
2445    fn test_builder_unmarshal_adds_processor_step() {
2446        let definition = RouteBuilder::from("timer:tick")
2447            .route_id("test-route")
2448            .unmarshal("json")
2449            .build()
2450            .unwrap();
2451        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2452    }
2453
2454    #[test]
2455    #[should_panic(expected = "unknown data format: 'csv'")]
2456    fn test_builder_marshal_panics_on_unknown_format() {
2457        let _ = RouteBuilder::from("timer:tick")
2458            .route_id("test-route")
2459            .marshal("csv")
2460            .build();
2461    }
2462
2463    #[test]
2464    #[should_panic(expected = "unknown data format: 'csv'")]
2465    fn test_builder_unmarshal_panics_on_unknown_format() {
2466        let _ = RouteBuilder::from("timer:tick")
2467            .route_id("test-route")
2468            .unmarshal("csv")
2469            .build();
2470    }
2471}