Skip to main content

camel_builder/
lib.rs

1use camel_api::aggregator::{AggregationStrategy, AggregatorConfig, CompletionCondition};
2use camel_api::body::Body;
3use camel_api::body_converter::BodyType;
4use camel_api::circuit_breaker::CircuitBreakerConfig;
5use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
6use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
7use camel_api::load_balancer::LoadBalancerConfig;
8use camel_api::multicast::{MulticastConfig, MulticastStrategy};
9use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
10use camel_api::splitter::SplitterConfig;
11use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
12use camel_api::{
13    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
14    ProcessorFn, Value,
15    runtime::{
16        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
17        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
18        CanonicalWhenSpec,
19    },
20};
21use camel_component::ConcurrencyModel;
22use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
23use camel_processor::{
24    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
25    UnmarshalService, builtin_data_format,
26};
27
28/// Shared step-accumulation methods for all builder types.
29///
30/// Implementors provide `steps_mut()` and get step-adding methods for free.
31/// `filter()` and other branching methods are NOT included — they return
32/// different types per builder and stay as per-builder methods.
33pub trait StepAccumulator: Sized {
34    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
35
36    fn to(mut self, endpoint: impl Into<String>) -> Self {
37        self.steps_mut().push(BuilderStep::To(endpoint.into()));
38        self
39    }
40
41    fn process<F, Fut>(mut self, f: F) -> Self
42    where
43        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
44        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
45    {
46        let svc = ProcessorFn::new(f);
47        self.steps_mut()
48            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
49        self
50    }
51
52    fn process_fn(mut self, processor: BoxProcessor) -> Self {
53        self.steps_mut().push(BuilderStep::Processor(processor));
54        self
55    }
56
57    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
58        let svc = SetHeader::new(IdentityProcessor, key, value);
59        self.steps_mut()
60            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
61        self
62    }
63
64    fn map_body<F>(mut self, mapper: F) -> Self
65    where
66        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
67    {
68        let svc = MapBody::new(IdentityProcessor, mapper);
69        self.steps_mut()
70            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71        self
72    }
73
74    fn set_body<B>(mut self, body: B) -> Self
75    where
76        B: Into<Body> + Clone + Send + Sync + 'static,
77    {
78        let body: Body = body.into();
79        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
80        self.steps_mut()
81            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
82        self
83    }
84
85    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
86    ///
87    /// Transforms the message body using the given value. Semantically identical
88    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
89    fn transform<B>(self, body: B) -> Self
90    where
91        B: Into<Body> + Clone + Send + Sync + 'static,
92    {
93        self.set_body(body)
94    }
95
96    fn set_body_fn<F>(mut self, expr: F) -> Self
97    where
98        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
99    {
100        let svc = SetBody::new(IdentityProcessor, expr);
101        self.steps_mut()
102            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
103        self
104    }
105
106    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
107    where
108        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
109    {
110        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
111        self.steps_mut()
112            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
113        self
114    }
115
116    fn aggregate(mut self, config: AggregatorConfig) -> Self {
117        self.steps_mut().push(BuilderStep::Aggregate { config });
118        self
119    }
120
121    /// Stop processing this exchange immediately. No further steps in the
122    /// current pipeline will run.
123    ///
124    /// Can be used at any point in the route: directly on RouteBuilder,
125    /// inside `.filter()`, inside `.split()`, etc.
126    fn stop(mut self) -> Self {
127        self.steps_mut().push(BuilderStep::Stop);
128        self
129    }
130
131    /// Log a message at the specified level.
132    ///
133    /// The message will be logged when an exchange passes through this step.
134    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
135        self.steps_mut().push(BuilderStep::Log {
136            level,
137            message: message.into(),
138        });
139        self
140    }
141
142    /// Convert the message body to the target type.
143    ///
144    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
145    /// Returns `TypeConversionFailed` if conversion is not possible.
146    ///
147    /// # Example
148    /// ```ignore
149    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
150    ///      .convert_body_to(BodyType::Json)
151    ///      .to("direct:next")
152    /// ```
153    fn convert_body_to(mut self, target: BodyType) -> Self {
154        let svc = ConvertBodyTo::new(IdentityProcessor, target);
155        self.steps_mut()
156            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
157        self
158    }
159
160    /// Marshal the message body using the specified data format.
161    ///
162    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
163    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
164    ///
165    /// # Example
166    /// ```ignore
167    /// route.marshal("json").to("direct:next")
168    /// ```
169    fn marshal(mut self, format: impl Into<String>) -> Self {
170        let name = format.into();
171        let df =
172            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
173        let svc = MarshalService::new(IdentityProcessor, df);
174        self.steps_mut()
175            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
176        self
177    }
178
179    /// Unmarshal the message body using the specified data format.
180    ///
181    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
182    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
183    ///
184    /// # Example
185    /// ```ignore
186    /// route.unmarshal("json").to("direct:next")
187    /// ```
188    fn unmarshal(mut self, format: impl Into<String>) -> Self {
189        let name = format.into();
190        let df =
191            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
192        let svc = UnmarshalService::new(IdentityProcessor, df);
193        self.steps_mut()
194            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
195        self
196    }
197
198    /// Execute a script that can modify the exchange (headers, properties, body).
199    ///
200    /// The script has access to `headers`, `properties`, and `body` variables
201    /// and can modify them with assignment syntax: `headers["k"] = v`.
202    ///
203    /// # Example
204    /// ```ignore
205    /// // ignore: requires full CamelContext setup with registered language
206    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
207    /// ```
208    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
209        self.steps_mut().push(BuilderStep::Script {
210            language: language.into(),
211            script: script.into(),
212        });
213        self
214    }
215}
216
217/// A fluent builder for constructing routes.
218///
219/// # Example
220///
221/// ```ignore
222/// let definition = RouteBuilder::from("timer:tick?period=1000")
223///     .set_header("source", Value::String("timer".into()))
224///     .filter(|ex| ex.input.body.as_text().is_some())
225///     .to("log:info?showHeaders=true")
226///     .build()?;
227/// ```
228pub struct RouteBuilder {
229    from_uri: String,
230    steps: Vec<BuilderStep>,
231    error_handler: Option<ErrorHandlerConfig>,
232    error_handler_mode: ErrorHandlerMode,
233    circuit_breaker_config: Option<CircuitBreakerConfig>,
234    concurrency: Option<ConcurrencyModel>,
235    route_id: Option<String>,
236    auto_startup: Option<bool>,
237    startup_order: Option<i32>,
238}
239
240#[derive(Default)]
241enum ErrorHandlerMode {
242    #[default]
243    None,
244    ExplicitConfig,
245    Shorthand {
246        dlc_uri: Option<String>,
247        specs: Vec<OnExceptionSpec>,
248    },
249    Mixed,
250}
251
252#[derive(Clone)]
253struct OnExceptionSpec {
254    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
255    retry: Option<RedeliveryPolicy>,
256    handled_by: Option<String>,
257}
258
259impl RouteBuilder {
260    /// Start building a route from the given source endpoint URI.
261    pub fn from(endpoint: &str) -> Self {
262        Self {
263            from_uri: endpoint.to_string(),
264            steps: Vec::new(),
265            error_handler: None,
266            error_handler_mode: ErrorHandlerMode::None,
267            circuit_breaker_config: None,
268            concurrency: None,
269            route_id: None,
270            auto_startup: None,
271            startup_order: None,
272        }
273    }
274
275    /// Open a filter scope. Only exchanges matching `predicate` will be processed
276    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
277    /// and continue to steps after `.end_filter()`.
278    pub fn filter<F>(self, predicate: F) -> FilterBuilder
279    where
280        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
281    {
282        FilterBuilder {
283            parent: self,
284            predicate: std::sync::Arc::new(predicate),
285            steps: vec![],
286        }
287    }
288
289    /// Open a choice scope for content-based routing.
290    ///
291    /// Within the choice, you can define multiple `.when()` clauses and an
292    /// optional `.otherwise()` clause. The first matching `when` predicate
293    /// determines which sub-pipeline executes.
294    pub fn choice(self) -> ChoiceBuilder {
295        ChoiceBuilder {
296            parent: self,
297            whens: vec![],
298            _otherwise: None,
299        }
300    }
301
302    /// Add a WireTap step that sends a clone of the exchange to the given
303    /// endpoint URI (fire-and-forget). The original exchange continues
304    /// downstream unchanged.
305    pub fn wire_tap(mut self, endpoint: &str) -> Self {
306        self.steps.push(BuilderStep::WireTap {
307            uri: endpoint.to_string(),
308        });
309        self
310    }
311
312    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
313    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
314        self.error_handler_mode = match self.error_handler_mode {
315            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
316                ErrorHandlerMode::ExplicitConfig
317            }
318            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
319        };
320        self.error_handler = Some(config);
321        self
322    }
323
324    /// Set a dead letter channel URI for shorthand error handler mode.
325    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
326        let uri = uri.into();
327        self.error_handler_mode = match self.error_handler_mode {
328            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
329                dlc_uri: Some(uri),
330                specs: Vec::new(),
331            },
332            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
333                dlc_uri: Some(uri),
334                specs,
335            },
336            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
337        };
338        self
339    }
340
341    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
342    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
343    where
344        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
345    {
346        self.error_handler_mode = match self.error_handler_mode {
347            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
348                dlc_uri: None,
349                specs: Vec::new(),
350            },
351            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
352            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
353        };
354
355        OnExceptionBuilder {
356            parent: self,
357            policy: OnExceptionSpec {
358                matches: std::sync::Arc::new(matches),
359                retry: None,
360                handled_by: None,
361            },
362        }
363    }
364
365    /// Set a circuit breaker for this route.
366    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
367        self.circuit_breaker_config = Some(config);
368        self
369    }
370
371    /// Override the consumer's default concurrency model.
372    ///
373    /// When set, the pipeline spawns a task per exchange, processing them
374    /// concurrently. `max` limits the number of simultaneously active
375    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
376    ///
377    /// # Example
378    /// ```ignore
379    /// RouteBuilder::from("http://0.0.0.0:8080/api")
380    ///     .concurrent(16)  // max 16 in-flight pipeline executions
381    ///     .process(handle_request)
382    ///     .build()
383    /// ```
384    pub fn concurrent(mut self, max: usize) -> Self {
385        let max = if max == 0 { None } else { Some(max) };
386        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
387        self
388    }
389
390    /// Force sequential processing, overriding a concurrent-capable consumer.
391    ///
392    /// Useful for HTTP routes that mutate shared state and need ordering
393    /// guarantees.
394    pub fn sequential(mut self) -> Self {
395        self.concurrency = Some(ConcurrencyModel::Sequential);
396        self
397    }
398
399    /// Set the route ID for this route.
400    ///
401    /// If not set, the route will be assigned an auto-generated ID.
402    pub fn route_id(mut self, id: impl Into<String>) -> Self {
403        self.route_id = Some(id.into());
404        self
405    }
406
407    /// Set whether this route should automatically start when the context starts.
408    ///
409    /// Default is `true`.
410    pub fn auto_startup(mut self, auto: bool) -> Self {
411        self.auto_startup = Some(auto);
412        self
413    }
414
415    /// Set the startup order for this route.
416    ///
417    /// Routes with lower values start first. Default is 1000.
418    pub fn startup_order(mut self, order: i32) -> Self {
419        self.startup_order = Some(order);
420        self
421    }
422
423    /// Begin a Splitter sub-pipeline. Steps added after this call (until
424    /// `.end_split()`) will be executed per-fragment.
425    ///
426    /// Returns a `SplitBuilder` — you cannot call `.build()` until
427    /// `.end_split()` closes the split scope (enforced by the type system).
428    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
429        SplitBuilder {
430            parent: self,
431            config,
432            steps: Vec::new(),
433        }
434    }
435
436    /// Begin a Multicast sub-pipeline. Steps added after this call (until
437    /// `.end_multicast()`) will each receive a copy of the exchange.
438    ///
439    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
440    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
441    pub fn multicast(self) -> MulticastBuilder {
442        MulticastBuilder {
443            parent: self,
444            steps: Vec::new(),
445            config: MulticastConfig::new(),
446        }
447    }
448
449    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
450    /// `max_requests` per `period`. Steps inside the throttle scope are only
451    /// executed when the rate limit allows.
452    ///
453    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
454    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
455    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
456        ThrottleBuilder {
457            parent: self,
458            config: ThrottlerConfig::new(max_requests, period),
459            steps: Vec::new(),
460        }
461    }
462
463    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
464    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
465    ///
466    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
467    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
468    pub fn load_balance(self) -> LoadBalancerBuilder {
469        LoadBalancerBuilder {
470            parent: self,
471            config: LoadBalancerConfig::round_robin(),
472            steps: Vec::new(),
473        }
474    }
475
476    /// Add a dynamic router step that routes exchanges dynamically based on
477    /// expression evaluation at runtime.
478    ///
479    /// The expression receives the exchange and returns `Some(uri)` to route to
480    /// the next endpoint, or `None` to stop routing.
481    ///
482    /// # Example
483    /// ```ignore
484    /// RouteBuilder::from("timer:tick")
485    ///     .route_id("test-route")
486    ///     .dynamic_router(|ex| {
487    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
488    ///     })
489    ///     .build()
490    /// ```
491    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
492        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
493    }
494
495    /// Add a dynamic router step with full configuration.
496    ///
497    /// Allows customization of URI delimiter, cache size, timeout, and other options.
498    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
499        self.steps.push(BuilderStep::DynamicRouter { config });
500        self
501    }
502
503    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
504        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
505    }
506
507    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
508        self.steps.push(BuilderStep::RoutingSlip { config });
509        self
510    }
511
512    /// Consume the builder and produce a [`RouteDefinition`].
513    pub fn build(self) -> Result<RouteDefinition, CamelError> {
514        if self.from_uri.is_empty() {
515            return Err(CamelError::RouteError(
516                "route must have a 'from' URI".to_string(),
517            ));
518        }
519        let route_id = self.route_id.ok_or_else(|| {
520            CamelError::RouteError(
521                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
522                    .to_string(),
523            )
524        })?;
525        let resolved_error_handler = match self.error_handler_mode {
526            ErrorHandlerMode::None => self.error_handler,
527            ErrorHandlerMode::ExplicitConfig => self.error_handler,
528            ErrorHandlerMode::Mixed => {
529                return Err(CamelError::RouteError(
530                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
531                ));
532            }
533            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
534                let mut config = if let Some(uri) = dlc_uri {
535                    ErrorHandlerConfig::dead_letter_channel(uri)
536                } else {
537                    ErrorHandlerConfig::log_only()
538                };
539
540                for spec in specs {
541                    let matcher = spec.matches.clone();
542                    let mut builder = config.on_exception(move |e| matcher(e));
543
544                    if let Some(retry) = spec.retry {
545                        builder = builder.retry(retry.max_attempts).with_backoff(
546                            retry.initial_delay,
547                            retry.multiplier,
548                            retry.max_delay,
549                        );
550                        if retry.jitter_factor > 0.0 {
551                            builder = builder.with_jitter(retry.jitter_factor);
552                        }
553                    }
554
555                    if let Some(uri) = spec.handled_by {
556                        builder = builder.handled_by(uri);
557                    }
558
559                    config = builder.build();
560                }
561
562                Some(config)
563            }
564        };
565
566        let definition = RouteDefinition::new(self.from_uri, self.steps);
567        let definition = if let Some(eh) = resolved_error_handler {
568            definition.with_error_handler(eh)
569        } else {
570            definition
571        };
572        let definition = if let Some(cb) = self.circuit_breaker_config {
573            definition.with_circuit_breaker(cb)
574        } else {
575            definition
576        };
577        let definition = if let Some(concurrency) = self.concurrency {
578            definition.with_concurrency(concurrency)
579        } else {
580            definition
581        };
582        let definition = definition.with_route_id(route_id);
583        let definition = if let Some(auto) = self.auto_startup {
584            definition.with_auto_startup(auto)
585        } else {
586            definition
587        };
588        let definition = if let Some(order) = self.startup_order {
589            definition.with_startup_order(order)
590        } else {
591            definition
592        };
593        Ok(definition)
594    }
595
596    /// Compile this builder route into canonical v1 spec.
597    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
598        if self.from_uri.is_empty() {
599            return Err(CamelError::RouteError(
600                "route must have a 'from' URI".to_string(),
601            ));
602        }
603        let route_id = self.route_id.ok_or_else(|| {
604            CamelError::RouteError(
605                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
606                    .to_string(),
607            )
608        })?;
609
610        let steps = canonicalize_steps(self.steps)?;
611        let circuit_breaker = self
612            .circuit_breaker_config
613            .map(canonicalize_circuit_breaker);
614
615        let spec = CanonicalRouteSpec {
616            route_id,
617            from: self.from_uri,
618            steps,
619            circuit_breaker,
620            version: camel_api::CANONICAL_CONTRACT_VERSION,
621        };
622        spec.validate_contract()?;
623        Ok(spec)
624    }
625}
626
627pub struct OnExceptionBuilder {
628    parent: RouteBuilder,
629    policy: OnExceptionSpec,
630}
631
632impl OnExceptionBuilder {
633    pub fn retry(mut self, max_attempts: u32) -> Self {
634        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
635        self
636    }
637
638    pub fn with_backoff(
639        mut self,
640        initial: std::time::Duration,
641        multiplier: f64,
642        max: std::time::Duration,
643    ) -> Self {
644        if let Some(ref mut retry) = self.policy.retry {
645            retry.initial_delay = initial;
646            retry.multiplier = multiplier;
647            retry.max_delay = max;
648        }
649        self
650    }
651
652    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
653        if let Some(ref mut retry) = self.policy.retry {
654            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
655        }
656        self
657    }
658
659    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
660        self.policy.handled_by = Some(uri.into());
661        self
662    }
663
664    pub fn end_on_exception(mut self) -> RouteBuilder {
665        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
666            specs.push(self.policy);
667        }
668        self.parent
669    }
670}
671
672fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
673    let mut canonical = Vec::with_capacity(steps.len());
674    for step in steps {
675        canonical.push(canonicalize_step(step)?);
676    }
677    Ok(canonical)
678}
679
680fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
681    match step {
682        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
683        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
684        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
685        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
686        BuilderStep::DeclarativeScript { expression } => {
687            Ok(CanonicalStepSpec::Script { expression })
688        }
689        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
690            predicate,
691            steps: canonicalize_steps(steps)?,
692        }),
693        BuilderStep::DeclarativeChoice { whens, otherwise } => {
694            let mut canonical_whens = Vec::with_capacity(whens.len());
695            for DeclarativeWhenStep { predicate, steps } in whens {
696                canonical_whens.push(CanonicalWhenSpec {
697                    predicate,
698                    steps: canonicalize_steps(steps)?,
699                });
700            }
701            let otherwise = match otherwise {
702                Some(steps) => Some(canonicalize_steps(steps)?),
703                None => None,
704            };
705            Ok(CanonicalStepSpec::Choice {
706                whens: canonical_whens,
707                otherwise,
708            })
709        }
710        BuilderStep::DeclarativeSplit {
711            expression,
712            aggregation,
713            parallel,
714            parallel_limit,
715            stop_on_exception,
716            steps,
717        } => Ok(CanonicalStepSpec::Split {
718            expression: CanonicalSplitExpressionSpec::Language(expression),
719            aggregation: canonicalize_split_aggregation(aggregation)?,
720            parallel,
721            parallel_limit,
722            stop_on_exception,
723            steps: canonicalize_steps(steps)?,
724        }),
725        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
726            config: canonicalize_aggregate(config)?,
727        }),
728        other => {
729            let step_name = canonical_step_name(&other);
730            let detail = camel_api::canonical_contract_rejection_reason(step_name)
731                .unwrap_or("not included in canonical v1");
732            Err(CamelError::RouteError(format!(
733                "canonical v1 does not support step `{step_name}`: {detail}"
734            )))
735        }
736    }
737}
738
739fn canonicalize_split_aggregation(
740    strategy: camel_api::splitter::AggregationStrategy,
741) -> Result<CanonicalSplitAggregationSpec, CamelError> {
742    match strategy {
743        camel_api::splitter::AggregationStrategy::LastWins => {
744            Ok(CanonicalSplitAggregationSpec::LastWins)
745        }
746        camel_api::splitter::AggregationStrategy::CollectAll => {
747            Ok(CanonicalSplitAggregationSpec::CollectAll)
748        }
749        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
750            "canonical v1 does not support custom split aggregation".to_string(),
751        )),
752        camel_api::splitter::AggregationStrategy::Original => {
753            Ok(CanonicalSplitAggregationSpec::Original)
754        }
755    }
756}
757
758fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
759    let completion_size = match config.completion {
760        CompletionCondition::Size(size) => Some(size),
761        CompletionCondition::Predicate(_) => {
762            return Err(CamelError::RouteError(
763                "canonical v1 does not support aggregate predicate completion".to_string(),
764            ));
765        }
766    };
767    let strategy = match config.strategy {
768        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
769        AggregationStrategy::Custom(_) => {
770            return Err(CamelError::RouteError(
771                "canonical v1 does not support custom aggregate strategy".to_string(),
772            ));
773        }
774    };
775    let bucket_ttl_ms = config
776        .bucket_ttl
777        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
778
779    Ok(CanonicalAggregateSpec {
780        header: config.header_name,
781        completion_size,
782        strategy,
783        max_buckets: config.max_buckets,
784        bucket_ttl_ms,
785    })
786}
787
788fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
789    CanonicalCircuitBreakerSpec {
790        failure_threshold: config.failure_threshold,
791        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
792    }
793}
794
795fn canonical_step_name(step: &BuilderStep) -> &'static str {
796    match step {
797        BuilderStep::Processor(_) => "processor",
798        BuilderStep::To(_) => "to",
799        BuilderStep::Stop => "stop",
800        BuilderStep::Log { .. } => "log",
801        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
802        BuilderStep::DeclarativeSetBody { .. } => "set_body",
803        BuilderStep::DeclarativeFilter { .. } => "filter",
804        BuilderStep::DeclarativeChoice { .. } => "choice",
805        BuilderStep::DeclarativeScript { .. } => "script",
806        BuilderStep::DeclarativeSplit { .. } => "split",
807        BuilderStep::Split { .. } => "split",
808        BuilderStep::Aggregate { .. } => "aggregate",
809        BuilderStep::Filter { .. } => "filter",
810        BuilderStep::Choice { .. } => "choice",
811        BuilderStep::WireTap { .. } => "wire_tap",
812        BuilderStep::Multicast { .. } => "multicast",
813        BuilderStep::DeclarativeLog { .. } => "log",
814        BuilderStep::Bean { .. } => "bean",
815        BuilderStep::Script { .. } => "script",
816        BuilderStep::Throttle { .. } => "throttle",
817        BuilderStep::LoadBalance { .. } => "load_balancer",
818        BuilderStep::DynamicRouter { .. } => "dynamic_router",
819        BuilderStep::RoutingSlip { .. } => "routing_slip",
820    }
821}
822
823impl StepAccumulator for RouteBuilder {
824    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
825        &mut self.steps
826    }
827}
828
829/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
830///
831/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
832/// but NOT `.build()` and NOT `.split()` (no nested splits).
833///
834/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
835/// and returns the parent `RouteBuilder`.
836pub struct SplitBuilder {
837    parent: RouteBuilder,
838    config: SplitterConfig,
839    steps: Vec<BuilderStep>,
840}
841
842impl SplitBuilder {
843    /// Open a filter scope within the split sub-pipeline.
844    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
845    where
846        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
847    {
848        FilterInSplitBuilder {
849            parent: self,
850            predicate: std::sync::Arc::new(predicate),
851            steps: vec![],
852        }
853    }
854
855    /// Close the split scope. Packages the accumulated sub-steps into a
856    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
857    pub fn end_split(mut self) -> RouteBuilder {
858        let split_step = BuilderStep::Split {
859            config: self.config,
860            steps: self.steps,
861        };
862        self.parent.steps.push(split_step);
863        self.parent
864    }
865}
866
867impl StepAccumulator for SplitBuilder {
868    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
869        &mut self.steps
870    }
871}
872
873/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
874pub struct FilterBuilder {
875    parent: RouteBuilder,
876    predicate: FilterPredicate,
877    steps: Vec<BuilderStep>,
878}
879
880impl FilterBuilder {
881    /// Close the filter scope. Packages the accumulated sub-steps into a
882    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
883    pub fn end_filter(mut self) -> RouteBuilder {
884        let step = BuilderStep::Filter {
885            predicate: self.predicate,
886            steps: self.steps,
887        };
888        self.parent.steps.push(step);
889        self.parent
890    }
891}
892
893impl StepAccumulator for FilterBuilder {
894    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
895        &mut self.steps
896    }
897}
898
899/// Builder for a filter scope nested inside a `.split()` block.
900pub struct FilterInSplitBuilder {
901    parent: SplitBuilder,
902    predicate: FilterPredicate,
903    steps: Vec<BuilderStep>,
904}
905
906impl FilterInSplitBuilder {
907    /// Close the filter scope and return the parent `SplitBuilder`.
908    pub fn end_filter(mut self) -> SplitBuilder {
909        let step = BuilderStep::Filter {
910            predicate: self.predicate,
911            steps: self.steps,
912        };
913        self.parent.steps.push(step);
914        self.parent
915    }
916}
917
918impl StepAccumulator for FilterInSplitBuilder {
919    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
920        &mut self.steps
921    }
922}
923
924// ── Choice/When/Otherwise builders ─────────────────────────────────────────
925
926/// Builder for a `.choice()` ... `.end_choice()` block.
927///
928/// Accumulates `when` clauses and an optional `otherwise` clause.
929/// Cannot call `.build()` until `.end_choice()` is called.
930pub struct ChoiceBuilder {
931    parent: RouteBuilder,
932    whens: Vec<WhenStep>,
933    _otherwise: Option<Vec<BuilderStep>>,
934}
935
936impl ChoiceBuilder {
937    /// Open a `when` clause. Only exchanges matching `predicate` will be
938    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
939    pub fn when<F>(self, predicate: F) -> WhenBuilder
940    where
941        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
942    {
943        WhenBuilder {
944            parent: self,
945            predicate: std::sync::Arc::new(predicate),
946            steps: vec![],
947        }
948    }
949
950    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
951    ///
952    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
953    pub fn otherwise(self) -> OtherwiseBuilder {
954        OtherwiseBuilder {
955            parent: self,
956            steps: vec![],
957        }
958    }
959
960    /// Close the choice scope. Packages all accumulated `when` clauses and
961    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
962    /// parent `RouteBuilder`.
963    pub fn end_choice(mut self) -> RouteBuilder {
964        let step = BuilderStep::Choice {
965            whens: self.whens,
966            otherwise: self._otherwise,
967        };
968        self.parent.steps.push(step);
969        self.parent
970    }
971}
972
973/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
974pub struct WhenBuilder {
975    parent: ChoiceBuilder,
976    predicate: camel_api::FilterPredicate,
977    steps: Vec<BuilderStep>,
978}
979
980impl WhenBuilder {
981    /// Close the when scope. Packages the accumulated sub-steps into a
982    /// `WhenStep` and returns the parent `ChoiceBuilder`.
983    pub fn end_when(mut self) -> ChoiceBuilder {
984        self.parent.whens.push(WhenStep {
985            predicate: self.predicate,
986            steps: self.steps,
987        });
988        self.parent
989    }
990}
991
992impl StepAccumulator for WhenBuilder {
993    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
994        &mut self.steps
995    }
996}
997
998/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
999pub struct OtherwiseBuilder {
1000    parent: ChoiceBuilder,
1001    steps: Vec<BuilderStep>,
1002}
1003
1004impl OtherwiseBuilder {
1005    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1006    pub fn end_otherwise(self) -> ChoiceBuilder {
1007        let OtherwiseBuilder { mut parent, steps } = self;
1008        parent._otherwise = Some(steps);
1009        parent
1010    }
1011}
1012
1013impl StepAccumulator for OtherwiseBuilder {
1014    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1015        &mut self.steps
1016    }
1017}
1018
1019/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1020///
1021/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1022/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1023///
1024/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1025/// and returns the parent `RouteBuilder`.
1026pub struct MulticastBuilder {
1027    parent: RouteBuilder,
1028    steps: Vec<BuilderStep>,
1029    config: MulticastConfig,
1030}
1031
1032impl MulticastBuilder {
1033    pub fn parallel(mut self, parallel: bool) -> Self {
1034        self.config = self.config.parallel(parallel);
1035        self
1036    }
1037
1038    pub fn parallel_limit(mut self, limit: usize) -> Self {
1039        self.config = self.config.parallel_limit(limit);
1040        self
1041    }
1042
1043    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1044        self.config = self.config.stop_on_exception(stop);
1045        self
1046    }
1047
1048    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1049        self.config = self.config.timeout(duration);
1050        self
1051    }
1052
1053    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1054        self.config = self.config.aggregation(strategy);
1055        self
1056    }
1057
1058    pub fn end_multicast(mut self) -> RouteBuilder {
1059        let step = BuilderStep::Multicast {
1060            steps: self.steps,
1061            config: self.config,
1062        };
1063        self.parent.steps.push(step);
1064        self.parent
1065    }
1066}
1067
1068impl StepAccumulator for MulticastBuilder {
1069    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1070        &mut self.steps
1071    }
1072}
1073
1074/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1075///
1076/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1077/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1078///
1079/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1080/// and returns the parent `RouteBuilder`.
1081pub struct ThrottleBuilder {
1082    parent: RouteBuilder,
1083    config: ThrottlerConfig,
1084    steps: Vec<BuilderStep>,
1085}
1086
1087impl ThrottleBuilder {
1088    /// Set the throttle strategy. Default is `Delay`.
1089    ///
1090    /// - `Delay`: Queue messages until capacity available
1091    /// - `Reject`: Return error immediately when throttled
1092    /// - `Drop`: Silently discard excess messages
1093    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1094        self.config = self.config.strategy(strategy);
1095        self
1096    }
1097
1098    /// Close the throttle scope. Packages the accumulated sub-steps into a
1099    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1100    pub fn end_throttle(mut self) -> RouteBuilder {
1101        let step = BuilderStep::Throttle {
1102            config: self.config,
1103            steps: self.steps,
1104        };
1105        self.parent.steps.push(step);
1106        self.parent
1107    }
1108}
1109
1110impl StepAccumulator for ThrottleBuilder {
1111    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1112        &mut self.steps
1113    }
1114}
1115
1116/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1117///
1118/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1119/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1120///
1121/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1122/// and returns the parent `RouteBuilder`.
1123pub struct LoadBalancerBuilder {
1124    parent: RouteBuilder,
1125    config: LoadBalancerConfig,
1126    steps: Vec<BuilderStep>,
1127}
1128
1129impl LoadBalancerBuilder {
1130    /// Set the load balance strategy to round-robin (default).
1131    pub fn round_robin(mut self) -> Self {
1132        self.config = LoadBalancerConfig::round_robin();
1133        self
1134    }
1135
1136    /// Set the load balance strategy to random selection.
1137    pub fn random(mut self) -> Self {
1138        self.config = LoadBalancerConfig::random();
1139        self
1140    }
1141
1142    /// Set the load balance strategy to weighted selection.
1143    ///
1144    /// Each endpoint is assigned a weight that determines its probability
1145    /// of being selected.
1146    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1147        self.config = LoadBalancerConfig::weighted(weights);
1148        self
1149    }
1150
1151    /// Set the load balance strategy to failover.
1152    ///
1153    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1154    /// is tried.
1155    pub fn failover(mut self) -> Self {
1156        self.config = LoadBalancerConfig::failover();
1157        self
1158    }
1159
1160    /// Enable or disable parallel execution of endpoints.
1161    ///
1162    /// When enabled, all endpoints receive the exchange simultaneously.
1163    /// When disabled (default), only one endpoint is selected per exchange.
1164    pub fn parallel(mut self, parallel: bool) -> Self {
1165        self.config = self.config.parallel(parallel);
1166        self
1167    }
1168
1169    /// Close the load balance scope. Packages the accumulated sub-steps into a
1170    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1171    pub fn end_load_balance(mut self) -> RouteBuilder {
1172        let step = BuilderStep::LoadBalance {
1173            config: self.config,
1174            steps: self.steps,
1175        };
1176        self.parent.steps.push(step);
1177        self.parent
1178    }
1179}
1180
1181impl StepAccumulator for LoadBalancerBuilder {
1182    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1183        &mut self.steps
1184    }
1185}
1186
1187// ---------------------------------------------------------------------------
1188// Tests
1189// ---------------------------------------------------------------------------
1190
1191#[cfg(test)]
1192mod tests {
1193    use super::*;
1194    use camel_api::error_handler::ErrorHandlerConfig;
1195    use camel_api::load_balancer::LoadBalanceStrategy;
1196    use camel_api::{Exchange, Message};
1197    use camel_core::route::BuilderStep;
1198    use std::sync::Arc;
1199    use std::time::Duration;
1200    use tower::{Service, ServiceExt};
1201
1202    #[test]
1203    fn test_builder_from_creates_definition() {
1204        let definition = RouteBuilder::from("timer:tick")
1205            .route_id("test-route")
1206            .build()
1207            .unwrap();
1208        assert_eq!(definition.from_uri(), "timer:tick");
1209    }
1210
1211    #[test]
1212    fn test_builder_empty_from_uri_errors() {
1213        let result = RouteBuilder::from("").route_id("test-route").build();
1214        assert!(result.is_err());
1215    }
1216
1217    #[test]
1218    fn test_builder_to_adds_step() {
1219        let definition = RouteBuilder::from("timer:tick")
1220            .route_id("test-route")
1221            .to("log:info")
1222            .build()
1223            .unwrap();
1224
1225        assert_eq!(definition.from_uri(), "timer:tick");
1226        // We can verify steps were added by checking the structure
1227        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1228    }
1229
1230    #[test]
1231    fn test_builder_filter_adds_filter_step() {
1232        let definition = RouteBuilder::from("timer:tick")
1233            .route_id("test-route")
1234            .filter(|_ex| true)
1235            .to("mock:result")
1236            .end_filter()
1237            .build()
1238            .unwrap();
1239
1240        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1241    }
1242
1243    #[test]
1244    fn test_builder_set_header_adds_processor_step() {
1245        let definition = RouteBuilder::from("timer:tick")
1246            .route_id("test-route")
1247            .set_header("key", Value::String("value".into()))
1248            .build()
1249            .unwrap();
1250
1251        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1252    }
1253
1254    #[test]
1255    fn test_builder_map_body_adds_processor_step() {
1256        let definition = RouteBuilder::from("timer:tick")
1257            .route_id("test-route")
1258            .map_body(|body| body)
1259            .build()
1260            .unwrap();
1261
1262        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1263    }
1264
1265    #[test]
1266    fn test_builder_process_adds_processor_step() {
1267        let definition = RouteBuilder::from("timer:tick")
1268            .route_id("test-route")
1269            .process(|ex| async move { Ok(ex) })
1270            .build()
1271            .unwrap();
1272
1273        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1274    }
1275
1276    #[test]
1277    fn test_builder_chain_multiple_steps() {
1278        let definition = RouteBuilder::from("timer:tick")
1279            .route_id("test-route")
1280            .set_header("source", Value::String("timer".into()))
1281            .filter(|ex| ex.input.header("source").is_some())
1282            .to("log:info")
1283            .end_filter()
1284            .to("mock:result")
1285            .build()
1286            .unwrap();
1287
1288        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1289        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1290        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1291        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1292    }
1293
1294    // -----------------------------------------------------------------------
1295    // Processor behavior tests — exercise the real Tower services directly
1296    // -----------------------------------------------------------------------
1297
1298    #[tokio::test]
1299    async fn test_set_header_processor_works() {
1300        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1301        let exchange = Exchange::new(Message::new("test"));
1302        let result = svc.call(exchange).await.unwrap();
1303        assert_eq!(
1304            result.input.header("greeting"),
1305            Some(&Value::String("hello".into()))
1306        );
1307    }
1308
1309    #[tokio::test]
1310    async fn test_filter_processor_passes() {
1311        use camel_api::BoxProcessorExt;
1312        use camel_processor::FilterService;
1313
1314        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1315        let mut svc =
1316            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1317        let exchange = Exchange::new(Message::new("pass"));
1318        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1319        assert_eq!(result.input.body.as_text(), Some("pass"));
1320    }
1321
1322    #[tokio::test]
1323    async fn test_filter_processor_blocks() {
1324        use camel_api::BoxProcessorExt;
1325        use camel_processor::FilterService;
1326
1327        let sub = BoxProcessor::from_fn(|_ex| {
1328            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1329        });
1330        let mut svc =
1331            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1332        let exchange = Exchange::new(Message::new("reject"));
1333        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1334        assert_eq!(result.input.body.as_text(), Some("reject"));
1335    }
1336
1337    #[tokio::test]
1338    async fn test_map_body_processor_works() {
1339        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1340            if let Some(text) = body.as_text() {
1341                Body::Text(text.to_uppercase())
1342            } else {
1343                body
1344            }
1345        });
1346        let exchange = Exchange::new(Message::new("hello"));
1347        let result = mapper.oneshot(exchange).await.unwrap();
1348        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1349    }
1350
1351    #[tokio::test]
1352    async fn test_process_custom_processor_works() {
1353        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1354            ex.set_property("custom", Value::Bool(true));
1355            Ok(ex)
1356        });
1357        let exchange = Exchange::new(Message::default());
1358        let result = processor.oneshot(exchange).await.unwrap();
1359        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1360    }
1361
1362    // -----------------------------------------------------------------------
1363    // Sequential pipeline test
1364    // -----------------------------------------------------------------------
1365
1366    #[tokio::test]
1367    async fn test_compose_pipeline_runs_steps_in_order() {
1368        use camel_core::route::compose_pipeline;
1369
1370        let processors = vec![
1371            BoxProcessor::new(SetHeader::new(
1372                IdentityProcessor,
1373                "step",
1374                Value::String("one".into()),
1375            )),
1376            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1377                if let Some(text) = body.as_text() {
1378                    Body::Text(format!("{}-processed", text))
1379                } else {
1380                    body
1381                }
1382            })),
1383        ];
1384
1385        let pipeline = compose_pipeline(processors);
1386        let exchange = Exchange::new(Message::new("hello"));
1387        let result = pipeline.oneshot(exchange).await.unwrap();
1388
1389        assert_eq!(
1390            result.input.header("step"),
1391            Some(&Value::String("one".into()))
1392        );
1393        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1394    }
1395
1396    #[tokio::test]
1397    async fn test_compose_pipeline_empty_is_identity() {
1398        use camel_core::route::compose_pipeline;
1399
1400        let pipeline = compose_pipeline(vec![]);
1401        let exchange = Exchange::new(Message::new("unchanged"));
1402        let result = pipeline.oneshot(exchange).await.unwrap();
1403        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1404    }
1405
1406    // -----------------------------------------------------------------------
1407    // Circuit breaker builder tests
1408    // -----------------------------------------------------------------------
1409
1410    #[test]
1411    fn test_builder_circuit_breaker_sets_config() {
1412        use camel_api::circuit_breaker::CircuitBreakerConfig;
1413
1414        let config = CircuitBreakerConfig::new().failure_threshold(5);
1415        let definition = RouteBuilder::from("timer:tick")
1416            .route_id("test-route")
1417            .circuit_breaker(config)
1418            .build()
1419            .unwrap();
1420
1421        let cb = definition
1422            .circuit_breaker_config()
1423            .expect("circuit breaker should be set");
1424        assert_eq!(cb.failure_threshold, 5);
1425    }
1426
1427    #[test]
1428    fn test_builder_circuit_breaker_with_error_handler() {
1429        use camel_api::circuit_breaker::CircuitBreakerConfig;
1430        use camel_api::error_handler::ErrorHandlerConfig;
1431
1432        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1433        let eh_config = ErrorHandlerConfig::log_only();
1434
1435        let definition = RouteBuilder::from("timer:tick")
1436            .route_id("test-route")
1437            .to("log:info")
1438            .circuit_breaker(cb_config)
1439            .error_handler(eh_config)
1440            .build()
1441            .unwrap();
1442
1443        assert!(
1444            definition.circuit_breaker_config().is_some(),
1445            "circuit breaker config should be set"
1446        );
1447        // Route definition was built successfully with both configs.
1448    }
1449
1450    #[test]
1451    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1452        let definition = RouteBuilder::from("direct:start")
1453            .route_id("test-route")
1454            .dead_letter_channel("log:dlc")
1455            .on_exception(|e| matches!(e, CamelError::Io(_)))
1456            .retry(3)
1457            .handled_by("log:io")
1458            .end_on_exception()
1459            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1460            .retry(1)
1461            .end_on_exception()
1462            .to("mock:out")
1463            .build()
1464            .expect("route should build");
1465
1466        let cfg = definition
1467            .error_handler_config()
1468            .expect("error handler should be set");
1469        assert_eq!(cfg.policies.len(), 2);
1470        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1471        assert_eq!(
1472            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1473            Some(3)
1474        );
1475        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1476        assert_eq!(
1477            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1478            Some(1)
1479        );
1480    }
1481
1482    #[test]
1483    fn test_builder_on_exception_mixed_mode_rejected() {
1484        let result = RouteBuilder::from("direct:start")
1485            .route_id("test-route")
1486            .error_handler(ErrorHandlerConfig::log_only())
1487            .on_exception(|_e| true)
1488            .end_on_exception()
1489            .to("mock:out")
1490            .build();
1491
1492        let err = result.err().expect("mixed mode should fail with an error");
1493
1494        assert!(
1495            format!("{err}").contains("mixed error handler modes"),
1496            "unexpected error: {err}"
1497        );
1498    }
1499
1500    #[test]
1501    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1502        let definition = RouteBuilder::from("direct:start")
1503            .route_id("test-route")
1504            .on_exception(|_e| true)
1505            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1506            .with_jitter(0.5)
1507            .end_on_exception()
1508            .to("mock:out")
1509            .build()
1510            .expect("route should build");
1511
1512        let cfg = definition
1513            .error_handler_config()
1514            .expect("error handler should be set");
1515        assert_eq!(cfg.policies.len(), 1);
1516        assert!(cfg.policies[0].retry.is_none());
1517    }
1518
1519    #[test]
1520    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1521        let definition = RouteBuilder::from("direct:start")
1522            .route_id("test-route")
1523            .dead_letter_channel("log:dlc")
1524            .to("mock:out")
1525            .build()
1526            .expect("route should build");
1527
1528        let cfg = definition
1529            .error_handler_config()
1530            .expect("error handler should be set");
1531        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1532        assert!(cfg.policies.is_empty());
1533    }
1534
1535    #[test]
1536    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1537        let definition = RouteBuilder::from("direct:start")
1538            .route_id("test-route")
1539            .dead_letter_channel("log:first")
1540            .on_exception(|e| matches!(e, CamelError::Io(_)))
1541            .retry(2)
1542            .end_on_exception()
1543            .dead_letter_channel("log:second")
1544            .to("mock:out")
1545            .build()
1546            .expect("route should build");
1547
1548        let cfg = definition
1549            .error_handler_config()
1550            .expect("error handler should be set");
1551        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1552        assert_eq!(cfg.policies.len(), 1);
1553        assert_eq!(
1554            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1555            Some(2)
1556        );
1557    }
1558
1559    #[test]
1560    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1561        let definition = RouteBuilder::from("direct:start")
1562            .route_id("test-route")
1563            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1564            .retry(1)
1565            .end_on_exception()
1566            .to("mock:out")
1567            .build()
1568            .expect("route should build");
1569
1570        let cfg = definition
1571            .error_handler_config()
1572            .expect("error handler should be set");
1573        assert!(cfg.dlc_uri.is_none());
1574        assert_eq!(cfg.policies.len(), 1);
1575    }
1576
1577    #[test]
1578    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1579        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1580        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1581
1582        let definition = RouteBuilder::from("direct:start")
1583            .route_id("test-route")
1584            .error_handler(first)
1585            .error_handler(second)
1586            .to("mock:out")
1587            .build()
1588            .expect("route should build");
1589
1590        let cfg = definition
1591            .error_handler_config()
1592            .expect("error handler should be set");
1593        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1594    }
1595
1596    // --- Splitter builder tests ---
1597
1598    #[test]
1599    fn test_split_builder_typestate() {
1600        use camel_api::splitter::{SplitterConfig, split_body_lines};
1601
1602        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
1603        let definition = RouteBuilder::from("timer:test?period=1000")
1604            .route_id("test-route")
1605            .split(SplitterConfig::new(split_body_lines()))
1606            .to("mock:per-fragment")
1607            .end_split()
1608            .to("mock:final")
1609            .build()
1610            .unwrap();
1611
1612        // Should have 2 top-level steps: Split + To("mock:final")
1613        assert_eq!(definition.steps().len(), 2);
1614    }
1615
1616    #[test]
1617    fn test_split_builder_steps_collected() {
1618        use camel_api::splitter::{SplitterConfig, split_body_lines};
1619
1620        let definition = RouteBuilder::from("timer:test?period=1000")
1621            .route_id("test-route")
1622            .split(SplitterConfig::new(split_body_lines()))
1623            .set_header("fragment", Value::String("yes".into()))
1624            .to("mock:per-fragment")
1625            .end_split()
1626            .build()
1627            .unwrap();
1628
1629        // Should have 1 top-level step: Split (containing 2 sub-steps)
1630        assert_eq!(definition.steps().len(), 1);
1631        match &definition.steps()[0] {
1632            BuilderStep::Split { steps, .. } => {
1633                assert_eq!(steps.len(), 2); // SetHeader + To
1634            }
1635            other => panic!("Expected Split, got {:?}", other),
1636        }
1637    }
1638
1639    #[test]
1640    fn test_split_builder_config_propagated() {
1641        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1642
1643        let definition = RouteBuilder::from("timer:test?period=1000")
1644            .route_id("test-route")
1645            .split(
1646                SplitterConfig::new(split_body_lines())
1647                    .parallel(true)
1648                    .parallel_limit(4)
1649                    .aggregation(AggregationStrategy::CollectAll),
1650            )
1651            .to("mock:per-fragment")
1652            .end_split()
1653            .build()
1654            .unwrap();
1655
1656        match &definition.steps()[0] {
1657            BuilderStep::Split { config, .. } => {
1658                assert!(config.parallel);
1659                assert_eq!(config.parallel_limit, Some(4));
1660                assert!(matches!(
1661                    config.aggregation,
1662                    AggregationStrategy::CollectAll
1663                ));
1664            }
1665            other => panic!("Expected Split, got {:?}", other),
1666        }
1667    }
1668
1669    #[test]
1670    fn test_aggregate_builder_adds_step() {
1671        use camel_api::aggregator::AggregatorConfig;
1672        use camel_core::route::BuilderStep;
1673
1674        let definition = RouteBuilder::from("timer:tick")
1675            .route_id("test-route")
1676            .aggregate(
1677                AggregatorConfig::correlate_by("key")
1678                    .complete_when_size(2)
1679                    .build(),
1680            )
1681            .build()
1682            .unwrap();
1683
1684        assert_eq!(definition.steps().len(), 1);
1685        assert!(matches!(
1686            definition.steps()[0],
1687            BuilderStep::Aggregate { .. }
1688        ));
1689    }
1690
1691    #[test]
1692    fn test_aggregate_in_split_builder() {
1693        use camel_api::aggregator::AggregatorConfig;
1694        use camel_api::splitter::{SplitterConfig, split_body_lines};
1695        use camel_core::route::BuilderStep;
1696
1697        let definition = RouteBuilder::from("timer:tick")
1698            .route_id("test-route")
1699            .split(SplitterConfig::new(split_body_lines()))
1700            .aggregate(
1701                AggregatorConfig::correlate_by("key")
1702                    .complete_when_size(1)
1703                    .build(),
1704            )
1705            .end_split()
1706            .build()
1707            .unwrap();
1708
1709        assert_eq!(definition.steps().len(), 1);
1710        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
1711            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
1712        } else {
1713            panic!("expected Split step");
1714        }
1715    }
1716
1717    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
1718
1719    #[test]
1720    fn test_builder_set_body_static_adds_processor() {
1721        let definition = RouteBuilder::from("timer:tick")
1722            .route_id("test-route")
1723            .set_body("fixed")
1724            .build()
1725            .unwrap();
1726        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1727    }
1728
1729    #[test]
1730    fn test_builder_set_body_fn_adds_processor() {
1731        let definition = RouteBuilder::from("timer:tick")
1732            .route_id("test-route")
1733            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
1734            .build()
1735            .unwrap();
1736        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1737    }
1738
1739    #[test]
1740    fn transform_alias_produces_same_as_set_body() {
1741        let route_transform = RouteBuilder::from("timer:tick")
1742            .route_id("test-route")
1743            .transform("hello")
1744            .build()
1745            .unwrap();
1746
1747        let route_set_body = RouteBuilder::from("timer:tick")
1748            .route_id("test-route")
1749            .set_body("hello")
1750            .build()
1751            .unwrap();
1752
1753        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
1754    }
1755
1756    #[test]
1757    fn test_builder_set_header_fn_adds_processor() {
1758        let definition = RouteBuilder::from("timer:tick")
1759            .route_id("test-route")
1760            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
1761            .build()
1762            .unwrap();
1763        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1764    }
1765
1766    #[tokio::test]
1767    async fn test_set_body_static_processor_works() {
1768        use camel_core::route::compose_pipeline;
1769        let def = RouteBuilder::from("t:t")
1770            .route_id("test-route")
1771            .set_body("replaced")
1772            .build()
1773            .unwrap();
1774        let pipeline = compose_pipeline(
1775            def.steps()
1776                .iter()
1777                .filter_map(|s| {
1778                    if let BuilderStep::Processor(p) = s {
1779                        Some(p.clone())
1780                    } else {
1781                        None
1782                    }
1783                })
1784                .collect(),
1785        );
1786        let exchange = Exchange::new(Message::new("original"));
1787        let result = pipeline.oneshot(exchange).await.unwrap();
1788        assert_eq!(result.input.body.as_text(), Some("replaced"));
1789    }
1790
1791    #[tokio::test]
1792    async fn test_set_body_fn_processor_works() {
1793        use camel_core::route::compose_pipeline;
1794        let def = RouteBuilder::from("t:t")
1795            .route_id("test-route")
1796            .set_body_fn(|ex: &Exchange| {
1797                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
1798            })
1799            .build()
1800            .unwrap();
1801        let pipeline = compose_pipeline(
1802            def.steps()
1803                .iter()
1804                .filter_map(|s| {
1805                    if let BuilderStep::Processor(p) = s {
1806                        Some(p.clone())
1807                    } else {
1808                        None
1809                    }
1810                })
1811                .collect(),
1812        );
1813        let exchange = Exchange::new(Message::new("hello"));
1814        let result = pipeline.oneshot(exchange).await.unwrap();
1815        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1816    }
1817
1818    #[tokio::test]
1819    async fn test_set_header_fn_processor_works() {
1820        use camel_core::route::compose_pipeline;
1821        let def = RouteBuilder::from("t:t")
1822            .route_id("test-route")
1823            .set_header_fn("echo", |ex: &Exchange| {
1824                ex.input
1825                    .body
1826                    .as_text()
1827                    .map(|t| Value::String(t.into()))
1828                    .unwrap_or(Value::Null)
1829            })
1830            .build()
1831            .unwrap();
1832        let pipeline = compose_pipeline(
1833            def.steps()
1834                .iter()
1835                .filter_map(|s| {
1836                    if let BuilderStep::Processor(p) = s {
1837                        Some(p.clone())
1838                    } else {
1839                        None
1840                    }
1841                })
1842                .collect(),
1843        );
1844        let exchange = Exchange::new(Message::new("ping"));
1845        let result = pipeline.oneshot(exchange).await.unwrap();
1846        assert_eq!(
1847            result.input.header("echo"),
1848            Some(&Value::String("ping".into()))
1849        );
1850    }
1851
1852    // ── FilterBuilder typestate tests ─────────────────────────────────────
1853
1854    #[test]
1855    fn test_filter_builder_typestate() {
1856        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1857            .route_id("test-route")
1858            .filter(|_ex| true)
1859            .to("mock:inner")
1860            .end_filter()
1861            .to("mock:outer")
1862            .build();
1863        assert!(result.is_ok());
1864    }
1865
1866    #[test]
1867    fn test_filter_builder_steps_collected() {
1868        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1869            .route_id("test-route")
1870            .filter(|_ex| true)
1871            .to("mock:inner")
1872            .end_filter()
1873            .build()
1874            .unwrap();
1875
1876        assert_eq!(definition.steps().len(), 1);
1877        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1878    }
1879
1880    #[test]
1881    fn test_wire_tap_builder_adds_step() {
1882        let definition = RouteBuilder::from("timer:tick")
1883            .route_id("test-route")
1884            .wire_tap("mock:tap")
1885            .to("mock:result")
1886            .build()
1887            .unwrap();
1888
1889        assert_eq!(definition.steps().len(), 2);
1890        assert!(
1891            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
1892        );
1893        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
1894    }
1895
1896    // ── MulticastBuilder typestate tests ─────────────────────────────────────
1897
1898    #[test]
1899    fn test_multicast_builder_typestate() {
1900        let definition = RouteBuilder::from("timer:tick")
1901            .route_id("test-route")
1902            .multicast()
1903            .to("direct:a")
1904            .to("direct:b")
1905            .end_multicast()
1906            .to("mock:result")
1907            .build()
1908            .unwrap();
1909
1910        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
1911    }
1912
1913    #[test]
1914    fn test_multicast_builder_steps_collected() {
1915        let definition = RouteBuilder::from("timer:tick")
1916            .route_id("test-route")
1917            .multicast()
1918            .to("direct:a")
1919            .to("direct:b")
1920            .end_multicast()
1921            .build()
1922            .unwrap();
1923
1924        match &definition.steps()[0] {
1925            BuilderStep::Multicast { steps, .. } => {
1926                assert_eq!(steps.len(), 2);
1927            }
1928            other => panic!("Expected Multicast, got {:?}", other),
1929        }
1930    }
1931
1932    // ── Concurrency builder tests ─────────────────────────────────────
1933
1934    #[test]
1935    fn test_builder_concurrent_sets_concurrency() {
1936        use camel_component::ConcurrencyModel;
1937
1938        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1939            .route_id("test-route")
1940            .concurrent(16)
1941            .to("log:info")
1942            .build()
1943            .unwrap();
1944
1945        assert_eq!(
1946            definition.concurrency_override(),
1947            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
1948        );
1949    }
1950
1951    #[test]
1952    fn test_builder_concurrent_zero_means_unbounded() {
1953        use camel_component::ConcurrencyModel;
1954
1955        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1956            .route_id("test-route")
1957            .concurrent(0)
1958            .to("log:info")
1959            .build()
1960            .unwrap();
1961
1962        assert_eq!(
1963            definition.concurrency_override(),
1964            Some(&ConcurrencyModel::Concurrent { max: None })
1965        );
1966    }
1967
1968    #[test]
1969    fn test_builder_sequential_sets_concurrency() {
1970        use camel_component::ConcurrencyModel;
1971
1972        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1973            .route_id("test-route")
1974            .sequential()
1975            .to("log:info")
1976            .build()
1977            .unwrap();
1978
1979        assert_eq!(
1980            definition.concurrency_override(),
1981            Some(&ConcurrencyModel::Sequential)
1982        );
1983    }
1984
1985    #[test]
1986    fn test_builder_default_concurrency_is_none() {
1987        let definition = RouteBuilder::from("timer:tick")
1988            .route_id("test-route")
1989            .to("log:info")
1990            .build()
1991            .unwrap();
1992
1993        assert_eq!(definition.concurrency_override(), None);
1994    }
1995
1996    // ── Route lifecycle builder tests ─────────────────────────────────────
1997
1998    #[test]
1999    fn test_builder_route_id_sets_id() {
2000        let definition = RouteBuilder::from("timer:tick")
2001            .route_id("my-route")
2002            .build()
2003            .unwrap();
2004
2005        assert_eq!(definition.route_id(), "my-route");
2006    }
2007
2008    #[test]
2009    fn test_build_without_route_id_fails() {
2010        let result = RouteBuilder::from("timer:tick?period=1000")
2011            .to("log:info")
2012            .build();
2013        let err = match result {
2014            Err(e) => e.to_string(),
2015            Ok(_) => panic!("build() should fail without route_id"),
2016        };
2017        assert!(
2018            err.contains("route_id"),
2019            "error should mention route_id, got: {}",
2020            err
2021        );
2022    }
2023
2024    #[test]
2025    fn test_builder_auto_startup_false() {
2026        let definition = RouteBuilder::from("timer:tick")
2027            .route_id("test-route")
2028            .auto_startup(false)
2029            .build()
2030            .unwrap();
2031
2032        assert!(!definition.auto_startup());
2033    }
2034
2035    #[test]
2036    fn test_builder_startup_order_custom() {
2037        let definition = RouteBuilder::from("timer:tick")
2038            .route_id("test-route")
2039            .startup_order(50)
2040            .build()
2041            .unwrap();
2042
2043        assert_eq!(definition.startup_order(), 50);
2044    }
2045
2046    #[test]
2047    fn test_builder_defaults() {
2048        let definition = RouteBuilder::from("timer:tick")
2049            .route_id("test-route")
2050            .build()
2051            .unwrap();
2052
2053        assert_eq!(definition.route_id(), "test-route");
2054        assert!(definition.auto_startup());
2055        assert_eq!(definition.startup_order(), 1000);
2056    }
2057
2058    // ── Choice typestate tests ──────────────────────────────────────────────────
2059
2060    #[test]
2061    fn test_choice_builder_single_when() {
2062        let definition = RouteBuilder::from("timer:tick")
2063            .route_id("test-route")
2064            .choice()
2065            .when(|ex: &Exchange| ex.input.header("type").is_some())
2066            .to("mock:typed")
2067            .end_when()
2068            .end_choice()
2069            .build()
2070            .unwrap();
2071        assert_eq!(definition.steps().len(), 1);
2072        assert!(
2073            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2074            if whens.len() == 1 && otherwise.is_none())
2075        );
2076    }
2077
2078    #[test]
2079    fn test_choice_builder_when_otherwise() {
2080        let definition = RouteBuilder::from("timer:tick")
2081            .route_id("test-route")
2082            .choice()
2083            .when(|ex: &Exchange| ex.input.header("a").is_some())
2084            .to("mock:a")
2085            .end_when()
2086            .otherwise()
2087            .to("mock:fallback")
2088            .end_otherwise()
2089            .end_choice()
2090            .build()
2091            .unwrap();
2092        assert!(
2093            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2094            if whens.len() == 1 && otherwise.is_some())
2095        );
2096    }
2097
2098    #[test]
2099    fn test_choice_builder_multiple_whens() {
2100        let definition = RouteBuilder::from("timer:tick")
2101            .route_id("test-route")
2102            .choice()
2103            .when(|ex: &Exchange| ex.input.header("a").is_some())
2104            .to("mock:a")
2105            .end_when()
2106            .when(|ex: &Exchange| ex.input.header("b").is_some())
2107            .to("mock:b")
2108            .end_when()
2109            .end_choice()
2110            .build()
2111            .unwrap();
2112        assert!(
2113            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2114            if whens.len() == 2)
2115        );
2116    }
2117
2118    #[test]
2119    fn test_choice_step_after_choice() {
2120        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2121        let definition = RouteBuilder::from("timer:tick")
2122            .route_id("test-route")
2123            .choice()
2124            .when(|_ex: &Exchange| true)
2125            .to("mock:inner")
2126            .end_when()
2127            .end_choice()
2128            .to("mock:outer") // must be step[1], not inside choice
2129            .build()
2130            .unwrap();
2131        assert_eq!(definition.steps().len(), 2);
2132        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2133    }
2134
2135    // ── Throttle typestate tests ──────────────────────────────────────────────────
2136
2137    #[test]
2138    fn test_throttle_builder_typestate() {
2139        let definition = RouteBuilder::from("timer:tick")
2140            .route_id("test-route")
2141            .throttle(10, std::time::Duration::from_secs(1))
2142            .to("mock:result")
2143            .end_throttle()
2144            .build()
2145            .unwrap();
2146
2147        assert_eq!(definition.steps().len(), 1);
2148        assert!(matches!(
2149            &definition.steps()[0],
2150            BuilderStep::Throttle { .. }
2151        ));
2152    }
2153
2154    #[test]
2155    fn test_throttle_builder_with_strategy() {
2156        let definition = RouteBuilder::from("timer:tick")
2157            .route_id("test-route")
2158            .throttle(10, std::time::Duration::from_secs(1))
2159            .strategy(ThrottleStrategy::Reject)
2160            .to("mock:result")
2161            .end_throttle()
2162            .build()
2163            .unwrap();
2164
2165        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2166            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2167        } else {
2168            panic!("Expected Throttle step");
2169        }
2170    }
2171
2172    #[test]
2173    fn test_throttle_builder_steps_collected() {
2174        let definition = RouteBuilder::from("timer:tick")
2175            .route_id("test-route")
2176            .throttle(5, std::time::Duration::from_secs(1))
2177            .set_header("throttled", Value::Bool(true))
2178            .to("mock:throttled")
2179            .end_throttle()
2180            .build()
2181            .unwrap();
2182
2183        match &definition.steps()[0] {
2184            BuilderStep::Throttle { steps, .. } => {
2185                assert_eq!(steps.len(), 2); // SetHeader + To
2186            }
2187            other => panic!("Expected Throttle, got {:?}", other),
2188        }
2189    }
2190
2191    #[test]
2192    fn test_throttle_step_after_throttle() {
2193        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2194        let definition = RouteBuilder::from("timer:tick")
2195            .route_id("test-route")
2196            .throttle(10, std::time::Duration::from_secs(1))
2197            .to("mock:inner")
2198            .end_throttle()
2199            .to("mock:outer")
2200            .build()
2201            .unwrap();
2202
2203        assert_eq!(definition.steps().len(), 2);
2204        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2205    }
2206
2207    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2208
2209    #[test]
2210    fn test_load_balance_builder_typestate() {
2211        let definition = RouteBuilder::from("timer:tick")
2212            .route_id("test-route")
2213            .load_balance()
2214            .round_robin()
2215            .to("mock:a")
2216            .to("mock:b")
2217            .end_load_balance()
2218            .build()
2219            .unwrap();
2220
2221        assert_eq!(definition.steps().len(), 1);
2222        assert!(matches!(
2223            &definition.steps()[0],
2224            BuilderStep::LoadBalance { .. }
2225        ));
2226    }
2227
2228    #[test]
2229    fn test_load_balance_builder_with_strategy() {
2230        let definition = RouteBuilder::from("timer:tick")
2231            .route_id("test-route")
2232            .load_balance()
2233            .random()
2234            .to("mock:result")
2235            .end_load_balance()
2236            .build()
2237            .unwrap();
2238
2239        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2240            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2241        } else {
2242            panic!("Expected LoadBalance step");
2243        }
2244    }
2245
2246    #[test]
2247    fn test_load_balance_builder_steps_collected() {
2248        let definition = RouteBuilder::from("timer:tick")
2249            .route_id("test-route")
2250            .load_balance()
2251            .set_header("lb", Value::Bool(true))
2252            .to("mock:a")
2253            .end_load_balance()
2254            .build()
2255            .unwrap();
2256
2257        match &definition.steps()[0] {
2258            BuilderStep::LoadBalance { steps, .. } => {
2259                assert_eq!(steps.len(), 2); // SetHeader + To
2260            }
2261            other => panic!("Expected LoadBalance, got {:?}", other),
2262        }
2263    }
2264
2265    #[test]
2266    fn test_load_balance_step_after_load_balance() {
2267        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
2268        let definition = RouteBuilder::from("timer:tick")
2269            .route_id("test-route")
2270            .load_balance()
2271            .to("mock:inner")
2272            .end_load_balance()
2273            .to("mock:outer")
2274            .build()
2275            .unwrap();
2276
2277        assert_eq!(definition.steps().len(), 2);
2278        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2279    }
2280
2281    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2282
2283    #[test]
2284    fn test_dynamic_router_builder() {
2285        let definition = RouteBuilder::from("timer:tick")
2286            .route_id("test-route")
2287            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2288            .build()
2289            .unwrap();
2290
2291        assert_eq!(definition.steps().len(), 1);
2292        assert!(matches!(
2293            &definition.steps()[0],
2294            BuilderStep::DynamicRouter { .. }
2295        ));
2296    }
2297
2298    #[test]
2299    fn test_dynamic_router_builder_with_config() {
2300        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2301            .max_iterations(100)
2302            .cache_size(500);
2303
2304        let definition = RouteBuilder::from("timer:tick")
2305            .route_id("test-route")
2306            .dynamic_router_with_config(config)
2307            .build()
2308            .unwrap();
2309
2310        assert_eq!(definition.steps().len(), 1);
2311        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2312            assert_eq!(config.max_iterations, 100);
2313            assert_eq!(config.cache_size, 500);
2314        } else {
2315            panic!("Expected DynamicRouter step");
2316        }
2317    }
2318
2319    #[test]
2320    fn test_dynamic_router_step_after_router() {
2321        // Steps after dynamic_router() are added to the outer pipeline.
2322        let definition = RouteBuilder::from("timer:tick")
2323            .route_id("test-route")
2324            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2325            .to("mock:outer")
2326            .build()
2327            .unwrap();
2328
2329        assert_eq!(definition.steps().len(), 2);
2330        assert!(matches!(
2331            &definition.steps()[0],
2332            BuilderStep::DynamicRouter { .. }
2333        ));
2334        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2335    }
2336
2337    #[test]
2338    fn routing_slip_builder_creates_step() {
2339        use camel_api::RoutingSlipExpression;
2340
2341        let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2342
2343        let route = RouteBuilder::from("direct:start")
2344            .route_id("routing-slip-test")
2345            .routing_slip(expression)
2346            .build()
2347            .unwrap();
2348
2349        assert!(
2350            matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2351            "Expected RoutingSlip step"
2352        );
2353    }
2354
2355    #[test]
2356    fn routing_slip_with_config_builder_creates_step() {
2357        use camel_api::RoutingSlipConfig;
2358
2359        let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2360            .uri_delimiter("|")
2361            .cache_size(50)
2362            .ignore_invalid_endpoints(true);
2363
2364        let route = RouteBuilder::from("direct:start")
2365            .route_id("routing-slip-config-test")
2366            .routing_slip_with_config(config)
2367            .build()
2368            .unwrap();
2369
2370        if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2371            assert_eq!(config.uri_delimiter, "|");
2372            assert_eq!(config.cache_size, 50);
2373            assert!(config.ignore_invalid_endpoints);
2374        } else {
2375            panic!("Expected RoutingSlip step");
2376        }
2377    }
2378
2379    #[test]
2380    fn test_builder_marshal_adds_processor_step() {
2381        let definition = RouteBuilder::from("timer:tick")
2382            .route_id("test-route")
2383            .marshal("json")
2384            .build()
2385            .unwrap();
2386        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2387    }
2388
2389    #[test]
2390    fn test_builder_unmarshal_adds_processor_step() {
2391        let definition = RouteBuilder::from("timer:tick")
2392            .route_id("test-route")
2393            .unmarshal("json")
2394            .build()
2395            .unwrap();
2396        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2397    }
2398
2399    #[test]
2400    #[should_panic(expected = "unknown data format: 'csv'")]
2401    fn test_builder_marshal_panics_on_unknown_format() {
2402        let _ = RouteBuilder::from("timer:tick")
2403            .route_id("test-route")
2404            .marshal("csv")
2405            .build();
2406    }
2407
2408    #[test]
2409    #[should_panic(expected = "unknown data format: 'csv'")]
2410    fn test_builder_unmarshal_panics_on_unknown_format() {
2411        let _ = RouteBuilder::from("timer:tick")
2412            .route_id("test-route")
2413            .unmarshal("csv")
2414            .build();
2415    }
2416}