Skip to main content

camel_builder/
lib.rs

1use camel_api::DelayConfig;
2use camel_api::aggregator::{
3    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
4};
5use camel_api::body::Body;
6use camel_api::body_converter::BodyType;
7use camel_api::circuit_breaker::CircuitBreakerConfig;
8use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
9use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
10use camel_api::load_balancer::LoadBalancerConfig;
11use camel_api::multicast::{MulticastConfig, MulticastStrategy};
12use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
13use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
14use camel_api::splitter::SplitterConfig;
15use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
16use camel_api::{
17    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
18    ProcessorFn, Value,
19    runtime::{
20        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
21        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
22        CanonicalWhenSpec,
23    },
24};
25use camel_component_api::ConcurrencyModel;
26use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
27use camel_processor::{
28    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
29    UnmarshalService, builtin_data_format,
30};
31
32/// Shared step-accumulation methods for all builder types.
33///
34/// Implementors provide `steps_mut()` and get step-adding methods for free.
35/// `filter()` and other branching methods are NOT included — they return
36/// different types per builder and stay as per-builder methods.
37pub trait StepAccumulator: Sized {
38    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
39
40    fn to(mut self, endpoint: impl Into<String>) -> Self {
41        self.steps_mut().push(BuilderStep::To(endpoint.into()));
42        self
43    }
44
45    fn process<F, Fut>(mut self, f: F) -> Self
46    where
47        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
48        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
49    {
50        let svc = ProcessorFn::new(f);
51        self.steps_mut()
52            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
53        self
54    }
55
56    fn process_fn(mut self, processor: BoxProcessor) -> Self {
57        self.steps_mut().push(BuilderStep::Processor(processor));
58        self
59    }
60
61    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
62        let svc = SetHeader::new(IdentityProcessor, key, value);
63        self.steps_mut()
64            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
65        self
66    }
67
68    fn map_body<F>(mut self, mapper: F) -> Self
69    where
70        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
71    {
72        let svc = MapBody::new(IdentityProcessor, mapper);
73        self.steps_mut()
74            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
75        self
76    }
77
78    fn set_body<B>(mut self, body: B) -> Self
79    where
80        B: Into<Body> + Clone + Send + Sync + 'static,
81    {
82        let body: Body = body.into();
83        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
84        self.steps_mut()
85            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
86        self
87    }
88
89    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
90    ///
91    /// Transforms the message body using the given value. Semantically identical
92    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
93    fn transform<B>(self, body: B) -> Self
94    where
95        B: Into<Body> + Clone + Send + Sync + 'static,
96    {
97        self.set_body(body)
98    }
99
100    fn set_body_fn<F>(mut self, expr: F) -> Self
101    where
102        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
103    {
104        let svc = SetBody::new(IdentityProcessor, expr);
105        self.steps_mut()
106            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
107        self
108    }
109
110    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
111    where
112        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
113    {
114        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
115        self.steps_mut()
116            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
117        self
118    }
119
120    fn aggregate(mut self, config: AggregatorConfig) -> Self {
121        self.steps_mut().push(BuilderStep::Aggregate { config });
122        self
123    }
124
125    /// Stop processing this exchange immediately. No further steps in the
126    /// current pipeline will run.
127    ///
128    /// Can be used at any point in the route: directly on RouteBuilder,
129    /// inside `.filter()`, inside `.split()`, etc.
130    fn stop(mut self) -> Self {
131        self.steps_mut().push(BuilderStep::Stop);
132        self
133    }
134
135    fn delay(mut self, duration: std::time::Duration) -> Self {
136        self.steps_mut().push(BuilderStep::Delay {
137            config: DelayConfig::from_duration(duration),
138        });
139        self
140    }
141
142    fn delay_with_header(
143        mut self,
144        duration: std::time::Duration,
145        header: impl Into<String>,
146    ) -> Self {
147        self.steps_mut().push(BuilderStep::Delay {
148            config: DelayConfig::from_duration_with_header(duration, header),
149        });
150        self
151    }
152
153    /// Log a message at the specified level.
154    ///
155    /// The message will be logged when an exchange passes through this step.
156    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
157        self.steps_mut().push(BuilderStep::Log {
158            level,
159            message: message.into(),
160        });
161        self
162    }
163
164    /// Convert the message body to the target type.
165    ///
166    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
167    /// Returns `TypeConversionFailed` if conversion is not possible.
168    ///
169    /// # Example
170    /// ```ignore
171    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
172    ///      .convert_body_to(BodyType::Json)
173    ///      .to("direct:next")
174    /// ```
175    fn convert_body_to(mut self, target: BodyType) -> Self {
176        let svc = ConvertBodyTo::new(IdentityProcessor, target);
177        self.steps_mut()
178            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
179        self
180    }
181
182    /// Marshal the message body using the specified data format.
183    ///
184    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
185    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
186    ///
187    /// # Example
188    /// ```ignore
189    /// route.marshal("json").to("direct:next")
190    /// ```
191    fn marshal(mut self, format: impl Into<String>) -> Self {
192        let name = format.into();
193        let df =
194            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
195        let svc = MarshalService::new(IdentityProcessor, df);
196        self.steps_mut()
197            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
198        self
199    }
200
201    /// Unmarshal the message body using the specified data format.
202    ///
203    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
204    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
205    ///
206    /// # Example
207    /// ```ignore
208    /// route.unmarshal("json").to("direct:next")
209    /// ```
210    fn unmarshal(mut self, format: impl Into<String>) -> Self {
211        let name = format.into();
212        let df =
213            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
214        let svc = UnmarshalService::new(IdentityProcessor, df);
215        self.steps_mut()
216            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
217        self
218    }
219
220    /// Execute a script that can modify the exchange (headers, properties, body).
221    ///
222    /// The script has access to `headers`, `properties`, and `body` variables
223    /// and can modify them with assignment syntax: `headers["k"] = v`.
224    ///
225    /// # Example
226    /// ```ignore
227    /// // ignore: requires full CamelContext setup with registered language
228    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
229    /// ```
230    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
231        self.steps_mut().push(BuilderStep::Script {
232            language: language.into(),
233            script: script.into(),
234        });
235        self
236    }
237}
238
239/// A fluent builder for constructing routes.
240///
241/// # Example
242///
243/// ```ignore
244/// let definition = RouteBuilder::from("timer:tick?period=1000")
245///     .set_header("source", Value::String("timer".into()))
246///     .filter(|ex| ex.input.body.as_text().is_some())
247///     .to("log:info?showHeaders=true")
248///     .build()?;
249/// ```
250pub struct RouteBuilder {
251    from_uri: String,
252    steps: Vec<BuilderStep>,
253    error_handler: Option<ErrorHandlerConfig>,
254    error_handler_mode: ErrorHandlerMode,
255    circuit_breaker_config: Option<CircuitBreakerConfig>,
256    concurrency: Option<ConcurrencyModel>,
257    route_id: Option<String>,
258    auto_startup: Option<bool>,
259    startup_order: Option<i32>,
260}
261
262#[derive(Default)]
263enum ErrorHandlerMode {
264    #[default]
265    None,
266    ExplicitConfig,
267    Shorthand {
268        dlc_uri: Option<String>,
269        specs: Vec<OnExceptionSpec>,
270    },
271    Mixed,
272}
273
274#[derive(Clone)]
275struct OnExceptionSpec {
276    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
277    retry: Option<RedeliveryPolicy>,
278    handled_by: Option<String>,
279}
280
281impl RouteBuilder {
282    /// Start building a route from the given source endpoint URI.
283    pub fn from(endpoint: &str) -> Self {
284        Self {
285            from_uri: endpoint.to_string(),
286            steps: Vec::new(),
287            error_handler: None,
288            error_handler_mode: ErrorHandlerMode::None,
289            circuit_breaker_config: None,
290            concurrency: None,
291            route_id: None,
292            auto_startup: None,
293            startup_order: None,
294        }
295    }
296
297    /// Open a filter scope. Only exchanges matching `predicate` will be processed
298    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
299    /// and continue to steps after `.end_filter()`.
300    pub fn filter<F>(self, predicate: F) -> FilterBuilder
301    where
302        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
303    {
304        FilterBuilder {
305            parent: self,
306            predicate: std::sync::Arc::new(predicate),
307            steps: vec![],
308        }
309    }
310
311    /// Open a choice scope for content-based routing.
312    ///
313    /// Within the choice, you can define multiple `.when()` clauses and an
314    /// optional `.otherwise()` clause. The first matching `when` predicate
315    /// determines which sub-pipeline executes.
316    pub fn choice(self) -> ChoiceBuilder {
317        ChoiceBuilder {
318            parent: self,
319            whens: vec![],
320            _otherwise: None,
321        }
322    }
323
324    /// Add a WireTap step that sends a clone of the exchange to the given
325    /// endpoint URI (fire-and-forget). The original exchange continues
326    /// downstream unchanged.
327    pub fn wire_tap(mut self, endpoint: &str) -> Self {
328        self.steps.push(BuilderStep::WireTap {
329            uri: endpoint.to_string(),
330        });
331        self
332    }
333
334    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
335    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
336        self.error_handler_mode = match self.error_handler_mode {
337            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
338                ErrorHandlerMode::ExplicitConfig
339            }
340            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
341        };
342        self.error_handler = Some(config);
343        self
344    }
345
346    /// Set a dead letter channel URI for shorthand error handler mode.
347    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
348        let uri = uri.into();
349        self.error_handler_mode = match self.error_handler_mode {
350            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
351                dlc_uri: Some(uri),
352                specs: Vec::new(),
353            },
354            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
355                dlc_uri: Some(uri),
356                specs,
357            },
358            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
359        };
360        self
361    }
362
363    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
364    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
365    where
366        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
367    {
368        self.error_handler_mode = match self.error_handler_mode {
369            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
370                dlc_uri: None,
371                specs: Vec::new(),
372            },
373            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
374            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
375        };
376
377        OnExceptionBuilder {
378            parent: self,
379            policy: OnExceptionSpec {
380                matches: std::sync::Arc::new(matches),
381                retry: None,
382                handled_by: None,
383            },
384        }
385    }
386
387    /// Set a circuit breaker for this route.
388    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
389        self.circuit_breaker_config = Some(config);
390        self
391    }
392
393    /// Override the consumer's default concurrency model.
394    ///
395    /// When set, the pipeline spawns a task per exchange, processing them
396    /// concurrently. `max` limits the number of simultaneously active
397    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
398    ///
399    /// # Example
400    /// ```ignore
401    /// RouteBuilder::from("http://0.0.0.0:8080/api")
402    ///     .concurrent(16)  // max 16 in-flight pipeline executions
403    ///     .process(handle_request)
404    ///     .build()
405    /// ```
406    pub fn concurrent(mut self, max: usize) -> Self {
407        let max = if max == 0 { None } else { Some(max) };
408        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
409        self
410    }
411
412    /// Force sequential processing, overriding a concurrent-capable consumer.
413    ///
414    /// Useful for HTTP routes that mutate shared state and need ordering
415    /// guarantees.
416    pub fn sequential(mut self) -> Self {
417        self.concurrency = Some(ConcurrencyModel::Sequential);
418        self
419    }
420
421    /// Set the route ID for this route.
422    ///
423    /// If not set, the route will be assigned an auto-generated ID.
424    pub fn route_id(mut self, id: impl Into<String>) -> Self {
425        self.route_id = Some(id.into());
426        self
427    }
428
429    /// Set whether this route should automatically start when the context starts.
430    ///
431    /// Default is `true`.
432    pub fn auto_startup(mut self, auto: bool) -> Self {
433        self.auto_startup = Some(auto);
434        self
435    }
436
437    /// Set the startup order for this route.
438    ///
439    /// Routes with lower values start first. Default is 1000.
440    pub fn startup_order(mut self, order: i32) -> Self {
441        self.startup_order = Some(order);
442        self
443    }
444
445    /// Begin a Splitter sub-pipeline. Steps added after this call (until
446    /// `.end_split()`) will be executed per-fragment.
447    ///
448    /// Returns a `SplitBuilder` — you cannot call `.build()` until
449    /// `.end_split()` closes the split scope (enforced by the type system).
450    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
451        SplitBuilder {
452            parent: self,
453            config,
454            steps: Vec::new(),
455        }
456    }
457
458    /// Begin a Multicast sub-pipeline. Steps added after this call (until
459    /// `.end_multicast()`) will each receive a copy of the exchange.
460    ///
461    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
462    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
463    pub fn multicast(self) -> MulticastBuilder {
464        MulticastBuilder {
465            parent: self,
466            steps: Vec::new(),
467            config: MulticastConfig::new(),
468        }
469    }
470
471    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
472    /// `max_requests` per `period`. Steps inside the throttle scope are only
473    /// executed when the rate limit allows.
474    ///
475    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
476    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
477    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
478        ThrottleBuilder {
479            parent: self,
480            config: ThrottlerConfig::new(max_requests, period),
481            steps: Vec::new(),
482        }
483    }
484
485    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
486    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
487    ///
488    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
489    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
490    pub fn load_balance(self) -> LoadBalancerBuilder {
491        LoadBalancerBuilder {
492            parent: self,
493            config: LoadBalancerConfig::round_robin(),
494            steps: Vec::new(),
495        }
496    }
497
498    /// Add a dynamic router step that routes exchanges dynamically based on
499    /// expression evaluation at runtime.
500    ///
501    /// The expression receives the exchange and returns `Some(uri)` to route to
502    /// the next endpoint, or `None` to stop routing.
503    ///
504    /// # Example
505    /// ```ignore
506    /// RouteBuilder::from("timer:tick")
507    ///     .route_id("test-route")
508    ///     .dynamic_router(|ex| {
509    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
510    ///     })
511    ///     .build()
512    /// ```
513    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
514        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
515    }
516
517    /// Add a dynamic router step with full configuration.
518    ///
519    /// Allows customization of URI delimiter, cache size, timeout, and other options.
520    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
521        self.steps.push(BuilderStep::DynamicRouter { config });
522        self
523    }
524
525    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
526        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
527    }
528
529    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
530        self.steps.push(BuilderStep::RoutingSlip { config });
531        self
532    }
533
534    pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
535        self.recipient_list_with_config(RecipientListConfig::new(expression))
536    }
537
538    pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
539        self.steps.push(BuilderStep::RecipientList { config });
540        self
541    }
542
543    /// Consume the builder and produce a [`RouteDefinition`].
544    pub fn build(self) -> Result<RouteDefinition, CamelError> {
545        if self.from_uri.is_empty() {
546            return Err(CamelError::RouteError(
547                "route must have a 'from' URI".to_string(),
548            ));
549        }
550        let route_id = self.route_id.ok_or_else(|| {
551            CamelError::RouteError(
552                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
553                    .to_string(),
554            )
555        })?;
556        let resolved_error_handler = match self.error_handler_mode {
557            ErrorHandlerMode::None => self.error_handler,
558            ErrorHandlerMode::ExplicitConfig => self.error_handler,
559            ErrorHandlerMode::Mixed => {
560                return Err(CamelError::RouteError(
561                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
562                ));
563            }
564            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
565                let mut config = if let Some(uri) = dlc_uri {
566                    ErrorHandlerConfig::dead_letter_channel(uri)
567                } else {
568                    ErrorHandlerConfig::log_only()
569                };
570
571                for spec in specs {
572                    let matcher = spec.matches.clone();
573                    let mut builder = config.on_exception(move |e| matcher(e));
574
575                    if let Some(retry) = spec.retry {
576                        builder = builder.retry(retry.max_attempts).with_backoff(
577                            retry.initial_delay,
578                            retry.multiplier,
579                            retry.max_delay,
580                        );
581                        if retry.jitter_factor > 0.0 {
582                            builder = builder.with_jitter(retry.jitter_factor);
583                        }
584                    }
585
586                    if let Some(uri) = spec.handled_by {
587                        builder = builder.handled_by(uri);
588                    }
589
590                    config = builder.build();
591                }
592
593                Some(config)
594            }
595        };
596
597        let definition = RouteDefinition::new(self.from_uri, self.steps);
598        let definition = if let Some(eh) = resolved_error_handler {
599            definition.with_error_handler(eh)
600        } else {
601            definition
602        };
603        let definition = if let Some(cb) = self.circuit_breaker_config {
604            definition.with_circuit_breaker(cb)
605        } else {
606            definition
607        };
608        let definition = if let Some(concurrency) = self.concurrency {
609            definition.with_concurrency(concurrency)
610        } else {
611            definition
612        };
613        let definition = definition.with_route_id(route_id);
614        let definition = if let Some(auto) = self.auto_startup {
615            definition.with_auto_startup(auto)
616        } else {
617            definition
618        };
619        let definition = if let Some(order) = self.startup_order {
620            definition.with_startup_order(order)
621        } else {
622            definition
623        };
624        Ok(definition)
625    }
626
627    /// Compile this builder route into canonical v1 spec.
628    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
629        if self.from_uri.is_empty() {
630            return Err(CamelError::RouteError(
631                "route must have a 'from' URI".to_string(),
632            ));
633        }
634        let route_id = self.route_id.ok_or_else(|| {
635            CamelError::RouteError(
636                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
637                    .to_string(),
638            )
639        })?;
640
641        let steps = canonicalize_steps(self.steps)?;
642        let circuit_breaker = self
643            .circuit_breaker_config
644            .map(canonicalize_circuit_breaker);
645
646        let spec = CanonicalRouteSpec {
647            route_id,
648            from: self.from_uri,
649            steps,
650            circuit_breaker,
651            version: camel_api::CANONICAL_CONTRACT_VERSION,
652        };
653        spec.validate_contract()?;
654        Ok(spec)
655    }
656}
657
658pub struct OnExceptionBuilder {
659    parent: RouteBuilder,
660    policy: OnExceptionSpec,
661}
662
663impl OnExceptionBuilder {
664    pub fn retry(mut self, max_attempts: u32) -> Self {
665        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
666        self
667    }
668
669    pub fn with_backoff(
670        mut self,
671        initial: std::time::Duration,
672        multiplier: f64,
673        max: std::time::Duration,
674    ) -> Self {
675        if let Some(ref mut retry) = self.policy.retry {
676            retry.initial_delay = initial;
677            retry.multiplier = multiplier;
678            retry.max_delay = max;
679        }
680        self
681    }
682
683    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
684        if let Some(ref mut retry) = self.policy.retry {
685            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
686        }
687        self
688    }
689
690    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
691        self.policy.handled_by = Some(uri.into());
692        self
693    }
694
695    pub fn end_on_exception(mut self) -> RouteBuilder {
696        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
697            specs.push(self.policy);
698        }
699        self.parent
700    }
701}
702
703fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
704    let mut canonical = Vec::with_capacity(steps.len());
705    for step in steps {
706        canonical.push(canonicalize_step(step)?);
707    }
708    Ok(canonical)
709}
710
711fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
712    match step {
713        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
714        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
715        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
716        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
717        BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
718            delay_ms: config.delay_ms,
719            dynamic_header: config.dynamic_header,
720        }),
721        BuilderStep::DeclarativeScript { expression } => {
722            Ok(CanonicalStepSpec::Script { expression })
723        }
724        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
725            predicate,
726            steps: canonicalize_steps(steps)?,
727        }),
728        BuilderStep::DeclarativeChoice { whens, otherwise } => {
729            let mut canonical_whens = Vec::with_capacity(whens.len());
730            for DeclarativeWhenStep { predicate, steps } in whens {
731                canonical_whens.push(CanonicalWhenSpec {
732                    predicate,
733                    steps: canonicalize_steps(steps)?,
734                });
735            }
736            let otherwise = match otherwise {
737                Some(steps) => Some(canonicalize_steps(steps)?),
738                None => None,
739            };
740            Ok(CanonicalStepSpec::Choice {
741                whens: canonical_whens,
742                otherwise,
743            })
744        }
745        BuilderStep::DeclarativeSplit {
746            expression,
747            aggregation,
748            parallel,
749            parallel_limit,
750            stop_on_exception,
751            steps,
752        } => Ok(CanonicalStepSpec::Split {
753            expression: CanonicalSplitExpressionSpec::Language(expression),
754            aggregation: canonicalize_split_aggregation(aggregation)?,
755            parallel,
756            parallel_limit,
757            stop_on_exception,
758            steps: canonicalize_steps(steps)?,
759        }),
760        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
761            config: canonicalize_aggregate(config)?,
762        }),
763        other => {
764            let step_name = canonical_step_name(&other);
765            let detail = camel_api::canonical_contract_rejection_reason(step_name)
766                .unwrap_or("not included in canonical v1");
767            Err(CamelError::RouteError(format!(
768                "canonical v1 does not support step `{step_name}`: {detail}"
769            )))
770        }
771    }
772}
773
774fn canonicalize_split_aggregation(
775    strategy: camel_api::splitter::AggregationStrategy,
776) -> Result<CanonicalSplitAggregationSpec, CamelError> {
777    match strategy {
778        camel_api::splitter::AggregationStrategy::LastWins => {
779            Ok(CanonicalSplitAggregationSpec::LastWins)
780        }
781        camel_api::splitter::AggregationStrategy::CollectAll => {
782            Ok(CanonicalSplitAggregationSpec::CollectAll)
783        }
784        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
785            "canonical v1 does not support custom split aggregation".to_string(),
786        )),
787        camel_api::splitter::AggregationStrategy::Original => {
788            Ok(CanonicalSplitAggregationSpec::Original)
789        }
790    }
791}
792
793fn extract_completion_fields(
794    mode: &CompletionMode,
795) -> Result<(Option<usize>, Option<u64>), CamelError> {
796    match mode {
797        CompletionMode::Single(cond) => match cond {
798            CompletionCondition::Size(n) => Ok((Some(*n), None)),
799            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
800            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
801                "canonical v1 does not support aggregate predicate completion".to_string(),
802            )),
803        },
804        CompletionMode::Any(conds) => {
805            let mut size = None;
806            let mut timeout_ms = None;
807            for cond in conds {
808                match cond {
809                    CompletionCondition::Size(n) => size = Some(*n),
810                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
811                    CompletionCondition::Predicate(_) => {
812                        return Err(CamelError::RouteError(
813                            "canonical v1 does not support aggregate predicate completion"
814                                .to_string(),
815                        ));
816                    }
817                }
818            }
819            Ok((size, timeout_ms))
820        }
821    }
822}
823
824fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
825    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
826
827    let header = match &config.correlation {
828        CorrelationStrategy::HeaderName(h) => h.clone(),
829        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
830        CorrelationStrategy::Fn(_) => {
831            return Err(CamelError::RouteError(
832                "canonical v1 does not support Fn correlation strategy".to_string(),
833            ));
834        }
835    };
836
837    let correlation_key = match &config.correlation {
838        CorrelationStrategy::HeaderName(_) => None,
839        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
840        CorrelationStrategy::Fn(_) => unreachable!(),
841    };
842
843    let strategy = match config.strategy {
844        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
845        AggregationStrategy::Custom(_) => {
846            return Err(CamelError::RouteError(
847                "canonical v1 does not support custom aggregate strategy".to_string(),
848            ));
849        }
850    };
851    let bucket_ttl_ms = config
852        .bucket_ttl
853        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
854
855    Ok(CanonicalAggregateSpec {
856        header,
857        completion_size,
858        completion_timeout_ms,
859        correlation_key,
860        force_completion_on_stop: if config.force_completion_on_stop {
861            Some(true)
862        } else {
863            None
864        },
865        discard_on_timeout: if config.discard_on_timeout {
866            Some(true)
867        } else {
868            None
869        },
870        strategy,
871        max_buckets: config.max_buckets,
872        bucket_ttl_ms,
873    })
874}
875
876fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
877    CanonicalCircuitBreakerSpec {
878        failure_threshold: config.failure_threshold,
879        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
880    }
881}
882
883fn canonical_step_name(step: &BuilderStep) -> &'static str {
884    match step {
885        BuilderStep::Processor(_) => "processor",
886        BuilderStep::To(_) => "to",
887        BuilderStep::Stop => "stop",
888        BuilderStep::Log { .. } => "log",
889        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
890        BuilderStep::DeclarativeSetBody { .. } => "set_body",
891        BuilderStep::DeclarativeFilter { .. } => "filter",
892        BuilderStep::DeclarativeChoice { .. } => "choice",
893        BuilderStep::DeclarativeScript { .. } => "script",
894        BuilderStep::DeclarativeSplit { .. } => "split",
895        BuilderStep::Split { .. } => "split",
896        BuilderStep::Aggregate { .. } => "aggregate",
897        BuilderStep::Filter { .. } => "filter",
898        BuilderStep::Choice { .. } => "choice",
899        BuilderStep::WireTap { .. } => "wire_tap",
900        BuilderStep::Delay { .. } => "delay",
901        BuilderStep::Multicast { .. } => "multicast",
902        BuilderStep::DeclarativeLog { .. } => "log",
903        BuilderStep::Bean { .. } => "bean",
904        BuilderStep::Script { .. } => "script",
905        BuilderStep::Throttle { .. } => "throttle",
906        BuilderStep::LoadBalance { .. } => "load_balancer",
907        BuilderStep::DynamicRouter { .. } => "dynamic_router",
908        BuilderStep::RoutingSlip { .. } => "routing_slip",
909        BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
910        BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
911        BuilderStep::RecipientList { .. } => "recipient_list",
912        BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
913    }
914}
915
916impl StepAccumulator for RouteBuilder {
917    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
918        &mut self.steps
919    }
920}
921
922/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
923///
924/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
925/// but NOT `.build()` and NOT `.split()` (no nested splits).
926///
927/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
928/// and returns the parent `RouteBuilder`.
929pub struct SplitBuilder {
930    parent: RouteBuilder,
931    config: SplitterConfig,
932    steps: Vec<BuilderStep>,
933}
934
935impl SplitBuilder {
936    /// Open a filter scope within the split sub-pipeline.
937    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
938    where
939        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
940    {
941        FilterInSplitBuilder {
942            parent: self,
943            predicate: std::sync::Arc::new(predicate),
944            steps: vec![],
945        }
946    }
947
948    /// Close the split scope. Packages the accumulated sub-steps into a
949    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
950    pub fn end_split(mut self) -> RouteBuilder {
951        let split_step = BuilderStep::Split {
952            config: self.config,
953            steps: self.steps,
954        };
955        self.parent.steps.push(split_step);
956        self.parent
957    }
958}
959
960impl StepAccumulator for SplitBuilder {
961    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
962        &mut self.steps
963    }
964}
965
966/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
967pub struct FilterBuilder {
968    parent: RouteBuilder,
969    predicate: FilterPredicate,
970    steps: Vec<BuilderStep>,
971}
972
973impl FilterBuilder {
974    /// Close the filter scope. Packages the accumulated sub-steps into a
975    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
976    pub fn end_filter(mut self) -> RouteBuilder {
977        let step = BuilderStep::Filter {
978            predicate: self.predicate,
979            steps: self.steps,
980        };
981        self.parent.steps.push(step);
982        self.parent
983    }
984}
985
986impl StepAccumulator for FilterBuilder {
987    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
988        &mut self.steps
989    }
990}
991
992/// Builder for a filter scope nested inside a `.split()` block.
993pub struct FilterInSplitBuilder {
994    parent: SplitBuilder,
995    predicate: FilterPredicate,
996    steps: Vec<BuilderStep>,
997}
998
999impl FilterInSplitBuilder {
1000    /// Close the filter scope and return the parent `SplitBuilder`.
1001    pub fn end_filter(mut self) -> SplitBuilder {
1002        let step = BuilderStep::Filter {
1003            predicate: self.predicate,
1004            steps: self.steps,
1005        };
1006        self.parent.steps.push(step);
1007        self.parent
1008    }
1009}
1010
1011impl StepAccumulator for FilterInSplitBuilder {
1012    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1013        &mut self.steps
1014    }
1015}
1016
1017// ── Choice/When/Otherwise builders ─────────────────────────────────────────
1018
1019/// Builder for a `.choice()` ... `.end_choice()` block.
1020///
1021/// Accumulates `when` clauses and an optional `otherwise` clause.
1022/// Cannot call `.build()` until `.end_choice()` is called.
1023pub struct ChoiceBuilder {
1024    parent: RouteBuilder,
1025    whens: Vec<WhenStep>,
1026    _otherwise: Option<Vec<BuilderStep>>,
1027}
1028
1029impl ChoiceBuilder {
1030    /// Open a `when` clause. Only exchanges matching `predicate` will be
1031    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
1032    pub fn when<F>(self, predicate: F) -> WhenBuilder
1033    where
1034        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1035    {
1036        WhenBuilder {
1037            parent: self,
1038            predicate: std::sync::Arc::new(predicate),
1039            steps: vec![],
1040        }
1041    }
1042
1043    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1044    ///
1045    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1046    pub fn otherwise(self) -> OtherwiseBuilder {
1047        OtherwiseBuilder {
1048            parent: self,
1049            steps: vec![],
1050        }
1051    }
1052
1053    /// Close the choice scope. Packages all accumulated `when` clauses and
1054    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1055    /// parent `RouteBuilder`.
1056    pub fn end_choice(mut self) -> RouteBuilder {
1057        let step = BuilderStep::Choice {
1058            whens: self.whens,
1059            otherwise: self._otherwise,
1060        };
1061        self.parent.steps.push(step);
1062        self.parent
1063    }
1064}
1065
1066/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1067pub struct WhenBuilder {
1068    parent: ChoiceBuilder,
1069    predicate: camel_api::FilterPredicate,
1070    steps: Vec<BuilderStep>,
1071}
1072
1073impl WhenBuilder {
1074    /// Close the when scope. Packages the accumulated sub-steps into a
1075    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1076    pub fn end_when(mut self) -> ChoiceBuilder {
1077        self.parent.whens.push(WhenStep {
1078            predicate: self.predicate,
1079            steps: self.steps,
1080        });
1081        self.parent
1082    }
1083}
1084
1085impl StepAccumulator for WhenBuilder {
1086    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1087        &mut self.steps
1088    }
1089}
1090
1091/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1092pub struct OtherwiseBuilder {
1093    parent: ChoiceBuilder,
1094    steps: Vec<BuilderStep>,
1095}
1096
1097impl OtherwiseBuilder {
1098    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1099    pub fn end_otherwise(self) -> ChoiceBuilder {
1100        let OtherwiseBuilder { mut parent, steps } = self;
1101        parent._otherwise = Some(steps);
1102        parent
1103    }
1104}
1105
1106impl StepAccumulator for OtherwiseBuilder {
1107    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1108        &mut self.steps
1109    }
1110}
1111
1112/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1113///
1114/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1115/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1116///
1117/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1118/// and returns the parent `RouteBuilder`.
1119pub struct MulticastBuilder {
1120    parent: RouteBuilder,
1121    steps: Vec<BuilderStep>,
1122    config: MulticastConfig,
1123}
1124
1125impl MulticastBuilder {
1126    pub fn parallel(mut self, parallel: bool) -> Self {
1127        self.config = self.config.parallel(parallel);
1128        self
1129    }
1130
1131    pub fn parallel_limit(mut self, limit: usize) -> Self {
1132        self.config = self.config.parallel_limit(limit);
1133        self
1134    }
1135
1136    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1137        self.config = self.config.stop_on_exception(stop);
1138        self
1139    }
1140
1141    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1142        self.config = self.config.timeout(duration);
1143        self
1144    }
1145
1146    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1147        self.config = self.config.aggregation(strategy);
1148        self
1149    }
1150
1151    pub fn end_multicast(mut self) -> RouteBuilder {
1152        let step = BuilderStep::Multicast {
1153            steps: self.steps,
1154            config: self.config,
1155        };
1156        self.parent.steps.push(step);
1157        self.parent
1158    }
1159}
1160
1161impl StepAccumulator for MulticastBuilder {
1162    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1163        &mut self.steps
1164    }
1165}
1166
1167/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1168///
1169/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1170/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1171///
1172/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1173/// and returns the parent `RouteBuilder`.
1174pub struct ThrottleBuilder {
1175    parent: RouteBuilder,
1176    config: ThrottlerConfig,
1177    steps: Vec<BuilderStep>,
1178}
1179
1180impl ThrottleBuilder {
1181    /// Set the throttle strategy. Default is `Delay`.
1182    ///
1183    /// - `Delay`: Queue messages until capacity available
1184    /// - `Reject`: Return error immediately when throttled
1185    /// - `Drop`: Silently discard excess messages
1186    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1187        self.config = self.config.strategy(strategy);
1188        self
1189    }
1190
1191    /// Close the throttle scope. Packages the accumulated sub-steps into a
1192    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1193    pub fn end_throttle(mut self) -> RouteBuilder {
1194        let step = BuilderStep::Throttle {
1195            config: self.config,
1196            steps: self.steps,
1197        };
1198        self.parent.steps.push(step);
1199        self.parent
1200    }
1201}
1202
1203impl StepAccumulator for ThrottleBuilder {
1204    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1205        &mut self.steps
1206    }
1207}
1208
1209/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1210///
1211/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1212/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1213///
1214/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1215/// and returns the parent `RouteBuilder`.
1216pub struct LoadBalancerBuilder {
1217    parent: RouteBuilder,
1218    config: LoadBalancerConfig,
1219    steps: Vec<BuilderStep>,
1220}
1221
1222impl LoadBalancerBuilder {
1223    /// Set the load balance strategy to round-robin (default).
1224    pub fn round_robin(mut self) -> Self {
1225        self.config = LoadBalancerConfig::round_robin();
1226        self
1227    }
1228
1229    /// Set the load balance strategy to random selection.
1230    pub fn random(mut self) -> Self {
1231        self.config = LoadBalancerConfig::random();
1232        self
1233    }
1234
1235    /// Set the load balance strategy to weighted selection.
1236    ///
1237    /// Each endpoint is assigned a weight that determines its probability
1238    /// of being selected.
1239    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1240        self.config = LoadBalancerConfig::weighted(weights);
1241        self
1242    }
1243
1244    /// Set the load balance strategy to failover.
1245    ///
1246    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1247    /// is tried.
1248    pub fn failover(mut self) -> Self {
1249        self.config = LoadBalancerConfig::failover();
1250        self
1251    }
1252
1253    /// Enable or disable parallel execution of endpoints.
1254    ///
1255    /// When enabled, all endpoints receive the exchange simultaneously.
1256    /// When disabled (default), only one endpoint is selected per exchange.
1257    pub fn parallel(mut self, parallel: bool) -> Self {
1258        self.config = self.config.parallel(parallel);
1259        self
1260    }
1261
1262    /// Close the load balance scope. Packages the accumulated sub-steps into a
1263    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1264    pub fn end_load_balance(mut self) -> RouteBuilder {
1265        let step = BuilderStep::LoadBalance {
1266            config: self.config,
1267            steps: self.steps,
1268        };
1269        self.parent.steps.push(step);
1270        self.parent
1271    }
1272}
1273
1274impl StepAccumulator for LoadBalancerBuilder {
1275    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1276        &mut self.steps
1277    }
1278}
1279
1280// ---------------------------------------------------------------------------
1281// Tests
1282// ---------------------------------------------------------------------------
1283
1284#[cfg(test)]
1285mod tests {
1286    use super::*;
1287    use camel_api::error_handler::ErrorHandlerConfig;
1288    use camel_api::load_balancer::LoadBalanceStrategy;
1289    use camel_api::{Exchange, Message};
1290    use camel_core::route::BuilderStep;
1291    use std::sync::Arc;
1292    use std::time::Duration;
1293    use tower::{Service, ServiceExt};
1294
1295    #[test]
1296    fn test_builder_from_creates_definition() {
1297        let definition = RouteBuilder::from("timer:tick")
1298            .route_id("test-route")
1299            .build()
1300            .unwrap();
1301        assert_eq!(definition.from_uri(), "timer:tick");
1302    }
1303
1304    #[test]
1305    fn test_builder_empty_from_uri_errors() {
1306        let result = RouteBuilder::from("").route_id("test-route").build();
1307        assert!(result.is_err());
1308    }
1309
1310    #[test]
1311    fn test_builder_to_adds_step() {
1312        let definition = RouteBuilder::from("timer:tick")
1313            .route_id("test-route")
1314            .to("log:info")
1315            .build()
1316            .unwrap();
1317
1318        assert_eq!(definition.from_uri(), "timer:tick");
1319        // We can verify steps were added by checking the structure
1320        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1321    }
1322
1323    #[test]
1324    fn test_builder_filter_adds_filter_step() {
1325        let definition = RouteBuilder::from("timer:tick")
1326            .route_id("test-route")
1327            .filter(|_ex| true)
1328            .to("mock:result")
1329            .end_filter()
1330            .build()
1331            .unwrap();
1332
1333        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1334    }
1335
1336    #[test]
1337    fn test_builder_set_header_adds_processor_step() {
1338        let definition = RouteBuilder::from("timer:tick")
1339            .route_id("test-route")
1340            .set_header("key", Value::String("value".into()))
1341            .build()
1342            .unwrap();
1343
1344        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1345    }
1346
1347    #[test]
1348    fn test_builder_map_body_adds_processor_step() {
1349        let definition = RouteBuilder::from("timer:tick")
1350            .route_id("test-route")
1351            .map_body(|body| body)
1352            .build()
1353            .unwrap();
1354
1355        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1356    }
1357
1358    #[test]
1359    fn test_builder_process_adds_processor_step() {
1360        let definition = RouteBuilder::from("timer:tick")
1361            .route_id("test-route")
1362            .process(|ex| async move { Ok(ex) })
1363            .build()
1364            .unwrap();
1365
1366        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1367    }
1368
1369    #[test]
1370    fn test_builder_chain_multiple_steps() {
1371        let definition = RouteBuilder::from("timer:tick")
1372            .route_id("test-route")
1373            .set_header("source", Value::String("timer".into()))
1374            .filter(|ex| ex.input.header("source").is_some())
1375            .to("log:info")
1376            .end_filter()
1377            .to("mock:result")
1378            .build()
1379            .unwrap();
1380
1381        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1382        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1383        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1384        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1385    }
1386
1387    // -----------------------------------------------------------------------
1388    // Processor behavior tests — exercise the real Tower services directly
1389    // -----------------------------------------------------------------------
1390
1391    #[tokio::test]
1392    async fn test_set_header_processor_works() {
1393        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1394        let exchange = Exchange::new(Message::new("test"));
1395        let result = svc.call(exchange).await.unwrap();
1396        assert_eq!(
1397            result.input.header("greeting"),
1398            Some(&Value::String("hello".into()))
1399        );
1400    }
1401
1402    #[tokio::test]
1403    async fn test_filter_processor_passes() {
1404        use camel_api::BoxProcessorExt;
1405        use camel_processor::FilterService;
1406
1407        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1408        let mut svc =
1409            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1410        let exchange = Exchange::new(Message::new("pass"));
1411        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1412        assert_eq!(result.input.body.as_text(), Some("pass"));
1413    }
1414
1415    #[tokio::test]
1416    async fn test_filter_processor_blocks() {
1417        use camel_api::BoxProcessorExt;
1418        use camel_processor::FilterService;
1419
1420        let sub = BoxProcessor::from_fn(|_ex| {
1421            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1422        });
1423        let mut svc =
1424            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1425        let exchange = Exchange::new(Message::new("reject"));
1426        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1427        assert_eq!(result.input.body.as_text(), Some("reject"));
1428    }
1429
1430    #[tokio::test]
1431    async fn test_map_body_processor_works() {
1432        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1433            if let Some(text) = body.as_text() {
1434                Body::Text(text.to_uppercase())
1435            } else {
1436                body
1437            }
1438        });
1439        let exchange = Exchange::new(Message::new("hello"));
1440        let result = mapper.oneshot(exchange).await.unwrap();
1441        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1442    }
1443
1444    #[tokio::test]
1445    async fn test_process_custom_processor_works() {
1446        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1447            ex.set_property("custom", Value::Bool(true));
1448            Ok(ex)
1449        });
1450        let exchange = Exchange::new(Message::default());
1451        let result = processor.oneshot(exchange).await.unwrap();
1452        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1453    }
1454
1455    // -----------------------------------------------------------------------
1456    // Sequential pipeline test
1457    // -----------------------------------------------------------------------
1458
1459    #[tokio::test]
1460    async fn test_compose_pipeline_runs_steps_in_order() {
1461        use camel_core::route::compose_pipeline;
1462
1463        let processors = vec![
1464            BoxProcessor::new(SetHeader::new(
1465                IdentityProcessor,
1466                "step",
1467                Value::String("one".into()),
1468            )),
1469            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1470                if let Some(text) = body.as_text() {
1471                    Body::Text(format!("{}-processed", text))
1472                } else {
1473                    body
1474                }
1475            })),
1476        ];
1477
1478        let pipeline = compose_pipeline(processors);
1479        let exchange = Exchange::new(Message::new("hello"));
1480        let result = pipeline.oneshot(exchange).await.unwrap();
1481
1482        assert_eq!(
1483            result.input.header("step"),
1484            Some(&Value::String("one".into()))
1485        );
1486        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1487    }
1488
1489    #[tokio::test]
1490    async fn test_compose_pipeline_empty_is_identity() {
1491        use camel_core::route::compose_pipeline;
1492
1493        let pipeline = compose_pipeline(vec![]);
1494        let exchange = Exchange::new(Message::new("unchanged"));
1495        let result = pipeline.oneshot(exchange).await.unwrap();
1496        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1497    }
1498
1499    // -----------------------------------------------------------------------
1500    // Circuit breaker builder tests
1501    // -----------------------------------------------------------------------
1502
1503    #[test]
1504    fn test_builder_circuit_breaker_sets_config() {
1505        use camel_api::circuit_breaker::CircuitBreakerConfig;
1506
1507        let config = CircuitBreakerConfig::new().failure_threshold(5);
1508        let definition = RouteBuilder::from("timer:tick")
1509            .route_id("test-route")
1510            .circuit_breaker(config)
1511            .build()
1512            .unwrap();
1513
1514        let cb = definition
1515            .circuit_breaker_config()
1516            .expect("circuit breaker should be set");
1517        assert_eq!(cb.failure_threshold, 5);
1518    }
1519
1520    #[test]
1521    fn test_builder_circuit_breaker_with_error_handler() {
1522        use camel_api::circuit_breaker::CircuitBreakerConfig;
1523        use camel_api::error_handler::ErrorHandlerConfig;
1524
1525        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1526        let eh_config = ErrorHandlerConfig::log_only();
1527
1528        let definition = RouteBuilder::from("timer:tick")
1529            .route_id("test-route")
1530            .to("log:info")
1531            .circuit_breaker(cb_config)
1532            .error_handler(eh_config)
1533            .build()
1534            .unwrap();
1535
1536        assert!(
1537            definition.circuit_breaker_config().is_some(),
1538            "circuit breaker config should be set"
1539        );
1540        // Route definition was built successfully with both configs.
1541    }
1542
1543    #[test]
1544    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1545        let definition = RouteBuilder::from("direct:start")
1546            .route_id("test-route")
1547            .dead_letter_channel("log:dlc")
1548            .on_exception(|e| matches!(e, CamelError::Io(_)))
1549            .retry(3)
1550            .handled_by("log:io")
1551            .end_on_exception()
1552            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1553            .retry(1)
1554            .end_on_exception()
1555            .to("mock:out")
1556            .build()
1557            .expect("route should build");
1558
1559        let cfg = definition
1560            .error_handler_config()
1561            .expect("error handler should be set");
1562        assert_eq!(cfg.policies.len(), 2);
1563        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1564        assert_eq!(
1565            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1566            Some(3)
1567        );
1568        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1569        assert_eq!(
1570            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1571            Some(1)
1572        );
1573    }
1574
1575    #[test]
1576    fn test_builder_on_exception_mixed_mode_rejected() {
1577        let result = RouteBuilder::from("direct:start")
1578            .route_id("test-route")
1579            .error_handler(ErrorHandlerConfig::log_only())
1580            .on_exception(|_e| true)
1581            .end_on_exception()
1582            .to("mock:out")
1583            .build();
1584
1585        let err = result.err().expect("mixed mode should fail with an error");
1586
1587        assert!(
1588            format!("{err}").contains("mixed error handler modes"),
1589            "unexpected error: {err}"
1590        );
1591    }
1592
1593    #[test]
1594    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1595        let definition = RouteBuilder::from("direct:start")
1596            .route_id("test-route")
1597            .on_exception(|_e| true)
1598            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1599            .with_jitter(0.5)
1600            .end_on_exception()
1601            .to("mock:out")
1602            .build()
1603            .expect("route should build");
1604
1605        let cfg = definition
1606            .error_handler_config()
1607            .expect("error handler should be set");
1608        assert_eq!(cfg.policies.len(), 1);
1609        assert!(cfg.policies[0].retry.is_none());
1610    }
1611
1612    #[test]
1613    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1614        let definition = RouteBuilder::from("direct:start")
1615            .route_id("test-route")
1616            .dead_letter_channel("log:dlc")
1617            .to("mock:out")
1618            .build()
1619            .expect("route should build");
1620
1621        let cfg = definition
1622            .error_handler_config()
1623            .expect("error handler should be set");
1624        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1625        assert!(cfg.policies.is_empty());
1626    }
1627
1628    #[test]
1629    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1630        let definition = RouteBuilder::from("direct:start")
1631            .route_id("test-route")
1632            .dead_letter_channel("log:first")
1633            .on_exception(|e| matches!(e, CamelError::Io(_)))
1634            .retry(2)
1635            .end_on_exception()
1636            .dead_letter_channel("log:second")
1637            .to("mock:out")
1638            .build()
1639            .expect("route should build");
1640
1641        let cfg = definition
1642            .error_handler_config()
1643            .expect("error handler should be set");
1644        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1645        assert_eq!(cfg.policies.len(), 1);
1646        assert_eq!(
1647            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1648            Some(2)
1649        );
1650    }
1651
1652    #[test]
1653    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1654        let definition = RouteBuilder::from("direct:start")
1655            .route_id("test-route")
1656            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1657            .retry(1)
1658            .end_on_exception()
1659            .to("mock:out")
1660            .build()
1661            .expect("route should build");
1662
1663        let cfg = definition
1664            .error_handler_config()
1665            .expect("error handler should be set");
1666        assert!(cfg.dlc_uri.is_none());
1667        assert_eq!(cfg.policies.len(), 1);
1668    }
1669
1670    #[test]
1671    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1672        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1673        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1674
1675        let definition = RouteBuilder::from("direct:start")
1676            .route_id("test-route")
1677            .error_handler(first)
1678            .error_handler(second)
1679            .to("mock:out")
1680            .build()
1681            .expect("route should build");
1682
1683        let cfg = definition
1684            .error_handler_config()
1685            .expect("error handler should be set");
1686        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1687    }
1688
1689    // --- Splitter builder tests ---
1690
1691    #[test]
1692    fn test_split_builder_typestate() {
1693        use camel_api::splitter::{SplitterConfig, split_body_lines};
1694
1695        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
1696        let definition = RouteBuilder::from("timer:test?period=1000")
1697            .route_id("test-route")
1698            .split(SplitterConfig::new(split_body_lines()))
1699            .to("mock:per-fragment")
1700            .end_split()
1701            .to("mock:final")
1702            .build()
1703            .unwrap();
1704
1705        // Should have 2 top-level steps: Split + To("mock:final")
1706        assert_eq!(definition.steps().len(), 2);
1707    }
1708
1709    #[test]
1710    fn test_split_builder_steps_collected() {
1711        use camel_api::splitter::{SplitterConfig, split_body_lines};
1712
1713        let definition = RouteBuilder::from("timer:test?period=1000")
1714            .route_id("test-route")
1715            .split(SplitterConfig::new(split_body_lines()))
1716            .set_header("fragment", Value::String("yes".into()))
1717            .to("mock:per-fragment")
1718            .end_split()
1719            .build()
1720            .unwrap();
1721
1722        // Should have 1 top-level step: Split (containing 2 sub-steps)
1723        assert_eq!(definition.steps().len(), 1);
1724        match &definition.steps()[0] {
1725            BuilderStep::Split { steps, .. } => {
1726                assert_eq!(steps.len(), 2); // SetHeader + To
1727            }
1728            other => panic!("Expected Split, got {:?}", other),
1729        }
1730    }
1731
1732    #[test]
1733    fn test_split_builder_config_propagated() {
1734        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1735
1736        let definition = RouteBuilder::from("timer:test?period=1000")
1737            .route_id("test-route")
1738            .split(
1739                SplitterConfig::new(split_body_lines())
1740                    .parallel(true)
1741                    .parallel_limit(4)
1742                    .aggregation(AggregationStrategy::CollectAll),
1743            )
1744            .to("mock:per-fragment")
1745            .end_split()
1746            .build()
1747            .unwrap();
1748
1749        match &definition.steps()[0] {
1750            BuilderStep::Split { config, .. } => {
1751                assert!(config.parallel);
1752                assert_eq!(config.parallel_limit, Some(4));
1753                assert!(matches!(
1754                    config.aggregation,
1755                    AggregationStrategy::CollectAll
1756                ));
1757            }
1758            other => panic!("Expected Split, got {:?}", other),
1759        }
1760    }
1761
1762    #[test]
1763    fn test_aggregate_builder_adds_step() {
1764        use camel_api::aggregator::AggregatorConfig;
1765        use camel_core::route::BuilderStep;
1766
1767        let definition = RouteBuilder::from("timer:tick")
1768            .route_id("test-route")
1769            .aggregate(
1770                AggregatorConfig::correlate_by("key")
1771                    .complete_when_size(2)
1772                    .build(),
1773            )
1774            .build()
1775            .unwrap();
1776
1777        assert_eq!(definition.steps().len(), 1);
1778        assert!(matches!(
1779            definition.steps()[0],
1780            BuilderStep::Aggregate { .. }
1781        ));
1782    }
1783
1784    #[test]
1785    fn test_aggregate_in_split_builder() {
1786        use camel_api::aggregator::AggregatorConfig;
1787        use camel_api::splitter::{SplitterConfig, split_body_lines};
1788        use camel_core::route::BuilderStep;
1789
1790        let definition = RouteBuilder::from("timer:tick")
1791            .route_id("test-route")
1792            .split(SplitterConfig::new(split_body_lines()))
1793            .aggregate(
1794                AggregatorConfig::correlate_by("key")
1795                    .complete_when_size(1)
1796                    .build(),
1797            )
1798            .end_split()
1799            .build()
1800            .unwrap();
1801
1802        assert_eq!(definition.steps().len(), 1);
1803        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
1804            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
1805        } else {
1806            panic!("expected Split step");
1807        }
1808    }
1809
1810    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
1811
1812    #[test]
1813    fn test_builder_set_body_static_adds_processor() {
1814        let definition = RouteBuilder::from("timer:tick")
1815            .route_id("test-route")
1816            .set_body("fixed")
1817            .build()
1818            .unwrap();
1819        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1820    }
1821
1822    #[test]
1823    fn test_builder_set_body_fn_adds_processor() {
1824        let definition = RouteBuilder::from("timer:tick")
1825            .route_id("test-route")
1826            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
1827            .build()
1828            .unwrap();
1829        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1830    }
1831
1832    #[test]
1833    fn transform_alias_produces_same_as_set_body() {
1834        let route_transform = RouteBuilder::from("timer:tick")
1835            .route_id("test-route")
1836            .transform("hello")
1837            .build()
1838            .unwrap();
1839
1840        let route_set_body = RouteBuilder::from("timer:tick")
1841            .route_id("test-route")
1842            .set_body("hello")
1843            .build()
1844            .unwrap();
1845
1846        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
1847    }
1848
1849    #[test]
1850    fn test_builder_set_header_fn_adds_processor() {
1851        let definition = RouteBuilder::from("timer:tick")
1852            .route_id("test-route")
1853            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
1854            .build()
1855            .unwrap();
1856        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1857    }
1858
1859    #[tokio::test]
1860    async fn test_set_body_static_processor_works() {
1861        use camel_core::route::compose_pipeline;
1862        let def = RouteBuilder::from("t:t")
1863            .route_id("test-route")
1864            .set_body("replaced")
1865            .build()
1866            .unwrap();
1867        let pipeline = compose_pipeline(
1868            def.steps()
1869                .iter()
1870                .filter_map(|s| {
1871                    if let BuilderStep::Processor(p) = s {
1872                        Some(p.clone())
1873                    } else {
1874                        None
1875                    }
1876                })
1877                .collect(),
1878        );
1879        let exchange = Exchange::new(Message::new("original"));
1880        let result = pipeline.oneshot(exchange).await.unwrap();
1881        assert_eq!(result.input.body.as_text(), Some("replaced"));
1882    }
1883
1884    #[tokio::test]
1885    async fn test_set_body_fn_processor_works() {
1886        use camel_core::route::compose_pipeline;
1887        let def = RouteBuilder::from("t:t")
1888            .route_id("test-route")
1889            .set_body_fn(|ex: &Exchange| {
1890                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
1891            })
1892            .build()
1893            .unwrap();
1894        let pipeline = compose_pipeline(
1895            def.steps()
1896                .iter()
1897                .filter_map(|s| {
1898                    if let BuilderStep::Processor(p) = s {
1899                        Some(p.clone())
1900                    } else {
1901                        None
1902                    }
1903                })
1904                .collect(),
1905        );
1906        let exchange = Exchange::new(Message::new("hello"));
1907        let result = pipeline.oneshot(exchange).await.unwrap();
1908        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1909    }
1910
1911    #[tokio::test]
1912    async fn test_set_header_fn_processor_works() {
1913        use camel_core::route::compose_pipeline;
1914        let def = RouteBuilder::from("t:t")
1915            .route_id("test-route")
1916            .set_header_fn("echo", |ex: &Exchange| {
1917                ex.input
1918                    .body
1919                    .as_text()
1920                    .map(|t| Value::String(t.into()))
1921                    .unwrap_or(Value::Null)
1922            })
1923            .build()
1924            .unwrap();
1925        let pipeline = compose_pipeline(
1926            def.steps()
1927                .iter()
1928                .filter_map(|s| {
1929                    if let BuilderStep::Processor(p) = s {
1930                        Some(p.clone())
1931                    } else {
1932                        None
1933                    }
1934                })
1935                .collect(),
1936        );
1937        let exchange = Exchange::new(Message::new("ping"));
1938        let result = pipeline.oneshot(exchange).await.unwrap();
1939        assert_eq!(
1940            result.input.header("echo"),
1941            Some(&Value::String("ping".into()))
1942        );
1943    }
1944
1945    // ── FilterBuilder typestate tests ─────────────────────────────────────
1946
1947    #[test]
1948    fn test_filter_builder_typestate() {
1949        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1950            .route_id("test-route")
1951            .filter(|_ex| true)
1952            .to("mock:inner")
1953            .end_filter()
1954            .to("mock:outer")
1955            .build();
1956        assert!(result.is_ok());
1957    }
1958
1959    #[test]
1960    fn test_filter_builder_steps_collected() {
1961        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1962            .route_id("test-route")
1963            .filter(|_ex| true)
1964            .to("mock:inner")
1965            .end_filter()
1966            .build()
1967            .unwrap();
1968
1969        assert_eq!(definition.steps().len(), 1);
1970        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1971    }
1972
1973    #[test]
1974    fn test_wire_tap_builder_adds_step() {
1975        let definition = RouteBuilder::from("timer:tick")
1976            .route_id("test-route")
1977            .wire_tap("mock:tap")
1978            .to("mock:result")
1979            .build()
1980            .unwrap();
1981
1982        assert_eq!(definition.steps().len(), 2);
1983        assert!(
1984            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
1985        );
1986        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
1987    }
1988
1989    // ── MulticastBuilder typestate tests ─────────────────────────────────────
1990
1991    #[test]
1992    fn test_multicast_builder_typestate() {
1993        let definition = RouteBuilder::from("timer:tick")
1994            .route_id("test-route")
1995            .multicast()
1996            .to("direct:a")
1997            .to("direct:b")
1998            .end_multicast()
1999            .to("mock:result")
2000            .build()
2001            .unwrap();
2002
2003        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
2004    }
2005
2006    #[test]
2007    fn test_multicast_builder_steps_collected() {
2008        let definition = RouteBuilder::from("timer:tick")
2009            .route_id("test-route")
2010            .multicast()
2011            .to("direct:a")
2012            .to("direct:b")
2013            .end_multicast()
2014            .build()
2015            .unwrap();
2016
2017        match &definition.steps()[0] {
2018            BuilderStep::Multicast { steps, .. } => {
2019                assert_eq!(steps.len(), 2);
2020            }
2021            other => panic!("Expected Multicast, got {:?}", other),
2022        }
2023    }
2024
2025    // ── Concurrency builder tests ─────────────────────────────────────
2026
2027    #[test]
2028    fn test_builder_concurrent_sets_concurrency() {
2029        use camel_component_api::ConcurrencyModel;
2030
2031        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2032            .route_id("test-route")
2033            .concurrent(16)
2034            .to("log:info")
2035            .build()
2036            .unwrap();
2037
2038        assert_eq!(
2039            definition.concurrency_override(),
2040            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2041        );
2042    }
2043
2044    #[test]
2045    fn test_builder_concurrent_zero_means_unbounded() {
2046        use camel_component_api::ConcurrencyModel;
2047
2048        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2049            .route_id("test-route")
2050            .concurrent(0)
2051            .to("log:info")
2052            .build()
2053            .unwrap();
2054
2055        assert_eq!(
2056            definition.concurrency_override(),
2057            Some(&ConcurrencyModel::Concurrent { max: None })
2058        );
2059    }
2060
2061    #[test]
2062    fn test_builder_sequential_sets_concurrency() {
2063        use camel_component_api::ConcurrencyModel;
2064
2065        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2066            .route_id("test-route")
2067            .sequential()
2068            .to("log:info")
2069            .build()
2070            .unwrap();
2071
2072        assert_eq!(
2073            definition.concurrency_override(),
2074            Some(&ConcurrencyModel::Sequential)
2075        );
2076    }
2077
2078    #[test]
2079    fn test_builder_default_concurrency_is_none() {
2080        let definition = RouteBuilder::from("timer:tick")
2081            .route_id("test-route")
2082            .to("log:info")
2083            .build()
2084            .unwrap();
2085
2086        assert_eq!(definition.concurrency_override(), None);
2087    }
2088
2089    // ── Route lifecycle builder tests ─────────────────────────────────────
2090
2091    #[test]
2092    fn test_builder_route_id_sets_id() {
2093        let definition = RouteBuilder::from("timer:tick")
2094            .route_id("my-route")
2095            .build()
2096            .unwrap();
2097
2098        assert_eq!(definition.route_id(), "my-route");
2099    }
2100
2101    #[test]
2102    fn test_build_without_route_id_fails() {
2103        let result = RouteBuilder::from("timer:tick?period=1000")
2104            .to("log:info")
2105            .build();
2106        let err = match result {
2107            Err(e) => e.to_string(),
2108            Ok(_) => panic!("build() should fail without route_id"),
2109        };
2110        assert!(
2111            err.contains("route_id"),
2112            "error should mention route_id, got: {}",
2113            err
2114        );
2115    }
2116
2117    #[test]
2118    fn test_builder_auto_startup_false() {
2119        let definition = RouteBuilder::from("timer:tick")
2120            .route_id("test-route")
2121            .auto_startup(false)
2122            .build()
2123            .unwrap();
2124
2125        assert!(!definition.auto_startup());
2126    }
2127
2128    #[test]
2129    fn test_builder_startup_order_custom() {
2130        let definition = RouteBuilder::from("timer:tick")
2131            .route_id("test-route")
2132            .startup_order(50)
2133            .build()
2134            .unwrap();
2135
2136        assert_eq!(definition.startup_order(), 50);
2137    }
2138
2139    #[test]
2140    fn test_builder_defaults() {
2141        let definition = RouteBuilder::from("timer:tick")
2142            .route_id("test-route")
2143            .build()
2144            .unwrap();
2145
2146        assert_eq!(definition.route_id(), "test-route");
2147        assert!(definition.auto_startup());
2148        assert_eq!(definition.startup_order(), 1000);
2149    }
2150
2151    // ── Choice typestate tests ──────────────────────────────────────────────────
2152
2153    #[test]
2154    fn test_choice_builder_single_when() {
2155        let definition = RouteBuilder::from("timer:tick")
2156            .route_id("test-route")
2157            .choice()
2158            .when(|ex: &Exchange| ex.input.header("type").is_some())
2159            .to("mock:typed")
2160            .end_when()
2161            .end_choice()
2162            .build()
2163            .unwrap();
2164        assert_eq!(definition.steps().len(), 1);
2165        assert!(
2166            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2167            if whens.len() == 1 && otherwise.is_none())
2168        );
2169    }
2170
2171    #[test]
2172    fn test_choice_builder_when_otherwise() {
2173        let definition = RouteBuilder::from("timer:tick")
2174            .route_id("test-route")
2175            .choice()
2176            .when(|ex: &Exchange| ex.input.header("a").is_some())
2177            .to("mock:a")
2178            .end_when()
2179            .otherwise()
2180            .to("mock:fallback")
2181            .end_otherwise()
2182            .end_choice()
2183            .build()
2184            .unwrap();
2185        assert!(
2186            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2187            if whens.len() == 1 && otherwise.is_some())
2188        );
2189    }
2190
2191    #[test]
2192    fn test_choice_builder_multiple_whens() {
2193        let definition = RouteBuilder::from("timer:tick")
2194            .route_id("test-route")
2195            .choice()
2196            .when(|ex: &Exchange| ex.input.header("a").is_some())
2197            .to("mock:a")
2198            .end_when()
2199            .when(|ex: &Exchange| ex.input.header("b").is_some())
2200            .to("mock:b")
2201            .end_when()
2202            .end_choice()
2203            .build()
2204            .unwrap();
2205        assert!(
2206            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2207            if whens.len() == 2)
2208        );
2209    }
2210
2211    #[test]
2212    fn test_choice_step_after_choice() {
2213        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2214        let definition = RouteBuilder::from("timer:tick")
2215            .route_id("test-route")
2216            .choice()
2217            .when(|_ex: &Exchange| true)
2218            .to("mock:inner")
2219            .end_when()
2220            .end_choice()
2221            .to("mock:outer") // must be step[1], not inside choice
2222            .build()
2223            .unwrap();
2224        assert_eq!(definition.steps().len(), 2);
2225        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2226    }
2227
2228    // ── Throttle typestate tests ──────────────────────────────────────────────────
2229
2230    #[test]
2231    fn test_throttle_builder_typestate() {
2232        let definition = RouteBuilder::from("timer:tick")
2233            .route_id("test-route")
2234            .throttle(10, std::time::Duration::from_secs(1))
2235            .to("mock:result")
2236            .end_throttle()
2237            .build()
2238            .unwrap();
2239
2240        assert_eq!(definition.steps().len(), 1);
2241        assert!(matches!(
2242            &definition.steps()[0],
2243            BuilderStep::Throttle { .. }
2244        ));
2245    }
2246
2247    #[test]
2248    fn test_throttle_builder_with_strategy() {
2249        let definition = RouteBuilder::from("timer:tick")
2250            .route_id("test-route")
2251            .throttle(10, std::time::Duration::from_secs(1))
2252            .strategy(ThrottleStrategy::Reject)
2253            .to("mock:result")
2254            .end_throttle()
2255            .build()
2256            .unwrap();
2257
2258        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2259            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2260        } else {
2261            panic!("Expected Throttle step");
2262        }
2263    }
2264
2265    #[test]
2266    fn test_throttle_builder_steps_collected() {
2267        let definition = RouteBuilder::from("timer:tick")
2268            .route_id("test-route")
2269            .throttle(5, std::time::Duration::from_secs(1))
2270            .set_header("throttled", Value::Bool(true))
2271            .to("mock:throttled")
2272            .end_throttle()
2273            .build()
2274            .unwrap();
2275
2276        match &definition.steps()[0] {
2277            BuilderStep::Throttle { steps, .. } => {
2278                assert_eq!(steps.len(), 2); // SetHeader + To
2279            }
2280            other => panic!("Expected Throttle, got {:?}", other),
2281        }
2282    }
2283
2284    #[test]
2285    fn test_throttle_step_after_throttle() {
2286        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2287        let definition = RouteBuilder::from("timer:tick")
2288            .route_id("test-route")
2289            .throttle(10, std::time::Duration::from_secs(1))
2290            .to("mock:inner")
2291            .end_throttle()
2292            .to("mock:outer")
2293            .build()
2294            .unwrap();
2295
2296        assert_eq!(definition.steps().len(), 2);
2297        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2298    }
2299
2300    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2301
2302    #[test]
2303    fn test_load_balance_builder_typestate() {
2304        let definition = RouteBuilder::from("timer:tick")
2305            .route_id("test-route")
2306            .load_balance()
2307            .round_robin()
2308            .to("mock:a")
2309            .to("mock:b")
2310            .end_load_balance()
2311            .build()
2312            .unwrap();
2313
2314        assert_eq!(definition.steps().len(), 1);
2315        assert!(matches!(
2316            &definition.steps()[0],
2317            BuilderStep::LoadBalance { .. }
2318        ));
2319    }
2320
2321    #[test]
2322    fn test_load_balance_builder_with_strategy() {
2323        let definition = RouteBuilder::from("timer:tick")
2324            .route_id("test-route")
2325            .load_balance()
2326            .random()
2327            .to("mock:result")
2328            .end_load_balance()
2329            .build()
2330            .unwrap();
2331
2332        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2333            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2334        } else {
2335            panic!("Expected LoadBalance step");
2336        }
2337    }
2338
2339    #[test]
2340    fn test_load_balance_builder_steps_collected() {
2341        let definition = RouteBuilder::from("timer:tick")
2342            .route_id("test-route")
2343            .load_balance()
2344            .set_header("lb", Value::Bool(true))
2345            .to("mock:a")
2346            .end_load_balance()
2347            .build()
2348            .unwrap();
2349
2350        match &definition.steps()[0] {
2351            BuilderStep::LoadBalance { steps, .. } => {
2352                assert_eq!(steps.len(), 2); // SetHeader + To
2353            }
2354            other => panic!("Expected LoadBalance, got {:?}", other),
2355        }
2356    }
2357
2358    #[test]
2359    fn test_load_balance_step_after_load_balance() {
2360        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
2361        let definition = RouteBuilder::from("timer:tick")
2362            .route_id("test-route")
2363            .load_balance()
2364            .to("mock:inner")
2365            .end_load_balance()
2366            .to("mock:outer")
2367            .build()
2368            .unwrap();
2369
2370        assert_eq!(definition.steps().len(), 2);
2371        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2372    }
2373
2374    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2375
2376    #[test]
2377    fn test_dynamic_router_builder() {
2378        let definition = RouteBuilder::from("timer:tick")
2379            .route_id("test-route")
2380            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2381            .build()
2382            .unwrap();
2383
2384        assert_eq!(definition.steps().len(), 1);
2385        assert!(matches!(
2386            &definition.steps()[0],
2387            BuilderStep::DynamicRouter { .. }
2388        ));
2389    }
2390
2391    #[test]
2392    fn test_dynamic_router_builder_with_config() {
2393        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2394            .max_iterations(100)
2395            .cache_size(500);
2396
2397        let definition = RouteBuilder::from("timer:tick")
2398            .route_id("test-route")
2399            .dynamic_router_with_config(config)
2400            .build()
2401            .unwrap();
2402
2403        assert_eq!(definition.steps().len(), 1);
2404        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2405            assert_eq!(config.max_iterations, 100);
2406            assert_eq!(config.cache_size, 500);
2407        } else {
2408            panic!("Expected DynamicRouter step");
2409        }
2410    }
2411
2412    #[test]
2413    fn test_dynamic_router_step_after_router() {
2414        // Steps after dynamic_router() are added to the outer pipeline.
2415        let definition = RouteBuilder::from("timer:tick")
2416            .route_id("test-route")
2417            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2418            .to("mock:outer")
2419            .build()
2420            .unwrap();
2421
2422        assert_eq!(definition.steps().len(), 2);
2423        assert!(matches!(
2424            &definition.steps()[0],
2425            BuilderStep::DynamicRouter { .. }
2426        ));
2427        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2428    }
2429
2430    #[test]
2431    fn routing_slip_builder_creates_step() {
2432        use camel_api::RoutingSlipExpression;
2433
2434        let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2435
2436        let route = RouteBuilder::from("direct:start")
2437            .route_id("routing-slip-test")
2438            .routing_slip(expression)
2439            .build()
2440            .unwrap();
2441
2442        assert!(
2443            matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2444            "Expected RoutingSlip step"
2445        );
2446    }
2447
2448    #[test]
2449    fn routing_slip_with_config_builder_creates_step() {
2450        use camel_api::RoutingSlipConfig;
2451
2452        let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2453            .uri_delimiter("|")
2454            .cache_size(50)
2455            .ignore_invalid_endpoints(true);
2456
2457        let route = RouteBuilder::from("direct:start")
2458            .route_id("routing-slip-config-test")
2459            .routing_slip_with_config(config)
2460            .build()
2461            .unwrap();
2462
2463        if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2464            assert_eq!(config.uri_delimiter, "|");
2465            assert_eq!(config.cache_size, 50);
2466            assert!(config.ignore_invalid_endpoints);
2467        } else {
2468            panic!("Expected RoutingSlip step");
2469        }
2470    }
2471
2472    #[test]
2473    fn test_builder_marshal_adds_processor_step() {
2474        let definition = RouteBuilder::from("timer:tick")
2475            .route_id("test-route")
2476            .marshal("json")
2477            .build()
2478            .unwrap();
2479        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2480    }
2481
2482    #[test]
2483    fn test_builder_unmarshal_adds_processor_step() {
2484        let definition = RouteBuilder::from("timer:tick")
2485            .route_id("test-route")
2486            .unmarshal("json")
2487            .build()
2488            .unwrap();
2489        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2490    }
2491
2492    #[test]
2493    #[should_panic(expected = "unknown data format: 'csv'")]
2494    fn test_builder_marshal_panics_on_unknown_format() {
2495        let _ = RouteBuilder::from("timer:tick")
2496            .route_id("test-route")
2497            .marshal("csv")
2498            .build();
2499    }
2500
2501    #[test]
2502    #[should_panic(expected = "unknown data format: 'csv'")]
2503    fn test_builder_unmarshal_panics_on_unknown_format() {
2504        let _ = RouteBuilder::from("timer:tick")
2505            .route_id("test-route")
2506            .unmarshal("csv")
2507            .build();
2508    }
2509}