Skip to main content

camel_builder/
lib.rs

1use camel_api::DelayConfig;
2use camel_api::aggregator::{
3    AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
4};
5use camel_api::body::Body;
6use camel_api::body_converter::BodyType;
7use camel_api::circuit_breaker::CircuitBreakerConfig;
8use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
9use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
10use camel_api::load_balancer::LoadBalancerConfig;
11use camel_api::multicast::{MulticastConfig, MulticastStrategy};
12use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
13use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
14use camel_api::splitter::SplitterConfig;
15use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
16use camel_api::{
17    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
18    ProcessorFn, Value,
19    runtime::{
20        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
21        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
22        CanonicalWhenSpec,
23    },
24};
25use camel_component_api::ConcurrencyModel;
26use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
27use camel_processor::{
28    ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
29    UnmarshalService, builtin_data_format,
30};
31
32/// Shared step-accumulation methods for all builder types.
33///
34/// Implementors provide `steps_mut()` and get step-adding methods for free.
35/// `filter()` and other branching methods are NOT included — they return
36/// different types per builder and stay as per-builder methods.
37pub trait StepAccumulator: Sized {
38    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
39
40    fn to(mut self, endpoint: impl Into<String>) -> Self {
41        self.steps_mut().push(BuilderStep::To(endpoint.into()));
42        self
43    }
44
45    fn process<F, Fut>(mut self, f: F) -> Self
46    where
47        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
48        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
49    {
50        let svc = ProcessorFn::new(f);
51        self.steps_mut()
52            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
53        self
54    }
55
56    fn process_fn(mut self, processor: BoxProcessor) -> Self {
57        self.steps_mut().push(BuilderStep::Processor(processor));
58        self
59    }
60
61    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
62        let svc = SetHeader::new(IdentityProcessor, key, value);
63        self.steps_mut()
64            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
65        self
66    }
67
68    fn map_body<F>(mut self, mapper: F) -> Self
69    where
70        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
71    {
72        let svc = MapBody::new(IdentityProcessor, mapper);
73        self.steps_mut()
74            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
75        self
76    }
77
78    fn set_body<B>(mut self, body: B) -> Self
79    where
80        B: Into<Body> + Clone + Send + Sync + 'static,
81    {
82        let body: Body = body.into();
83        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
84        self.steps_mut()
85            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
86        self
87    }
88
89    /// Apache Camel-compatible alias for [`set_body`](Self::set_body).
90    ///
91    /// Transforms the message body using the given value. Semantically identical
92    /// to `set_body` — provided for familiarity with Apache Camel route DSLs.
93    fn transform<B>(self, body: B) -> Self
94    where
95        B: Into<Body> + Clone + Send + Sync + 'static,
96    {
97        self.set_body(body)
98    }
99
100    fn set_body_fn<F>(mut self, expr: F) -> Self
101    where
102        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
103    {
104        let svc = SetBody::new(IdentityProcessor, expr);
105        self.steps_mut()
106            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
107        self
108    }
109
110    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
111    where
112        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
113    {
114        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
115        self.steps_mut()
116            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
117        self
118    }
119
120    fn aggregate(mut self, config: AggregatorConfig) -> Self {
121        self.steps_mut().push(BuilderStep::Aggregate { config });
122        self
123    }
124
125    /// Stop processing this exchange immediately. No further steps in the
126    /// current pipeline will run.
127    ///
128    /// Can be used at any point in the route: directly on RouteBuilder,
129    /// inside `.filter()`, inside `.split()`, etc.
130    fn stop(mut self) -> Self {
131        self.steps_mut().push(BuilderStep::Stop);
132        self
133    }
134
135    fn delay(mut self, duration: std::time::Duration) -> Self {
136        self.steps_mut().push(BuilderStep::Delay {
137            config: DelayConfig::from_duration(duration),
138        });
139        self
140    }
141
142    fn delay_with_header(
143        mut self,
144        duration: std::time::Duration,
145        header: impl Into<String>,
146    ) -> Self {
147        self.steps_mut().push(BuilderStep::Delay {
148            config: DelayConfig::from_duration_with_header(duration, header),
149        });
150        self
151    }
152
153    /// Log a message at the specified level.
154    ///
155    /// The message will be logged when an exchange passes through this step.
156    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
157        self.steps_mut().push(BuilderStep::Log {
158            level,
159            message: message.into(),
160        });
161        self
162    }
163
164    /// Convert the message body to the target type.
165    ///
166    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
167    /// Returns `TypeConversionFailed` if conversion is not possible.
168    ///
169    /// # Example
170    /// ```ignore
171    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
172    ///      .convert_body_to(BodyType::Json)
173    ///      .to("direct:next")
174    /// ```
175    fn convert_body_to(mut self, target: BodyType) -> Self {
176        let svc = ConvertBodyTo::new(IdentityProcessor, target);
177        self.steps_mut()
178            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
179        self
180    }
181
182    /// Marshal the message body using the specified data format.
183    ///
184    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
185    /// Converts a structured body (e.g., `Body::Json`) to a wire-format body (e.g., `Body::Text`).
186    ///
187    /// # Example
188    /// ```ignore
189    /// route.marshal("json").to("direct:next")
190    /// ```
191    fn marshal(mut self, format: impl Into<String>) -> Self {
192        let name = format.into();
193        let df =
194            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
195        let svc = MarshalService::new(IdentityProcessor, df);
196        self.steps_mut()
197            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
198        self
199    }
200
201    /// Unmarshal the message body using the specified data format.
202    ///
203    /// Supported formats: `"json"`, `"xml"`. Panics if the format name is unknown.
204    /// Converts a wire-format body (e.g., `Body::Text`) to a structured body (e.g., `Body::Json`).
205    ///
206    /// # Example
207    /// ```ignore
208    /// route.unmarshal("json").to("direct:next")
209    /// ```
210    fn unmarshal(mut self, format: impl Into<String>) -> Self {
211        let name = format.into();
212        let df =
213            builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
214        let svc = UnmarshalService::new(IdentityProcessor, df);
215        self.steps_mut()
216            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
217        self
218    }
219
220    /// Validate the exchange body against a schema file.
221    ///
222    /// Shorthand for `to("validator:path")`. Supports XSD, JSON Schema, and YAML.
223    ///
224    /// # Example
225    /// ```ignore
226    /// route.validate("schemas/order.xsd").to("direct:out")
227    /// ```
228    fn validate(mut self, schema_path: impl Into<String>) -> Self {
229        let path = schema_path.into();
230        let uri = if path.starts_with("validator:") {
231            path
232        } else {
233            format!("validator:{path}")
234        };
235        self.steps_mut().push(BuilderStep::To(uri));
236        self
237    }
238
239    /// Execute a script that can modify the exchange (headers, properties, body).
240    ///
241    /// The script has access to `headers`, `properties`, and `body` variables
242    /// and can modify them with assignment syntax: `headers["k"] = v`.
243    ///
244    /// # Example
245    /// ```ignore
246    /// // ignore: requires full CamelContext setup with registered language
247    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
248    /// ```
249    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
250        self.steps_mut().push(BuilderStep::Script {
251            language: language.into(),
252            script: script.into(),
253        });
254        self
255    }
256}
257
258/// A fluent builder for constructing routes.
259///
260/// # Example
261///
262/// ```ignore
263/// let definition = RouteBuilder::from("timer:tick?period=1000")
264///     .set_header("source", Value::String("timer".into()))
265///     .filter(|ex| ex.input.body.as_text().is_some())
266///     .to("log:info?showHeaders=true")
267///     .build()?;
268/// ```
269pub struct RouteBuilder {
270    from_uri: String,
271    steps: Vec<BuilderStep>,
272    error_handler: Option<ErrorHandlerConfig>,
273    error_handler_mode: ErrorHandlerMode,
274    circuit_breaker_config: Option<CircuitBreakerConfig>,
275    concurrency: Option<ConcurrencyModel>,
276    route_id: Option<String>,
277    auto_startup: Option<bool>,
278    startup_order: Option<i32>,
279}
280
281#[derive(Default)]
282enum ErrorHandlerMode {
283    #[default]
284    None,
285    ExplicitConfig,
286    Shorthand {
287        dlc_uri: Option<String>,
288        specs: Vec<OnExceptionSpec>,
289    },
290    Mixed,
291}
292
293#[derive(Clone)]
294struct OnExceptionSpec {
295    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
296    retry: Option<RedeliveryPolicy>,
297    handled_by: Option<String>,
298}
299
300impl RouteBuilder {
301    /// Start building a route from the given source endpoint URI.
302    pub fn from(endpoint: &str) -> Self {
303        Self {
304            from_uri: endpoint.to_string(),
305            steps: Vec::new(),
306            error_handler: None,
307            error_handler_mode: ErrorHandlerMode::None,
308            circuit_breaker_config: None,
309            concurrency: None,
310            route_id: None,
311            auto_startup: None,
312            startup_order: None,
313        }
314    }
315
316    /// Open a filter scope. Only exchanges matching `predicate` will be processed
317    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
318    /// and continue to steps after `.end_filter()`.
319    pub fn filter<F>(self, predicate: F) -> FilterBuilder
320    where
321        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
322    {
323        FilterBuilder {
324            parent: self,
325            predicate: std::sync::Arc::new(predicate),
326            steps: vec![],
327        }
328    }
329
330    /// Open a choice scope for content-based routing.
331    ///
332    /// Within the choice, you can define multiple `.when()` clauses and an
333    /// optional `.otherwise()` clause. The first matching `when` predicate
334    /// determines which sub-pipeline executes.
335    pub fn choice(self) -> ChoiceBuilder {
336        ChoiceBuilder {
337            parent: self,
338            whens: vec![],
339            _otherwise: None,
340        }
341    }
342
343    /// Add a WireTap step that sends a clone of the exchange to the given
344    /// endpoint URI (fire-and-forget). The original exchange continues
345    /// downstream unchanged.
346    pub fn wire_tap(mut self, endpoint: &str) -> Self {
347        self.steps.push(BuilderStep::WireTap {
348            uri: endpoint.to_string(),
349        });
350        self
351    }
352
353    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
354    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
355        self.error_handler_mode = match self.error_handler_mode {
356            ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
357                ErrorHandlerMode::ExplicitConfig
358            }
359            ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
360        };
361        self.error_handler = Some(config);
362        self
363    }
364
365    /// Set a dead letter channel URI for shorthand error handler mode.
366    pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
367        let uri = uri.into();
368        self.error_handler_mode = match self.error_handler_mode {
369            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
370                dlc_uri: Some(uri),
371                specs: Vec::new(),
372            },
373            ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
374                dlc_uri: Some(uri),
375                specs,
376            },
377            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
378        };
379        self
380    }
381
382    /// Add a shorthand exception policy scope. Call `.end_on_exception()` to return to route builder.
383    pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
384    where
385        F: Fn(&CamelError) -> bool + Send + Sync + 'static,
386    {
387        self.error_handler_mode = match self.error_handler_mode {
388            ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
389                dlc_uri: None,
390                specs: Vec::new(),
391            },
392            ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
393            shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
394        };
395
396        OnExceptionBuilder {
397            parent: self,
398            policy: OnExceptionSpec {
399                matches: std::sync::Arc::new(matches),
400                retry: None,
401                handled_by: None,
402            },
403        }
404    }
405
406    /// Set a circuit breaker for this route.
407    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
408        self.circuit_breaker_config = Some(config);
409        self
410    }
411
412    /// Override the consumer's default concurrency model.
413    ///
414    /// When set, the pipeline spawns a task per exchange, processing them
415    /// concurrently. `max` limits the number of simultaneously active
416    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
417    ///
418    /// # Example
419    /// ```ignore
420    /// RouteBuilder::from("http://0.0.0.0:8080/api")
421    ///     .concurrent(16)  // max 16 in-flight pipeline executions
422    ///     .process(handle_request)
423    ///     .build()
424    /// ```
425    pub fn concurrent(mut self, max: usize) -> Self {
426        let max = if max == 0 { None } else { Some(max) };
427        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
428        self
429    }
430
431    /// Force sequential processing, overriding a concurrent-capable consumer.
432    ///
433    /// Useful for HTTP routes that mutate shared state and need ordering
434    /// guarantees.
435    pub fn sequential(mut self) -> Self {
436        self.concurrency = Some(ConcurrencyModel::Sequential);
437        self
438    }
439
440    /// Set the route ID for this route.
441    ///
442    /// If not set, the route will be assigned an auto-generated ID.
443    pub fn route_id(mut self, id: impl Into<String>) -> Self {
444        self.route_id = Some(id.into());
445        self
446    }
447
448    /// Set whether this route should automatically start when the context starts.
449    ///
450    /// Default is `true`.
451    pub fn auto_startup(mut self, auto: bool) -> Self {
452        self.auto_startup = Some(auto);
453        self
454    }
455
456    /// Set the startup order for this route.
457    ///
458    /// Routes with lower values start first. Default is 1000.
459    pub fn startup_order(mut self, order: i32) -> Self {
460        self.startup_order = Some(order);
461        self
462    }
463
464    /// Begin a Splitter sub-pipeline. Steps added after this call (until
465    /// `.end_split()`) will be executed per-fragment.
466    ///
467    /// Returns a `SplitBuilder` — you cannot call `.build()` until
468    /// `.end_split()` closes the split scope (enforced by the type system).
469    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
470        SplitBuilder {
471            parent: self,
472            config,
473            steps: Vec::new(),
474        }
475    }
476
477    /// Begin a Multicast sub-pipeline. Steps added after this call (until
478    /// `.end_multicast()`) will each receive a copy of the exchange.
479    ///
480    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
481    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
482    pub fn multicast(self) -> MulticastBuilder {
483        MulticastBuilder {
484            parent: self,
485            steps: Vec::new(),
486            config: MulticastConfig::new(),
487        }
488    }
489
490    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
491    /// `max_requests` per `period`. Steps inside the throttle scope are only
492    /// executed when the rate limit allows.
493    ///
494    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
495    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
496    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
497        ThrottleBuilder {
498            parent: self,
499            config: ThrottlerConfig::new(max_requests, period),
500            steps: Vec::new(),
501        }
502    }
503
504    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
505    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
506    ///
507    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
508    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
509    pub fn load_balance(self) -> LoadBalancerBuilder {
510        LoadBalancerBuilder {
511            parent: self,
512            config: LoadBalancerConfig::round_robin(),
513            steps: Vec::new(),
514        }
515    }
516
517    /// Add a dynamic router step that routes exchanges dynamically based on
518    /// expression evaluation at runtime.
519    ///
520    /// The expression receives the exchange and returns `Some(uri)` to route to
521    /// the next endpoint, or `None` to stop routing.
522    ///
523    /// # Example
524    /// ```ignore
525    /// RouteBuilder::from("timer:tick")
526    ///     .route_id("test-route")
527    ///     .dynamic_router(|ex| {
528    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
529    ///     })
530    ///     .build()
531    /// ```
532    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
533        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
534    }
535
536    /// Add a dynamic router step with full configuration.
537    ///
538    /// Allows customization of URI delimiter, cache size, timeout, and other options.
539    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
540        self.steps.push(BuilderStep::DynamicRouter { config });
541        self
542    }
543
544    pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
545        self.routing_slip_with_config(RoutingSlipConfig::new(expression))
546    }
547
548    pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
549        self.steps.push(BuilderStep::RoutingSlip { config });
550        self
551    }
552
553    pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
554        self.recipient_list_with_config(RecipientListConfig::new(expression))
555    }
556
557    pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
558        self.steps.push(BuilderStep::RecipientList { config });
559        self
560    }
561
562    /// Consume the builder and produce a [`RouteDefinition`].
563    pub fn build(self) -> Result<RouteDefinition, CamelError> {
564        if self.from_uri.is_empty() {
565            return Err(CamelError::RouteError(
566                "route must have a 'from' URI".to_string(),
567            ));
568        }
569        let route_id = self.route_id.ok_or_else(|| {
570            CamelError::RouteError(
571                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
572                    .to_string(),
573            )
574        })?;
575        let resolved_error_handler = match self.error_handler_mode {
576            ErrorHandlerMode::None => self.error_handler,
577            ErrorHandlerMode::ExplicitConfig => self.error_handler,
578            ErrorHandlerMode::Mixed => {
579                return Err(CamelError::RouteError(
580                    "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
581                ));
582            }
583            ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
584                let mut config = if let Some(uri) = dlc_uri {
585                    ErrorHandlerConfig::dead_letter_channel(uri)
586                } else {
587                    ErrorHandlerConfig::log_only()
588                };
589
590                for spec in specs {
591                    let matcher = spec.matches.clone();
592                    let mut builder = config.on_exception(move |e| matcher(e));
593
594                    if let Some(retry) = spec.retry {
595                        builder = builder.retry(retry.max_attempts).with_backoff(
596                            retry.initial_delay,
597                            retry.multiplier,
598                            retry.max_delay,
599                        );
600                        if retry.jitter_factor > 0.0 {
601                            builder = builder.with_jitter(retry.jitter_factor);
602                        }
603                    }
604
605                    if let Some(uri) = spec.handled_by {
606                        builder = builder.handled_by(uri);
607                    }
608
609                    config = builder.build();
610                }
611
612                Some(config)
613            }
614        };
615
616        let definition = RouteDefinition::new(self.from_uri, self.steps);
617        let definition = if let Some(eh) = resolved_error_handler {
618            definition.with_error_handler(eh)
619        } else {
620            definition
621        };
622        let definition = if let Some(cb) = self.circuit_breaker_config {
623            definition.with_circuit_breaker(cb)
624        } else {
625            definition
626        };
627        let definition = if let Some(concurrency) = self.concurrency {
628            definition.with_concurrency(concurrency)
629        } else {
630            definition
631        };
632        let definition = definition.with_route_id(route_id);
633        let definition = if let Some(auto) = self.auto_startup {
634            definition.with_auto_startup(auto)
635        } else {
636            definition
637        };
638        let definition = if let Some(order) = self.startup_order {
639            definition.with_startup_order(order)
640        } else {
641            definition
642        };
643        Ok(definition)
644    }
645
646    /// Compile this builder route into canonical v1 spec.
647    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
648        if self.from_uri.is_empty() {
649            return Err(CamelError::RouteError(
650                "route must have a 'from' URI".to_string(),
651            ));
652        }
653        let route_id = self.route_id.ok_or_else(|| {
654            CamelError::RouteError(
655                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
656                    .to_string(),
657            )
658        })?;
659
660        let steps = canonicalize_steps(self.steps)?;
661        let circuit_breaker = self
662            .circuit_breaker_config
663            .map(canonicalize_circuit_breaker);
664
665        let spec = CanonicalRouteSpec {
666            route_id,
667            from: self.from_uri,
668            steps,
669            circuit_breaker,
670            version: camel_api::CANONICAL_CONTRACT_VERSION,
671        };
672        spec.validate_contract()?;
673        Ok(spec)
674    }
675}
676
677pub struct OnExceptionBuilder {
678    parent: RouteBuilder,
679    policy: OnExceptionSpec,
680}
681
682impl OnExceptionBuilder {
683    pub fn retry(mut self, max_attempts: u32) -> Self {
684        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
685        self
686    }
687
688    pub fn with_backoff(
689        mut self,
690        initial: std::time::Duration,
691        multiplier: f64,
692        max: std::time::Duration,
693    ) -> Self {
694        if let Some(ref mut retry) = self.policy.retry {
695            retry.initial_delay = initial;
696            retry.multiplier = multiplier;
697            retry.max_delay = max;
698        }
699        self
700    }
701
702    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
703        if let Some(ref mut retry) = self.policy.retry {
704            retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
705        }
706        self
707    }
708
709    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
710        self.policy.handled_by = Some(uri.into());
711        self
712    }
713
714    pub fn end_on_exception(mut self) -> RouteBuilder {
715        if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
716            specs.push(self.policy);
717        }
718        self.parent
719    }
720}
721
722fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
723    let mut canonical = Vec::with_capacity(steps.len());
724    for step in steps {
725        canonical.push(canonicalize_step(step)?);
726    }
727    Ok(canonical)
728}
729
730fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
731    match step {
732        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
733        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
734        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
735        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
736        BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
737            delay_ms: config.delay_ms,
738            dynamic_header: config.dynamic_header,
739        }),
740        BuilderStep::DeclarativeScript { expression } => {
741            Ok(CanonicalStepSpec::Script { expression })
742        }
743        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
744            predicate,
745            steps: canonicalize_steps(steps)?,
746        }),
747        BuilderStep::DeclarativeChoice { whens, otherwise } => {
748            let mut canonical_whens = Vec::with_capacity(whens.len());
749            for DeclarativeWhenStep { predicate, steps } in whens {
750                canonical_whens.push(CanonicalWhenSpec {
751                    predicate,
752                    steps: canonicalize_steps(steps)?,
753                });
754            }
755            let otherwise = match otherwise {
756                Some(steps) => Some(canonicalize_steps(steps)?),
757                None => None,
758            };
759            Ok(CanonicalStepSpec::Choice {
760                whens: canonical_whens,
761                otherwise,
762            })
763        }
764        BuilderStep::DeclarativeSplit {
765            expression,
766            aggregation,
767            parallel,
768            parallel_limit,
769            stop_on_exception,
770            steps,
771        } => Ok(CanonicalStepSpec::Split {
772            expression: CanonicalSplitExpressionSpec::Language(expression),
773            aggregation: canonicalize_split_aggregation(aggregation)?,
774            parallel,
775            parallel_limit,
776            stop_on_exception,
777            steps: canonicalize_steps(steps)?,
778        }),
779        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
780            config: canonicalize_aggregate(config)?,
781        }),
782        other => {
783            let step_name = canonical_step_name(&other);
784            let detail = camel_api::canonical_contract_rejection_reason(step_name)
785                .unwrap_or("not included in canonical v1");
786            Err(CamelError::RouteError(format!(
787                "canonical v1 does not support step `{step_name}`: {detail}"
788            )))
789        }
790    }
791}
792
793fn canonicalize_split_aggregation(
794    strategy: camel_api::splitter::AggregationStrategy,
795) -> Result<CanonicalSplitAggregationSpec, CamelError> {
796    match strategy {
797        camel_api::splitter::AggregationStrategy::LastWins => {
798            Ok(CanonicalSplitAggregationSpec::LastWins)
799        }
800        camel_api::splitter::AggregationStrategy::CollectAll => {
801            Ok(CanonicalSplitAggregationSpec::CollectAll)
802        }
803        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
804            "canonical v1 does not support custom split aggregation".to_string(),
805        )),
806        camel_api::splitter::AggregationStrategy::Original => {
807            Ok(CanonicalSplitAggregationSpec::Original)
808        }
809    }
810}
811
812fn extract_completion_fields(
813    mode: &CompletionMode,
814) -> Result<(Option<usize>, Option<u64>), CamelError> {
815    match mode {
816        CompletionMode::Single(cond) => match cond {
817            CompletionCondition::Size(n) => Ok((Some(*n), None)),
818            CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
819            CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
820                "canonical v1 does not support aggregate predicate completion".to_string(),
821            )),
822        },
823        CompletionMode::Any(conds) => {
824            let mut size = None;
825            let mut timeout_ms = None;
826            for cond in conds {
827                match cond {
828                    CompletionCondition::Size(n) => size = Some(*n),
829                    CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
830                    CompletionCondition::Predicate(_) => {
831                        return Err(CamelError::RouteError(
832                            "canonical v1 does not support aggregate predicate completion"
833                                .to_string(),
834                        ));
835                    }
836                }
837            }
838            Ok((size, timeout_ms))
839        }
840    }
841}
842
843fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
844    let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
845
846    let header = match &config.correlation {
847        CorrelationStrategy::HeaderName(h) => h.clone(),
848        CorrelationStrategy::Expression { expr, .. } => expr.clone(),
849        CorrelationStrategy::Fn(_) => {
850            return Err(CamelError::RouteError(
851                "canonical v1 does not support Fn correlation strategy".to_string(),
852            ));
853        }
854    };
855
856    let correlation_key = match &config.correlation {
857        CorrelationStrategy::HeaderName(_) => None,
858        CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
859        CorrelationStrategy::Fn(_) => unreachable!(),
860    };
861
862    let strategy = match config.strategy {
863        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
864        AggregationStrategy::Custom(_) => {
865            return Err(CamelError::RouteError(
866                "canonical v1 does not support custom aggregate strategy".to_string(),
867            ));
868        }
869    };
870    let bucket_ttl_ms = config
871        .bucket_ttl
872        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
873
874    Ok(CanonicalAggregateSpec {
875        header,
876        completion_size,
877        completion_timeout_ms,
878        correlation_key,
879        force_completion_on_stop: if config.force_completion_on_stop {
880            Some(true)
881        } else {
882            None
883        },
884        discard_on_timeout: if config.discard_on_timeout {
885            Some(true)
886        } else {
887            None
888        },
889        strategy,
890        max_buckets: config.max_buckets,
891        bucket_ttl_ms,
892    })
893}
894
895fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
896    CanonicalCircuitBreakerSpec {
897        failure_threshold: config.failure_threshold,
898        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
899    }
900}
901
902fn canonical_step_name(step: &BuilderStep) -> &'static str {
903    match step {
904        BuilderStep::Processor(_) => "processor",
905        BuilderStep::To(_) => "to",
906        BuilderStep::Stop => "stop",
907        BuilderStep::Log { .. } => "log",
908        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
909        BuilderStep::DeclarativeSetBody { .. } => "set_body",
910        BuilderStep::DeclarativeFilter { .. } => "filter",
911        BuilderStep::DeclarativeChoice { .. } => "choice",
912        BuilderStep::DeclarativeScript { .. } => "script",
913        BuilderStep::DeclarativeSplit { .. } => "split",
914        BuilderStep::Split { .. } => "split",
915        BuilderStep::Aggregate { .. } => "aggregate",
916        BuilderStep::Filter { .. } => "filter",
917        BuilderStep::Choice { .. } => "choice",
918        BuilderStep::WireTap { .. } => "wire_tap",
919        BuilderStep::Delay { .. } => "delay",
920        BuilderStep::Multicast { .. } => "multicast",
921        BuilderStep::DeclarativeLog { .. } => "log",
922        BuilderStep::Bean { .. } => "bean",
923        BuilderStep::Script { .. } => "script",
924        BuilderStep::Throttle { .. } => "throttle",
925        BuilderStep::LoadBalance { .. } => "load_balancer",
926        BuilderStep::DynamicRouter { .. } => "dynamic_router",
927        BuilderStep::RoutingSlip { .. } => "routing_slip",
928        BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
929        BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
930        BuilderStep::RecipientList { .. } => "recipient_list",
931        BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
932    }
933}
934
935impl StepAccumulator for RouteBuilder {
936    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
937        &mut self.steps
938    }
939}
940
941/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
942///
943/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
944/// but NOT `.build()` and NOT `.split()` (no nested splits).
945///
946/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
947/// and returns the parent `RouteBuilder`.
948pub struct SplitBuilder {
949    parent: RouteBuilder,
950    config: SplitterConfig,
951    steps: Vec<BuilderStep>,
952}
953
954impl SplitBuilder {
955    /// Open a filter scope within the split sub-pipeline.
956    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
957    where
958        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
959    {
960        FilterInSplitBuilder {
961            parent: self,
962            predicate: std::sync::Arc::new(predicate),
963            steps: vec![],
964        }
965    }
966
967    /// Close the split scope. Packages the accumulated sub-steps into a
968    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
969    pub fn end_split(mut self) -> RouteBuilder {
970        let split_step = BuilderStep::Split {
971            config: self.config,
972            steps: self.steps,
973        };
974        self.parent.steps.push(split_step);
975        self.parent
976    }
977}
978
979impl StepAccumulator for SplitBuilder {
980    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
981        &mut self.steps
982    }
983}
984
985/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
986pub struct FilterBuilder {
987    parent: RouteBuilder,
988    predicate: FilterPredicate,
989    steps: Vec<BuilderStep>,
990}
991
992impl FilterBuilder {
993    /// Close the filter scope. Packages the accumulated sub-steps into a
994    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
995    pub fn end_filter(mut self) -> RouteBuilder {
996        let step = BuilderStep::Filter {
997            predicate: self.predicate,
998            steps: self.steps,
999        };
1000        self.parent.steps.push(step);
1001        self.parent
1002    }
1003}
1004
1005impl StepAccumulator for FilterBuilder {
1006    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1007        &mut self.steps
1008    }
1009}
1010
1011/// Builder for a filter scope nested inside a `.split()` block.
1012pub struct FilterInSplitBuilder {
1013    parent: SplitBuilder,
1014    predicate: FilterPredicate,
1015    steps: Vec<BuilderStep>,
1016}
1017
1018impl FilterInSplitBuilder {
1019    /// Close the filter scope and return the parent `SplitBuilder`.
1020    pub fn end_filter(mut self) -> SplitBuilder {
1021        let step = BuilderStep::Filter {
1022            predicate: self.predicate,
1023            steps: self.steps,
1024        };
1025        self.parent.steps.push(step);
1026        self.parent
1027    }
1028}
1029
1030impl StepAccumulator for FilterInSplitBuilder {
1031    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1032        &mut self.steps
1033    }
1034}
1035
1036// ── Choice/When/Otherwise builders ─────────────────────────────────────────
1037
1038/// Builder for a `.choice()` ... `.end_choice()` block.
1039///
1040/// Accumulates `when` clauses and an optional `otherwise` clause.
1041/// Cannot call `.build()` until `.end_choice()` is called.
1042pub struct ChoiceBuilder {
1043    parent: RouteBuilder,
1044    whens: Vec<WhenStep>,
1045    _otherwise: Option<Vec<BuilderStep>>,
1046}
1047
1048impl ChoiceBuilder {
1049    /// Open a `when` clause. Only exchanges matching `predicate` will be
1050    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
1051    pub fn when<F>(self, predicate: F) -> WhenBuilder
1052    where
1053        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1054    {
1055        WhenBuilder {
1056            parent: self,
1057            predicate: std::sync::Arc::new(predicate),
1058            steps: vec![],
1059        }
1060    }
1061
1062    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
1063    ///
1064    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
1065    pub fn otherwise(self) -> OtherwiseBuilder {
1066        OtherwiseBuilder {
1067            parent: self,
1068            steps: vec![],
1069        }
1070    }
1071
1072    /// Close the choice scope. Packages all accumulated `when` clauses and
1073    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
1074    /// parent `RouteBuilder`.
1075    pub fn end_choice(mut self) -> RouteBuilder {
1076        let step = BuilderStep::Choice {
1077            whens: self.whens,
1078            otherwise: self._otherwise,
1079        };
1080        self.parent.steps.push(step);
1081        self.parent
1082    }
1083}
1084
1085/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
1086pub struct WhenBuilder {
1087    parent: ChoiceBuilder,
1088    predicate: camel_api::FilterPredicate,
1089    steps: Vec<BuilderStep>,
1090}
1091
1092impl WhenBuilder {
1093    /// Close the when scope. Packages the accumulated sub-steps into a
1094    /// `WhenStep` and returns the parent `ChoiceBuilder`.
1095    pub fn end_when(mut self) -> ChoiceBuilder {
1096        self.parent.whens.push(WhenStep {
1097            predicate: self.predicate,
1098            steps: self.steps,
1099        });
1100        self.parent
1101    }
1102}
1103
1104impl StepAccumulator for WhenBuilder {
1105    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1106        &mut self.steps
1107    }
1108}
1109
1110/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
1111pub struct OtherwiseBuilder {
1112    parent: ChoiceBuilder,
1113    steps: Vec<BuilderStep>,
1114}
1115
1116impl OtherwiseBuilder {
1117    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
1118    pub fn end_otherwise(self) -> ChoiceBuilder {
1119        let OtherwiseBuilder { mut parent, steps } = self;
1120        parent._otherwise = Some(steps);
1121        parent
1122    }
1123}
1124
1125impl StepAccumulator for OtherwiseBuilder {
1126    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1127        &mut self.steps
1128    }
1129}
1130
1131/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
1132///
1133/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1134/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
1135///
1136/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
1137/// and returns the parent `RouteBuilder`.
1138pub struct MulticastBuilder {
1139    parent: RouteBuilder,
1140    steps: Vec<BuilderStep>,
1141    config: MulticastConfig,
1142}
1143
1144impl MulticastBuilder {
1145    pub fn parallel(mut self, parallel: bool) -> Self {
1146        self.config = self.config.parallel(parallel);
1147        self
1148    }
1149
1150    pub fn parallel_limit(mut self, limit: usize) -> Self {
1151        self.config = self.config.parallel_limit(limit);
1152        self
1153    }
1154
1155    pub fn stop_on_exception(mut self, stop: bool) -> Self {
1156        self.config = self.config.stop_on_exception(stop);
1157        self
1158    }
1159
1160    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1161        self.config = self.config.timeout(duration);
1162        self
1163    }
1164
1165    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1166        self.config = self.config.aggregation(strategy);
1167        self
1168    }
1169
1170    pub fn end_multicast(mut self) -> RouteBuilder {
1171        let step = BuilderStep::Multicast {
1172            steps: self.steps,
1173            config: self.config,
1174        };
1175        self.parent.steps.push(step);
1176        self.parent
1177    }
1178}
1179
1180impl StepAccumulator for MulticastBuilder {
1181    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1182        &mut self.steps
1183    }
1184}
1185
1186/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
1187///
1188/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1189/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
1190///
1191/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
1192/// and returns the parent `RouteBuilder`.
1193pub struct ThrottleBuilder {
1194    parent: RouteBuilder,
1195    config: ThrottlerConfig,
1196    steps: Vec<BuilderStep>,
1197}
1198
1199impl ThrottleBuilder {
1200    /// Set the throttle strategy. Default is `Delay`.
1201    ///
1202    /// - `Delay`: Queue messages until capacity available
1203    /// - `Reject`: Return error immediately when throttled
1204    /// - `Drop`: Silently discard excess messages
1205    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1206        self.config = self.config.strategy(strategy);
1207        self
1208    }
1209
1210    /// Close the throttle scope. Packages the accumulated sub-steps into a
1211    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
1212    pub fn end_throttle(mut self) -> RouteBuilder {
1213        let step = BuilderStep::Throttle {
1214            config: self.config,
1215            steps: self.steps,
1216        };
1217        self.parent.steps.push(step);
1218        self.parent
1219    }
1220}
1221
1222impl StepAccumulator for ThrottleBuilder {
1223    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1224        &mut self.steps
1225    }
1226}
1227
1228/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
1229///
1230/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
1231/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
1232///
1233/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
1234/// and returns the parent `RouteBuilder`.
1235pub struct LoadBalancerBuilder {
1236    parent: RouteBuilder,
1237    config: LoadBalancerConfig,
1238    steps: Vec<BuilderStep>,
1239}
1240
1241impl LoadBalancerBuilder {
1242    /// Set the load balance strategy to round-robin (default).
1243    pub fn round_robin(mut self) -> Self {
1244        self.config = LoadBalancerConfig::round_robin();
1245        self
1246    }
1247
1248    /// Set the load balance strategy to random selection.
1249    pub fn random(mut self) -> Self {
1250        self.config = LoadBalancerConfig::random();
1251        self
1252    }
1253
1254    /// Set the load balance strategy to weighted selection.
1255    ///
1256    /// Each endpoint is assigned a weight that determines its probability
1257    /// of being selected.
1258    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1259        self.config = LoadBalancerConfig::weighted(weights);
1260        self
1261    }
1262
1263    /// Set the load balance strategy to failover.
1264    ///
1265    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
1266    /// is tried.
1267    pub fn failover(mut self) -> Self {
1268        self.config = LoadBalancerConfig::failover();
1269        self
1270    }
1271
1272    /// Enable or disable parallel execution of endpoints.
1273    ///
1274    /// When enabled, all endpoints receive the exchange simultaneously.
1275    /// When disabled (default), only one endpoint is selected per exchange.
1276    pub fn parallel(mut self, parallel: bool) -> Self {
1277        self.config = self.config.parallel(parallel);
1278        self
1279    }
1280
1281    /// Close the load balance scope. Packages the accumulated sub-steps into a
1282    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
1283    pub fn end_load_balance(mut self) -> RouteBuilder {
1284        let step = BuilderStep::LoadBalance {
1285            config: self.config,
1286            steps: self.steps,
1287        };
1288        self.parent.steps.push(step);
1289        self.parent
1290    }
1291}
1292
1293impl StepAccumulator for LoadBalancerBuilder {
1294    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1295        &mut self.steps
1296    }
1297}
1298
1299// ---------------------------------------------------------------------------
1300// Tests
1301// ---------------------------------------------------------------------------
1302
1303#[cfg(test)]
1304mod tests {
1305    use super::*;
1306    use camel_api::error_handler::ErrorHandlerConfig;
1307    use camel_api::load_balancer::LoadBalanceStrategy;
1308    use camel_api::{Exchange, Message};
1309    use camel_core::route::BuilderStep;
1310    use std::sync::Arc;
1311    use std::time::Duration;
1312    use tower::{Service, ServiceExt};
1313
1314    #[test]
1315    fn test_builder_from_creates_definition() {
1316        let definition = RouteBuilder::from("timer:tick")
1317            .route_id("test-route")
1318            .build()
1319            .unwrap();
1320        assert_eq!(definition.from_uri(), "timer:tick");
1321    }
1322
1323    #[test]
1324    fn test_builder_empty_from_uri_errors() {
1325        let result = RouteBuilder::from("").route_id("test-route").build();
1326        assert!(result.is_err());
1327    }
1328
1329    #[test]
1330    fn test_builder_to_adds_step() {
1331        let definition = RouteBuilder::from("timer:tick")
1332            .route_id("test-route")
1333            .to("log:info")
1334            .build()
1335            .unwrap();
1336
1337        assert_eq!(definition.from_uri(), "timer:tick");
1338        // We can verify steps were added by checking the structure
1339        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1340    }
1341
1342    #[test]
1343    fn test_builder_filter_adds_filter_step() {
1344        let definition = RouteBuilder::from("timer:tick")
1345            .route_id("test-route")
1346            .filter(|_ex| true)
1347            .to("mock:result")
1348            .end_filter()
1349            .build()
1350            .unwrap();
1351
1352        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1353    }
1354
1355    #[test]
1356    fn test_builder_set_header_adds_processor_step() {
1357        let definition = RouteBuilder::from("timer:tick")
1358            .route_id("test-route")
1359            .set_header("key", Value::String("value".into()))
1360            .build()
1361            .unwrap();
1362
1363        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1364    }
1365
1366    #[test]
1367    fn test_builder_map_body_adds_processor_step() {
1368        let definition = RouteBuilder::from("timer:tick")
1369            .route_id("test-route")
1370            .map_body(|body| body)
1371            .build()
1372            .unwrap();
1373
1374        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1375    }
1376
1377    #[test]
1378    fn test_builder_process_adds_processor_step() {
1379        let definition = RouteBuilder::from("timer:tick")
1380            .route_id("test-route")
1381            .process(|ex| async move { Ok(ex) })
1382            .build()
1383            .unwrap();
1384
1385        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1386    }
1387
1388    #[test]
1389    fn test_builder_chain_multiple_steps() {
1390        let definition = RouteBuilder::from("timer:tick")
1391            .route_id("test-route")
1392            .set_header("source", Value::String("timer".into()))
1393            .filter(|ex| ex.input.header("source").is_some())
1394            .to("log:info")
1395            .end_filter()
1396            .to("mock:result")
1397            .build()
1398            .unwrap();
1399
1400        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1401        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1402        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1403        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1404    }
1405
1406    // -----------------------------------------------------------------------
1407    // Processor behavior tests — exercise the real Tower services directly
1408    // -----------------------------------------------------------------------
1409
1410    #[tokio::test]
1411    async fn test_set_header_processor_works() {
1412        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1413        let exchange = Exchange::new(Message::new("test"));
1414        let result = svc.call(exchange).await.unwrap();
1415        assert_eq!(
1416            result.input.header("greeting"),
1417            Some(&Value::String("hello".into()))
1418        );
1419    }
1420
1421    #[tokio::test]
1422    async fn test_filter_processor_passes() {
1423        use camel_api::BoxProcessorExt;
1424        use camel_processor::FilterService;
1425
1426        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1427        let mut svc =
1428            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1429        let exchange = Exchange::new(Message::new("pass"));
1430        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1431        assert_eq!(result.input.body.as_text(), Some("pass"));
1432    }
1433
1434    #[tokio::test]
1435    async fn test_filter_processor_blocks() {
1436        use camel_api::BoxProcessorExt;
1437        use camel_processor::FilterService;
1438
1439        let sub = BoxProcessor::from_fn(|_ex| {
1440            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1441        });
1442        let mut svc =
1443            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1444        let exchange = Exchange::new(Message::new("reject"));
1445        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1446        assert_eq!(result.input.body.as_text(), Some("reject"));
1447    }
1448
1449    #[tokio::test]
1450    async fn test_map_body_processor_works() {
1451        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1452            if let Some(text) = body.as_text() {
1453                Body::Text(text.to_uppercase())
1454            } else {
1455                body
1456            }
1457        });
1458        let exchange = Exchange::new(Message::new("hello"));
1459        let result = mapper.oneshot(exchange).await.unwrap();
1460        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1461    }
1462
1463    #[tokio::test]
1464    async fn test_process_custom_processor_works() {
1465        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1466            ex.set_property("custom", Value::Bool(true));
1467            Ok(ex)
1468        });
1469        let exchange = Exchange::new(Message::default());
1470        let result = processor.oneshot(exchange).await.unwrap();
1471        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1472    }
1473
1474    // -----------------------------------------------------------------------
1475    // Sequential pipeline test
1476    // -----------------------------------------------------------------------
1477
1478    #[tokio::test]
1479    async fn test_compose_pipeline_runs_steps_in_order() {
1480        use camel_core::route::compose_pipeline;
1481
1482        let processors = vec![
1483            BoxProcessor::new(SetHeader::new(
1484                IdentityProcessor,
1485                "step",
1486                Value::String("one".into()),
1487            )),
1488            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1489                if let Some(text) = body.as_text() {
1490                    Body::Text(format!("{}-processed", text))
1491                } else {
1492                    body
1493                }
1494            })),
1495        ];
1496
1497        let pipeline = compose_pipeline(processors);
1498        let exchange = Exchange::new(Message::new("hello"));
1499        let result = pipeline.oneshot(exchange).await.unwrap();
1500
1501        assert_eq!(
1502            result.input.header("step"),
1503            Some(&Value::String("one".into()))
1504        );
1505        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1506    }
1507
1508    #[tokio::test]
1509    async fn test_compose_pipeline_empty_is_identity() {
1510        use camel_core::route::compose_pipeline;
1511
1512        let pipeline = compose_pipeline(vec![]);
1513        let exchange = Exchange::new(Message::new("unchanged"));
1514        let result = pipeline.oneshot(exchange).await.unwrap();
1515        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1516    }
1517
1518    // -----------------------------------------------------------------------
1519    // Circuit breaker builder tests
1520    // -----------------------------------------------------------------------
1521
1522    #[test]
1523    fn test_builder_circuit_breaker_sets_config() {
1524        use camel_api::circuit_breaker::CircuitBreakerConfig;
1525
1526        let config = CircuitBreakerConfig::new().failure_threshold(5);
1527        let definition = RouteBuilder::from("timer:tick")
1528            .route_id("test-route")
1529            .circuit_breaker(config)
1530            .build()
1531            .unwrap();
1532
1533        let cb = definition
1534            .circuit_breaker_config()
1535            .expect("circuit breaker should be set");
1536        assert_eq!(cb.failure_threshold, 5);
1537    }
1538
1539    #[test]
1540    fn test_builder_circuit_breaker_with_error_handler() {
1541        use camel_api::circuit_breaker::CircuitBreakerConfig;
1542        use camel_api::error_handler::ErrorHandlerConfig;
1543
1544        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1545        let eh_config = ErrorHandlerConfig::log_only();
1546
1547        let definition = RouteBuilder::from("timer:tick")
1548            .route_id("test-route")
1549            .to("log:info")
1550            .circuit_breaker(cb_config)
1551            .error_handler(eh_config)
1552            .build()
1553            .unwrap();
1554
1555        assert!(
1556            definition.circuit_breaker_config().is_some(),
1557            "circuit breaker config should be set"
1558        );
1559        // Route definition was built successfully with both configs.
1560    }
1561
1562    #[test]
1563    fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1564        let definition = RouteBuilder::from("direct:start")
1565            .route_id("test-route")
1566            .dead_letter_channel("log:dlc")
1567            .on_exception(|e| matches!(e, CamelError::Io(_)))
1568            .retry(3)
1569            .handled_by("log:io")
1570            .end_on_exception()
1571            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1572            .retry(1)
1573            .end_on_exception()
1574            .to("mock:out")
1575            .build()
1576            .expect("route should build");
1577
1578        let cfg = definition
1579            .error_handler_config()
1580            .expect("error handler should be set");
1581        assert_eq!(cfg.policies.len(), 2);
1582        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1583        assert_eq!(
1584            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1585            Some(3)
1586        );
1587        assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1588        assert_eq!(
1589            cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1590            Some(1)
1591        );
1592    }
1593
1594    #[test]
1595    fn test_builder_on_exception_mixed_mode_rejected() {
1596        let result = RouteBuilder::from("direct:start")
1597            .route_id("test-route")
1598            .error_handler(ErrorHandlerConfig::log_only())
1599            .on_exception(|_e| true)
1600            .end_on_exception()
1601            .to("mock:out")
1602            .build();
1603
1604        let err = result.err().expect("mixed mode should fail with an error");
1605
1606        assert!(
1607            format!("{err}").contains("mixed error handler modes"),
1608            "unexpected error: {err}"
1609        );
1610    }
1611
1612    #[test]
1613    fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1614        let definition = RouteBuilder::from("direct:start")
1615            .route_id("test-route")
1616            .on_exception(|_e| true)
1617            .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1618            .with_jitter(0.5)
1619            .end_on_exception()
1620            .to("mock:out")
1621            .build()
1622            .expect("route should build");
1623
1624        let cfg = definition
1625            .error_handler_config()
1626            .expect("error handler should be set");
1627        assert_eq!(cfg.policies.len(), 1);
1628        assert!(cfg.policies[0].retry.is_none());
1629    }
1630
1631    #[test]
1632    fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1633        let definition = RouteBuilder::from("direct:start")
1634            .route_id("test-route")
1635            .dead_letter_channel("log:dlc")
1636            .to("mock:out")
1637            .build()
1638            .expect("route should build");
1639
1640        let cfg = definition
1641            .error_handler_config()
1642            .expect("error handler should be set");
1643        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1644        assert!(cfg.policies.is_empty());
1645    }
1646
1647    #[test]
1648    fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1649        let definition = RouteBuilder::from("direct:start")
1650            .route_id("test-route")
1651            .dead_letter_channel("log:first")
1652            .on_exception(|e| matches!(e, CamelError::Io(_)))
1653            .retry(2)
1654            .end_on_exception()
1655            .dead_letter_channel("log:second")
1656            .to("mock:out")
1657            .build()
1658            .expect("route should build");
1659
1660        let cfg = definition
1661            .error_handler_config()
1662            .expect("error handler should be set");
1663        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1664        assert_eq!(cfg.policies.len(), 1);
1665        assert_eq!(
1666            cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1667            Some(2)
1668        );
1669    }
1670
1671    #[test]
1672    fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1673        let definition = RouteBuilder::from("direct:start")
1674            .route_id("test-route")
1675            .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1676            .retry(1)
1677            .end_on_exception()
1678            .to("mock:out")
1679            .build()
1680            .expect("route should build");
1681
1682        let cfg = definition
1683            .error_handler_config()
1684            .expect("error handler should be set");
1685        assert!(cfg.dlc_uri.is_none());
1686        assert_eq!(cfg.policies.len(), 1);
1687    }
1688
1689    #[test]
1690    fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1691        let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1692        let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1693
1694        let definition = RouteBuilder::from("direct:start")
1695            .route_id("test-route")
1696            .error_handler(first)
1697            .error_handler(second)
1698            .to("mock:out")
1699            .build()
1700            .expect("route should build");
1701
1702        let cfg = definition
1703            .error_handler_config()
1704            .expect("error handler should be set");
1705        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1706    }
1707
1708    // --- Splitter builder tests ---
1709
1710    #[test]
1711    fn test_split_builder_typestate() {
1712        use camel_api::splitter::{SplitterConfig, split_body_lines};
1713
1714        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
1715        let definition = RouteBuilder::from("timer:test?period=1000")
1716            .route_id("test-route")
1717            .split(SplitterConfig::new(split_body_lines()))
1718            .to("mock:per-fragment")
1719            .end_split()
1720            .to("mock:final")
1721            .build()
1722            .unwrap();
1723
1724        // Should have 2 top-level steps: Split + To("mock:final")
1725        assert_eq!(definition.steps().len(), 2);
1726    }
1727
1728    #[test]
1729    fn test_split_builder_steps_collected() {
1730        use camel_api::splitter::{SplitterConfig, split_body_lines};
1731
1732        let definition = RouteBuilder::from("timer:test?period=1000")
1733            .route_id("test-route")
1734            .split(SplitterConfig::new(split_body_lines()))
1735            .set_header("fragment", Value::String("yes".into()))
1736            .to("mock:per-fragment")
1737            .end_split()
1738            .build()
1739            .unwrap();
1740
1741        // Should have 1 top-level step: Split (containing 2 sub-steps)
1742        assert_eq!(definition.steps().len(), 1);
1743        match &definition.steps()[0] {
1744            BuilderStep::Split { steps, .. } => {
1745                assert_eq!(steps.len(), 2); // SetHeader + To
1746            }
1747            other => panic!("Expected Split, got {:?}", other),
1748        }
1749    }
1750
1751    #[test]
1752    fn test_split_builder_config_propagated() {
1753        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1754
1755        let definition = RouteBuilder::from("timer:test?period=1000")
1756            .route_id("test-route")
1757            .split(
1758                SplitterConfig::new(split_body_lines())
1759                    .parallel(true)
1760                    .parallel_limit(4)
1761                    .aggregation(AggregationStrategy::CollectAll),
1762            )
1763            .to("mock:per-fragment")
1764            .end_split()
1765            .build()
1766            .unwrap();
1767
1768        match &definition.steps()[0] {
1769            BuilderStep::Split { config, .. } => {
1770                assert!(config.parallel);
1771                assert_eq!(config.parallel_limit, Some(4));
1772                assert!(matches!(
1773                    config.aggregation,
1774                    AggregationStrategy::CollectAll
1775                ));
1776            }
1777            other => panic!("Expected Split, got {:?}", other),
1778        }
1779    }
1780
1781    #[test]
1782    fn test_aggregate_builder_adds_step() {
1783        use camel_api::aggregator::AggregatorConfig;
1784        use camel_core::route::BuilderStep;
1785
1786        let definition = RouteBuilder::from("timer:tick")
1787            .route_id("test-route")
1788            .aggregate(
1789                AggregatorConfig::correlate_by("key")
1790                    .complete_when_size(2)
1791                    .build(),
1792            )
1793            .build()
1794            .unwrap();
1795
1796        assert_eq!(definition.steps().len(), 1);
1797        assert!(matches!(
1798            definition.steps()[0],
1799            BuilderStep::Aggregate { .. }
1800        ));
1801    }
1802
1803    #[test]
1804    fn test_aggregate_in_split_builder() {
1805        use camel_api::aggregator::AggregatorConfig;
1806        use camel_api::splitter::{SplitterConfig, split_body_lines};
1807        use camel_core::route::BuilderStep;
1808
1809        let definition = RouteBuilder::from("timer:tick")
1810            .route_id("test-route")
1811            .split(SplitterConfig::new(split_body_lines()))
1812            .aggregate(
1813                AggregatorConfig::correlate_by("key")
1814                    .complete_when_size(1)
1815                    .build(),
1816            )
1817            .end_split()
1818            .build()
1819            .unwrap();
1820
1821        assert_eq!(definition.steps().len(), 1);
1822        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
1823            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
1824        } else {
1825            panic!("expected Split step");
1826        }
1827    }
1828
1829    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
1830
1831    #[test]
1832    fn test_builder_set_body_static_adds_processor() {
1833        let definition = RouteBuilder::from("timer:tick")
1834            .route_id("test-route")
1835            .set_body("fixed")
1836            .build()
1837            .unwrap();
1838        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1839    }
1840
1841    #[test]
1842    fn test_builder_set_body_fn_adds_processor() {
1843        let definition = RouteBuilder::from("timer:tick")
1844            .route_id("test-route")
1845            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
1846            .build()
1847            .unwrap();
1848        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1849    }
1850
1851    #[test]
1852    fn transform_alias_produces_same_as_set_body() {
1853        let route_transform = RouteBuilder::from("timer:tick")
1854            .route_id("test-route")
1855            .transform("hello")
1856            .build()
1857            .unwrap();
1858
1859        let route_set_body = RouteBuilder::from("timer:tick")
1860            .route_id("test-route")
1861            .set_body("hello")
1862            .build()
1863            .unwrap();
1864
1865        assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
1866    }
1867
1868    #[test]
1869    fn test_builder_set_header_fn_adds_processor() {
1870        let definition = RouteBuilder::from("timer:tick")
1871            .route_id("test-route")
1872            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
1873            .build()
1874            .unwrap();
1875        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1876    }
1877
1878    #[tokio::test]
1879    async fn test_set_body_static_processor_works() {
1880        use camel_core::route::compose_pipeline;
1881        let def = RouteBuilder::from("t:t")
1882            .route_id("test-route")
1883            .set_body("replaced")
1884            .build()
1885            .unwrap();
1886        let pipeline = compose_pipeline(
1887            def.steps()
1888                .iter()
1889                .filter_map(|s| {
1890                    if let BuilderStep::Processor(p) = s {
1891                        Some(p.clone())
1892                    } else {
1893                        None
1894                    }
1895                })
1896                .collect(),
1897        );
1898        let exchange = Exchange::new(Message::new("original"));
1899        let result = pipeline.oneshot(exchange).await.unwrap();
1900        assert_eq!(result.input.body.as_text(), Some("replaced"));
1901    }
1902
1903    #[tokio::test]
1904    async fn test_set_body_fn_processor_works() {
1905        use camel_core::route::compose_pipeline;
1906        let def = RouteBuilder::from("t:t")
1907            .route_id("test-route")
1908            .set_body_fn(|ex: &Exchange| {
1909                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
1910            })
1911            .build()
1912            .unwrap();
1913        let pipeline = compose_pipeline(
1914            def.steps()
1915                .iter()
1916                .filter_map(|s| {
1917                    if let BuilderStep::Processor(p) = s {
1918                        Some(p.clone())
1919                    } else {
1920                        None
1921                    }
1922                })
1923                .collect(),
1924        );
1925        let exchange = Exchange::new(Message::new("hello"));
1926        let result = pipeline.oneshot(exchange).await.unwrap();
1927        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1928    }
1929
1930    #[tokio::test]
1931    async fn test_set_header_fn_processor_works() {
1932        use camel_core::route::compose_pipeline;
1933        let def = RouteBuilder::from("t:t")
1934            .route_id("test-route")
1935            .set_header_fn("echo", |ex: &Exchange| {
1936                ex.input
1937                    .body
1938                    .as_text()
1939                    .map(|t| Value::String(t.into()))
1940                    .unwrap_or(Value::Null)
1941            })
1942            .build()
1943            .unwrap();
1944        let pipeline = compose_pipeline(
1945            def.steps()
1946                .iter()
1947                .filter_map(|s| {
1948                    if let BuilderStep::Processor(p) = s {
1949                        Some(p.clone())
1950                    } else {
1951                        None
1952                    }
1953                })
1954                .collect(),
1955        );
1956        let exchange = Exchange::new(Message::new("ping"));
1957        let result = pipeline.oneshot(exchange).await.unwrap();
1958        assert_eq!(
1959            result.input.header("echo"),
1960            Some(&Value::String("ping".into()))
1961        );
1962    }
1963
1964    // ── FilterBuilder typestate tests ─────────────────────────────────────
1965
1966    #[test]
1967    fn test_filter_builder_typestate() {
1968        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1969            .route_id("test-route")
1970            .filter(|_ex| true)
1971            .to("mock:inner")
1972            .end_filter()
1973            .to("mock:outer")
1974            .build();
1975        assert!(result.is_ok());
1976    }
1977
1978    #[test]
1979    fn test_filter_builder_steps_collected() {
1980        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1981            .route_id("test-route")
1982            .filter(|_ex| true)
1983            .to("mock:inner")
1984            .end_filter()
1985            .build()
1986            .unwrap();
1987
1988        assert_eq!(definition.steps().len(), 1);
1989        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1990    }
1991
1992    #[test]
1993    fn test_wire_tap_builder_adds_step() {
1994        let definition = RouteBuilder::from("timer:tick")
1995            .route_id("test-route")
1996            .wire_tap("mock:tap")
1997            .to("mock:result")
1998            .build()
1999            .unwrap();
2000
2001        assert_eq!(definition.steps().len(), 2);
2002        assert!(
2003            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2004        );
2005        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2006    }
2007
2008    // ── MulticastBuilder typestate tests ─────────────────────────────────────
2009
2010    #[test]
2011    fn test_multicast_builder_typestate() {
2012        let definition = RouteBuilder::from("timer:tick")
2013            .route_id("test-route")
2014            .multicast()
2015            .to("direct:a")
2016            .to("direct:b")
2017            .end_multicast()
2018            .to("mock:result")
2019            .build()
2020            .unwrap();
2021
2022        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
2023    }
2024
2025    #[test]
2026    fn test_multicast_builder_steps_collected() {
2027        let definition = RouteBuilder::from("timer:tick")
2028            .route_id("test-route")
2029            .multicast()
2030            .to("direct:a")
2031            .to("direct:b")
2032            .end_multicast()
2033            .build()
2034            .unwrap();
2035
2036        match &definition.steps()[0] {
2037            BuilderStep::Multicast { steps, .. } => {
2038                assert_eq!(steps.len(), 2);
2039            }
2040            other => panic!("Expected Multicast, got {:?}", other),
2041        }
2042    }
2043
2044    // ── Concurrency builder tests ─────────────────────────────────────
2045
2046    #[test]
2047    fn test_builder_concurrent_sets_concurrency() {
2048        use camel_component_api::ConcurrencyModel;
2049
2050        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2051            .route_id("test-route")
2052            .concurrent(16)
2053            .to("log:info")
2054            .build()
2055            .unwrap();
2056
2057        assert_eq!(
2058            definition.concurrency_override(),
2059            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2060        );
2061    }
2062
2063    #[test]
2064    fn test_builder_concurrent_zero_means_unbounded() {
2065        use camel_component_api::ConcurrencyModel;
2066
2067        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2068            .route_id("test-route")
2069            .concurrent(0)
2070            .to("log:info")
2071            .build()
2072            .unwrap();
2073
2074        assert_eq!(
2075            definition.concurrency_override(),
2076            Some(&ConcurrencyModel::Concurrent { max: None })
2077        );
2078    }
2079
2080    #[test]
2081    fn test_builder_sequential_sets_concurrency() {
2082        use camel_component_api::ConcurrencyModel;
2083
2084        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2085            .route_id("test-route")
2086            .sequential()
2087            .to("log:info")
2088            .build()
2089            .unwrap();
2090
2091        assert_eq!(
2092            definition.concurrency_override(),
2093            Some(&ConcurrencyModel::Sequential)
2094        );
2095    }
2096
2097    #[test]
2098    fn test_builder_default_concurrency_is_none() {
2099        let definition = RouteBuilder::from("timer:tick")
2100            .route_id("test-route")
2101            .to("log:info")
2102            .build()
2103            .unwrap();
2104
2105        assert_eq!(definition.concurrency_override(), None);
2106    }
2107
2108    // ── Route lifecycle builder tests ─────────────────────────────────────
2109
2110    #[test]
2111    fn test_builder_route_id_sets_id() {
2112        let definition = RouteBuilder::from("timer:tick")
2113            .route_id("my-route")
2114            .build()
2115            .unwrap();
2116
2117        assert_eq!(definition.route_id(), "my-route");
2118    }
2119
2120    #[test]
2121    fn test_build_without_route_id_fails() {
2122        let result = RouteBuilder::from("timer:tick?period=1000")
2123            .to("log:info")
2124            .build();
2125        let err = match result {
2126            Err(e) => e.to_string(),
2127            Ok(_) => panic!("build() should fail without route_id"),
2128        };
2129        assert!(
2130            err.contains("route_id"),
2131            "error should mention route_id, got: {}",
2132            err
2133        );
2134    }
2135
2136    #[test]
2137    fn test_builder_auto_startup_false() {
2138        let definition = RouteBuilder::from("timer:tick")
2139            .route_id("test-route")
2140            .auto_startup(false)
2141            .build()
2142            .unwrap();
2143
2144        assert!(!definition.auto_startup());
2145    }
2146
2147    #[test]
2148    fn test_builder_startup_order_custom() {
2149        let definition = RouteBuilder::from("timer:tick")
2150            .route_id("test-route")
2151            .startup_order(50)
2152            .build()
2153            .unwrap();
2154
2155        assert_eq!(definition.startup_order(), 50);
2156    }
2157
2158    #[test]
2159    fn test_builder_defaults() {
2160        let definition = RouteBuilder::from("timer:tick")
2161            .route_id("test-route")
2162            .build()
2163            .unwrap();
2164
2165        assert_eq!(definition.route_id(), "test-route");
2166        assert!(definition.auto_startup());
2167        assert_eq!(definition.startup_order(), 1000);
2168    }
2169
2170    // ── Choice typestate tests ──────────────────────────────────────────────────
2171
2172    #[test]
2173    fn test_choice_builder_single_when() {
2174        let definition = RouteBuilder::from("timer:tick")
2175            .route_id("test-route")
2176            .choice()
2177            .when(|ex: &Exchange| ex.input.header("type").is_some())
2178            .to("mock:typed")
2179            .end_when()
2180            .end_choice()
2181            .build()
2182            .unwrap();
2183        assert_eq!(definition.steps().len(), 1);
2184        assert!(
2185            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2186            if whens.len() == 1 && otherwise.is_none())
2187        );
2188    }
2189
2190    #[test]
2191    fn test_choice_builder_when_otherwise() {
2192        let definition = RouteBuilder::from("timer:tick")
2193            .route_id("test-route")
2194            .choice()
2195            .when(|ex: &Exchange| ex.input.header("a").is_some())
2196            .to("mock:a")
2197            .end_when()
2198            .otherwise()
2199            .to("mock:fallback")
2200            .end_otherwise()
2201            .end_choice()
2202            .build()
2203            .unwrap();
2204        assert!(
2205            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2206            if whens.len() == 1 && otherwise.is_some())
2207        );
2208    }
2209
2210    #[test]
2211    fn test_choice_builder_multiple_whens() {
2212        let definition = RouteBuilder::from("timer:tick")
2213            .route_id("test-route")
2214            .choice()
2215            .when(|ex: &Exchange| ex.input.header("a").is_some())
2216            .to("mock:a")
2217            .end_when()
2218            .when(|ex: &Exchange| ex.input.header("b").is_some())
2219            .to("mock:b")
2220            .end_when()
2221            .end_choice()
2222            .build()
2223            .unwrap();
2224        assert!(
2225            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2226            if whens.len() == 2)
2227        );
2228    }
2229
2230    #[test]
2231    fn test_choice_step_after_choice() {
2232        // Steps after end_choice() are added to the outer pipeline, not inside choice.
2233        let definition = RouteBuilder::from("timer:tick")
2234            .route_id("test-route")
2235            .choice()
2236            .when(|_ex: &Exchange| true)
2237            .to("mock:inner")
2238            .end_when()
2239            .end_choice()
2240            .to("mock:outer") // must be step[1], not inside choice
2241            .build()
2242            .unwrap();
2243        assert_eq!(definition.steps().len(), 2);
2244        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2245    }
2246
2247    // ── Throttle typestate tests ──────────────────────────────────────────────────
2248
2249    #[test]
2250    fn test_throttle_builder_typestate() {
2251        let definition = RouteBuilder::from("timer:tick")
2252            .route_id("test-route")
2253            .throttle(10, std::time::Duration::from_secs(1))
2254            .to("mock:result")
2255            .end_throttle()
2256            .build()
2257            .unwrap();
2258
2259        assert_eq!(definition.steps().len(), 1);
2260        assert!(matches!(
2261            &definition.steps()[0],
2262            BuilderStep::Throttle { .. }
2263        ));
2264    }
2265
2266    #[test]
2267    fn test_throttle_builder_with_strategy() {
2268        let definition = RouteBuilder::from("timer:tick")
2269            .route_id("test-route")
2270            .throttle(10, std::time::Duration::from_secs(1))
2271            .strategy(ThrottleStrategy::Reject)
2272            .to("mock:result")
2273            .end_throttle()
2274            .build()
2275            .unwrap();
2276
2277        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2278            assert_eq!(config.strategy, ThrottleStrategy::Reject);
2279        } else {
2280            panic!("Expected Throttle step");
2281        }
2282    }
2283
2284    #[test]
2285    fn test_throttle_builder_steps_collected() {
2286        let definition = RouteBuilder::from("timer:tick")
2287            .route_id("test-route")
2288            .throttle(5, std::time::Duration::from_secs(1))
2289            .set_header("throttled", Value::Bool(true))
2290            .to("mock:throttled")
2291            .end_throttle()
2292            .build()
2293            .unwrap();
2294
2295        match &definition.steps()[0] {
2296            BuilderStep::Throttle { steps, .. } => {
2297                assert_eq!(steps.len(), 2); // SetHeader + To
2298            }
2299            other => panic!("Expected Throttle, got {:?}", other),
2300        }
2301    }
2302
2303    #[test]
2304    fn test_throttle_step_after_throttle() {
2305        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
2306        let definition = RouteBuilder::from("timer:tick")
2307            .route_id("test-route")
2308            .throttle(10, std::time::Duration::from_secs(1))
2309            .to("mock:inner")
2310            .end_throttle()
2311            .to("mock:outer")
2312            .build()
2313            .unwrap();
2314
2315        assert_eq!(definition.steps().len(), 2);
2316        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2317    }
2318
2319    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
2320
2321    #[test]
2322    fn test_load_balance_builder_typestate() {
2323        let definition = RouteBuilder::from("timer:tick")
2324            .route_id("test-route")
2325            .load_balance()
2326            .round_robin()
2327            .to("mock:a")
2328            .to("mock:b")
2329            .end_load_balance()
2330            .build()
2331            .unwrap();
2332
2333        assert_eq!(definition.steps().len(), 1);
2334        assert!(matches!(
2335            &definition.steps()[0],
2336            BuilderStep::LoadBalance { .. }
2337        ));
2338    }
2339
2340    #[test]
2341    fn test_load_balance_builder_with_strategy() {
2342        let definition = RouteBuilder::from("timer:tick")
2343            .route_id("test-route")
2344            .load_balance()
2345            .random()
2346            .to("mock:result")
2347            .end_load_balance()
2348            .build()
2349            .unwrap();
2350
2351        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2352            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2353        } else {
2354            panic!("Expected LoadBalance step");
2355        }
2356    }
2357
2358    #[test]
2359    fn test_load_balance_builder_steps_collected() {
2360        let definition = RouteBuilder::from("timer:tick")
2361            .route_id("test-route")
2362            .load_balance()
2363            .set_header("lb", Value::Bool(true))
2364            .to("mock:a")
2365            .end_load_balance()
2366            .build()
2367            .unwrap();
2368
2369        match &definition.steps()[0] {
2370            BuilderStep::LoadBalance { steps, .. } => {
2371                assert_eq!(steps.len(), 2); // SetHeader + To
2372            }
2373            other => panic!("Expected LoadBalance, got {:?}", other),
2374        }
2375    }
2376
2377    #[test]
2378    fn test_load_balance_step_after_load_balance() {
2379        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
2380        let definition = RouteBuilder::from("timer:tick")
2381            .route_id("test-route")
2382            .load_balance()
2383            .to("mock:inner")
2384            .end_load_balance()
2385            .to("mock:outer")
2386            .build()
2387            .unwrap();
2388
2389        assert_eq!(definition.steps().len(), 2);
2390        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2391    }
2392
2393    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
2394
2395    #[test]
2396    fn test_dynamic_router_builder() {
2397        let definition = RouteBuilder::from("timer:tick")
2398            .route_id("test-route")
2399            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2400            .build()
2401            .unwrap();
2402
2403        assert_eq!(definition.steps().len(), 1);
2404        assert!(matches!(
2405            &definition.steps()[0],
2406            BuilderStep::DynamicRouter { .. }
2407        ));
2408    }
2409
2410    #[test]
2411    fn test_dynamic_router_builder_with_config() {
2412        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2413            .max_iterations(100)
2414            .cache_size(500);
2415
2416        let definition = RouteBuilder::from("timer:tick")
2417            .route_id("test-route")
2418            .dynamic_router_with_config(config)
2419            .build()
2420            .unwrap();
2421
2422        assert_eq!(definition.steps().len(), 1);
2423        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2424            assert_eq!(config.max_iterations, 100);
2425            assert_eq!(config.cache_size, 500);
2426        } else {
2427            panic!("Expected DynamicRouter step");
2428        }
2429    }
2430
2431    #[test]
2432    fn test_dynamic_router_step_after_router() {
2433        // Steps after dynamic_router() are added to the outer pipeline.
2434        let definition = RouteBuilder::from("timer:tick")
2435            .route_id("test-route")
2436            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2437            .to("mock:outer")
2438            .build()
2439            .unwrap();
2440
2441        assert_eq!(definition.steps().len(), 2);
2442        assert!(matches!(
2443            &definition.steps()[0],
2444            BuilderStep::DynamicRouter { .. }
2445        ));
2446        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2447    }
2448
2449    #[test]
2450    fn routing_slip_builder_creates_step() {
2451        use camel_api::RoutingSlipExpression;
2452
2453        let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2454
2455        let route = RouteBuilder::from("direct:start")
2456            .route_id("routing-slip-test")
2457            .routing_slip(expression)
2458            .build()
2459            .unwrap();
2460
2461        assert!(
2462            matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2463            "Expected RoutingSlip step"
2464        );
2465    }
2466
2467    #[test]
2468    fn routing_slip_with_config_builder_creates_step() {
2469        use camel_api::RoutingSlipConfig;
2470
2471        let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2472            .uri_delimiter("|")
2473            .cache_size(50)
2474            .ignore_invalid_endpoints(true);
2475
2476        let route = RouteBuilder::from("direct:start")
2477            .route_id("routing-slip-config-test")
2478            .routing_slip_with_config(config)
2479            .build()
2480            .unwrap();
2481
2482        if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2483            assert_eq!(config.uri_delimiter, "|");
2484            assert_eq!(config.cache_size, 50);
2485            assert!(config.ignore_invalid_endpoints);
2486        } else {
2487            panic!("Expected RoutingSlip step");
2488        }
2489    }
2490
2491    #[test]
2492    fn test_builder_marshal_adds_processor_step() {
2493        let definition = RouteBuilder::from("timer:tick")
2494            .route_id("test-route")
2495            .marshal("json")
2496            .build()
2497            .unwrap();
2498        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2499    }
2500
2501    #[test]
2502    fn test_builder_unmarshal_adds_processor_step() {
2503        let definition = RouteBuilder::from("timer:tick")
2504            .route_id("test-route")
2505            .unmarshal("json")
2506            .build()
2507            .unwrap();
2508        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2509    }
2510
2511    #[test]
2512    fn validate_adds_to_step_with_validator_uri() {
2513        let def = RouteBuilder::from("direct:in")
2514            .route_id("test")
2515            .validate("schemas/order.xsd")
2516            .build()
2517            .unwrap();
2518        let steps = def.steps();
2519        assert_eq!(steps.len(), 1);
2520        assert!(
2521            matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2522            "got: {:?}",
2523            steps[0]
2524        );
2525    }
2526
2527    #[test]
2528    #[should_panic(expected = "unknown data format: 'csv'")]
2529    fn test_builder_marshal_panics_on_unknown_format() {
2530        let _ = RouteBuilder::from("timer:tick")
2531            .route_id("test-route")
2532            .marshal("csv")
2533            .build();
2534    }
2535
2536    #[test]
2537    #[should_panic(expected = "unknown data format: 'csv'")]
2538    fn test_builder_unmarshal_panics_on_unknown_format() {
2539        let _ = RouteBuilder::from("timer:tick")
2540            .route_id("test-route")
2541            .unmarshal("csv")
2542            .build();
2543    }
2544}