Skip to main content

camel_builder/
lib.rs

1use camel_api::aggregator::{AggregationStrategy, AggregatorConfig, CompletionCondition};
2use camel_api::body::Body;
3use camel_api::body_converter::BodyType;
4use camel_api::circuit_breaker::CircuitBreakerConfig;
5use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
6use camel_api::error_handler::ErrorHandlerConfig;
7use camel_api::load_balancer::LoadBalancerConfig;
8use camel_api::multicast::{MulticastConfig, MulticastStrategy};
9use camel_api::splitter::SplitterConfig;
10use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
11use camel_api::{
12    BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
13    ProcessorFn, Value,
14    runtime::{
15        CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
16        CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
17        CanonicalWhenSpec,
18    },
19};
20use camel_component::ConcurrencyModel;
21use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
22use camel_processor::{ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, SetBody, SetHeader};
23
24/// Shared step-accumulation methods for all builder types.
25///
26/// Implementors provide `steps_mut()` and get step-adding methods for free.
27/// `filter()` and other branching methods are NOT included — they return
28/// different types per builder and stay as per-builder methods.
29pub trait StepAccumulator: Sized {
30    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
31
32    fn to(mut self, endpoint: impl Into<String>) -> Self {
33        self.steps_mut().push(BuilderStep::To(endpoint.into()));
34        self
35    }
36
37    fn process<F, Fut>(mut self, f: F) -> Self
38    where
39        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
40        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
41    {
42        let svc = ProcessorFn::new(f);
43        self.steps_mut()
44            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
45        self
46    }
47
48    fn process_fn(mut self, processor: BoxProcessor) -> Self {
49        self.steps_mut().push(BuilderStep::Processor(processor));
50        self
51    }
52
53    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
54        let svc = SetHeader::new(IdentityProcessor, key, value);
55        self.steps_mut()
56            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
57        self
58    }
59
60    fn map_body<F>(mut self, mapper: F) -> Self
61    where
62        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
63    {
64        let svc = MapBody::new(IdentityProcessor, mapper);
65        self.steps_mut()
66            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
67        self
68    }
69
70    fn set_body<B>(mut self, body: B) -> Self
71    where
72        B: Into<Body> + Clone + Send + Sync + 'static,
73    {
74        let body: Body = body.into();
75        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
76        self.steps_mut()
77            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
78        self
79    }
80
81    fn set_body_fn<F>(mut self, expr: F) -> Self
82    where
83        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
84    {
85        let svc = SetBody::new(IdentityProcessor, expr);
86        self.steps_mut()
87            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
88        self
89    }
90
91    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
92    where
93        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
94    {
95        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
96        self.steps_mut()
97            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
98        self
99    }
100
101    fn aggregate(mut self, config: AggregatorConfig) -> Self {
102        self.steps_mut().push(BuilderStep::Aggregate { config });
103        self
104    }
105
106    /// Stop processing this exchange immediately. No further steps in the
107    /// current pipeline will run.
108    ///
109    /// Can be used at any point in the route: directly on RouteBuilder,
110    /// inside `.filter()`, inside `.split()`, etc.
111    fn stop(mut self) -> Self {
112        self.steps_mut().push(BuilderStep::Stop);
113        self
114    }
115
116    /// Log a message at the specified level.
117    ///
118    /// The message will be logged when an exchange passes through this step.
119    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
120        self.steps_mut().push(BuilderStep::Log {
121            level,
122            message: message.into(),
123        });
124        self
125    }
126
127    /// Convert the message body to the target type.
128    ///
129    /// Supported: Text ↔ Json ↔ Bytes. `Body::Stream` always fails.
130    /// Returns `TypeConversionFailed` if conversion is not possible.
131    ///
132    /// # Example
133    /// ```ignore
134    /// route.set_body(Value::String(r#"{"x":1}"#.into()))
135    ///      .convert_body_to(BodyType::Json)
136    ///      .to("direct:next")
137    /// ```
138    fn convert_body_to(mut self, target: BodyType) -> Self {
139        let svc = ConvertBodyTo::new(IdentityProcessor, target);
140        self.steps_mut()
141            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
142        self
143    }
144
145    /// Execute a script that can modify the exchange (headers, properties, body).
146    ///
147    /// The script has access to `headers`, `properties`, and `body` variables
148    /// and can modify them with assignment syntax: `headers["k"] = v`.
149    ///
150    /// # Example
151    /// ```ignore
152    /// // ignore: requires full CamelContext setup with registered language
153    /// route.script("rhai", r#"headers["tenant"] = "acme"; body = body + "_processed""#)
154    /// ```
155    fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
156        self.steps_mut().push(BuilderStep::Script {
157            language: language.into(),
158            script: script.into(),
159        });
160        self
161    }
162}
163
164/// A fluent builder for constructing routes.
165///
166/// # Example
167///
168/// ```ignore
169/// let definition = RouteBuilder::from("timer:tick?period=1000")
170///     .set_header("source", Value::String("timer".into()))
171///     .filter(|ex| ex.input.body.as_text().is_some())
172///     .to("log:info?showHeaders=true")
173///     .build()?;
174/// ```
175pub struct RouteBuilder {
176    from_uri: String,
177    steps: Vec<BuilderStep>,
178    error_handler: Option<ErrorHandlerConfig>,
179    circuit_breaker_config: Option<CircuitBreakerConfig>,
180    concurrency: Option<ConcurrencyModel>,
181    route_id: Option<String>,
182    auto_startup: Option<bool>,
183    startup_order: Option<i32>,
184}
185
186impl RouteBuilder {
187    /// Start building a route from the given source endpoint URI.
188    pub fn from(endpoint: &str) -> Self {
189        Self {
190            from_uri: endpoint.to_string(),
191            steps: Vec::new(),
192            error_handler: None,
193            circuit_breaker_config: None,
194            concurrency: None,
195            route_id: None,
196            auto_startup: None,
197            startup_order: None,
198        }
199    }
200
201    /// Open a filter scope. Only exchanges matching `predicate` will be processed
202    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
203    /// and continue to steps after `.end_filter()`.
204    pub fn filter<F>(self, predicate: F) -> FilterBuilder
205    where
206        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
207    {
208        FilterBuilder {
209            parent: self,
210            predicate: std::sync::Arc::new(predicate),
211            steps: vec![],
212        }
213    }
214
215    /// Open a choice scope for content-based routing.
216    ///
217    /// Within the choice, you can define multiple `.when()` clauses and an
218    /// optional `.otherwise()` clause. The first matching `when` predicate
219    /// determines which sub-pipeline executes.
220    pub fn choice(self) -> ChoiceBuilder {
221        ChoiceBuilder {
222            parent: self,
223            whens: vec![],
224            _otherwise: None,
225        }
226    }
227
228    /// Add a WireTap step that sends a clone of the exchange to the given
229    /// endpoint URI (fire-and-forget). The original exchange continues
230    /// downstream unchanged.
231    pub fn wire_tap(mut self, endpoint: &str) -> Self {
232        self.steps.push(BuilderStep::WireTap {
233            uri: endpoint.to_string(),
234        });
235        self
236    }
237
238    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
239    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
240        self.error_handler = Some(config);
241        self
242    }
243
244    /// Set a circuit breaker for this route.
245    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
246        self.circuit_breaker_config = Some(config);
247        self
248    }
249
250    /// Override the consumer's default concurrency model.
251    ///
252    /// When set, the pipeline spawns a task per exchange, processing them
253    /// concurrently. `max` limits the number of simultaneously active
254    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
255    ///
256    /// # Example
257    /// ```ignore
258    /// RouteBuilder::from("http://0.0.0.0:8080/api")
259    ///     .concurrent(16)  // max 16 in-flight pipeline executions
260    ///     .process(handle_request)
261    ///     .build()
262    /// ```
263    pub fn concurrent(mut self, max: usize) -> Self {
264        let max = if max == 0 { None } else { Some(max) };
265        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
266        self
267    }
268
269    /// Force sequential processing, overriding a concurrent-capable consumer.
270    ///
271    /// Useful for HTTP routes that mutate shared state and need ordering
272    /// guarantees.
273    pub fn sequential(mut self) -> Self {
274        self.concurrency = Some(ConcurrencyModel::Sequential);
275        self
276    }
277
278    /// Set the route ID for this route.
279    ///
280    /// If not set, the route will be assigned an auto-generated ID.
281    pub fn route_id(mut self, id: impl Into<String>) -> Self {
282        self.route_id = Some(id.into());
283        self
284    }
285
286    /// Set whether this route should automatically start when the context starts.
287    ///
288    /// Default is `true`.
289    pub fn auto_startup(mut self, auto: bool) -> Self {
290        self.auto_startup = Some(auto);
291        self
292    }
293
294    /// Set the startup order for this route.
295    ///
296    /// Routes with lower values start first. Default is 1000.
297    pub fn startup_order(mut self, order: i32) -> Self {
298        self.startup_order = Some(order);
299        self
300    }
301
302    /// Begin a Splitter sub-pipeline. Steps added after this call (until
303    /// `.end_split()`) will be executed per-fragment.
304    ///
305    /// Returns a `SplitBuilder` — you cannot call `.build()` until
306    /// `.end_split()` closes the split scope (enforced by the type system).
307    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
308        SplitBuilder {
309            parent: self,
310            config,
311            steps: Vec::new(),
312        }
313    }
314
315    /// Begin a Multicast sub-pipeline. Steps added after this call (until
316    /// `.end_multicast()`) will each receive a copy of the exchange.
317    ///
318    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
319    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
320    pub fn multicast(self) -> MulticastBuilder {
321        MulticastBuilder {
322            parent: self,
323            steps: Vec::new(),
324            config: MulticastConfig::new(),
325        }
326    }
327
328    /// Begin a Throttle sub-pipeline. Rate limits message processing to at most
329    /// `max_requests` per `period`. Steps inside the throttle scope are only
330    /// executed when the rate limit allows.
331    ///
332    /// Returns a `ThrottleBuilder` — you cannot call `.build()` until
333    /// `.end_throttle()` closes the throttle scope (enforced by the type system).
334    pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
335        ThrottleBuilder {
336            parent: self,
337            config: ThrottlerConfig::new(max_requests, period),
338            steps: Vec::new(),
339        }
340    }
341
342    /// Begin a LoadBalance sub-pipeline. Distributes exchanges across multiple
343    /// endpoints using a configurable strategy (round-robin, random, weighted, failover).
344    ///
345    /// Returns a `LoadBalancerBuilder` — you cannot call `.build()` until
346    /// `.end_load_balance()` closes the load balance scope (enforced by the type system).
347    pub fn load_balance(self) -> LoadBalancerBuilder {
348        LoadBalancerBuilder {
349            parent: self,
350            config: LoadBalancerConfig::round_robin(),
351            steps: Vec::new(),
352        }
353    }
354
355    /// Add a dynamic router step that routes exchanges dynamically based on
356    /// expression evaluation at runtime.
357    ///
358    /// The expression receives the exchange and returns `Some(uri)` to route to
359    /// the next endpoint, or `None` to stop routing.
360    ///
361    /// # Example
362    /// ```ignore
363    /// RouteBuilder::from("timer:tick")
364    ///     .route_id("test-route")
365    ///     .dynamic_router(|ex| {
366    ///         ex.input.header("dest").and_then(|v| v.as_str().map(|s| s.to_string()))
367    ///     })
368    ///     .build()
369    /// ```
370    pub fn dynamic_router(self, expression: RouterExpression) -> Self {
371        self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
372    }
373
374    /// Add a dynamic router step with full configuration.
375    ///
376    /// Allows customization of URI delimiter, cache size, timeout, and other options.
377    pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
378        self.steps.push(BuilderStep::DynamicRouter { config });
379        self
380    }
381
382    /// Consume the builder and produce a [`RouteDefinition`].
383    pub fn build(self) -> Result<RouteDefinition, CamelError> {
384        if self.from_uri.is_empty() {
385            return Err(CamelError::RouteError(
386                "route must have a 'from' URI".to_string(),
387            ));
388        }
389        let route_id = self.route_id.ok_or_else(|| {
390            CamelError::RouteError(
391                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
392                    .to_string(),
393            )
394        })?;
395        let definition = RouteDefinition::new(self.from_uri, self.steps);
396        let definition = if let Some(eh) = self.error_handler {
397            definition.with_error_handler(eh)
398        } else {
399            definition
400        };
401        let definition = if let Some(cb) = self.circuit_breaker_config {
402            definition.with_circuit_breaker(cb)
403        } else {
404            definition
405        };
406        let definition = if let Some(concurrency) = self.concurrency {
407            definition.with_concurrency(concurrency)
408        } else {
409            definition
410        };
411        let definition = definition.with_route_id(route_id);
412        let definition = if let Some(auto) = self.auto_startup {
413            definition.with_auto_startup(auto)
414        } else {
415            definition
416        };
417        let definition = if let Some(order) = self.startup_order {
418            definition.with_startup_order(order)
419        } else {
420            definition
421        };
422        Ok(definition)
423    }
424
425    /// Compile this builder route into canonical v1 spec.
426    pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
427        if self.from_uri.is_empty() {
428            return Err(CamelError::RouteError(
429                "route must have a 'from' URI".to_string(),
430            ));
431        }
432        let route_id = self.route_id.ok_or_else(|| {
433            CamelError::RouteError(
434                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
435                    .to_string(),
436            )
437        })?;
438
439        let steps = canonicalize_steps(self.steps)?;
440        let circuit_breaker = self
441            .circuit_breaker_config
442            .map(canonicalize_circuit_breaker);
443
444        let spec = CanonicalRouteSpec {
445            route_id,
446            from: self.from_uri,
447            steps,
448            circuit_breaker,
449            version: camel_api::CANONICAL_CONTRACT_VERSION,
450        };
451        spec.validate_contract()?;
452        Ok(spec)
453    }
454}
455
456fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
457    let mut canonical = Vec::with_capacity(steps.len());
458    for step in steps {
459        canonical.push(canonicalize_step(step)?);
460    }
461    Ok(canonical)
462}
463
464fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
465    match step {
466        BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
467        BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
468        BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
469        BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
470        BuilderStep::DeclarativeScript { expression } => {
471            Ok(CanonicalStepSpec::Script { expression })
472        }
473        BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
474            predicate,
475            steps: canonicalize_steps(steps)?,
476        }),
477        BuilderStep::DeclarativeChoice { whens, otherwise } => {
478            let mut canonical_whens = Vec::with_capacity(whens.len());
479            for DeclarativeWhenStep { predicate, steps } in whens {
480                canonical_whens.push(CanonicalWhenSpec {
481                    predicate,
482                    steps: canonicalize_steps(steps)?,
483                });
484            }
485            let otherwise = match otherwise {
486                Some(steps) => Some(canonicalize_steps(steps)?),
487                None => None,
488            };
489            Ok(CanonicalStepSpec::Choice {
490                whens: canonical_whens,
491                otherwise,
492            })
493        }
494        BuilderStep::DeclarativeSplit {
495            expression,
496            aggregation,
497            parallel,
498            parallel_limit,
499            stop_on_exception,
500            steps,
501        } => Ok(CanonicalStepSpec::Split {
502            expression: CanonicalSplitExpressionSpec::Language(expression),
503            aggregation: canonicalize_split_aggregation(aggregation)?,
504            parallel,
505            parallel_limit,
506            stop_on_exception,
507            steps: canonicalize_steps(steps)?,
508        }),
509        BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
510            config: canonicalize_aggregate(config)?,
511        }),
512        other => {
513            let step_name = canonical_step_name(&other);
514            let detail = camel_api::canonical_contract_rejection_reason(step_name)
515                .unwrap_or("not included in canonical v1");
516            Err(CamelError::RouteError(format!(
517                "canonical v1 does not support step `{step_name}`: {detail}"
518            )))
519        }
520    }
521}
522
523fn canonicalize_split_aggregation(
524    strategy: camel_api::splitter::AggregationStrategy,
525) -> Result<CanonicalSplitAggregationSpec, CamelError> {
526    match strategy {
527        camel_api::splitter::AggregationStrategy::LastWins => {
528            Ok(CanonicalSplitAggregationSpec::LastWins)
529        }
530        camel_api::splitter::AggregationStrategy::CollectAll => {
531            Ok(CanonicalSplitAggregationSpec::CollectAll)
532        }
533        camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
534            "canonical v1 does not support custom split aggregation".to_string(),
535        )),
536        camel_api::splitter::AggregationStrategy::Original => {
537            Ok(CanonicalSplitAggregationSpec::Original)
538        }
539    }
540}
541
542fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
543    let completion_size = match config.completion {
544        CompletionCondition::Size(size) => Some(size),
545        CompletionCondition::Predicate(_) => {
546            return Err(CamelError::RouteError(
547                "canonical v1 does not support aggregate predicate completion".to_string(),
548            ));
549        }
550    };
551    let strategy = match config.strategy {
552        AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
553        AggregationStrategy::Custom(_) => {
554            return Err(CamelError::RouteError(
555                "canonical v1 does not support custom aggregate strategy".to_string(),
556            ));
557        }
558    };
559    let bucket_ttl_ms = config
560        .bucket_ttl
561        .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
562
563    Ok(CanonicalAggregateSpec {
564        header: config.header_name,
565        completion_size,
566        strategy,
567        max_buckets: config.max_buckets,
568        bucket_ttl_ms,
569    })
570}
571
572fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
573    CanonicalCircuitBreakerSpec {
574        failure_threshold: config.failure_threshold,
575        open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
576    }
577}
578
579fn canonical_step_name(step: &BuilderStep) -> &'static str {
580    match step {
581        BuilderStep::Processor(_) => "processor",
582        BuilderStep::To(_) => "to",
583        BuilderStep::Stop => "stop",
584        BuilderStep::Log { .. } => "log",
585        BuilderStep::DeclarativeSetHeader { .. } => "set_header",
586        BuilderStep::DeclarativeSetBody { .. } => "set_body",
587        BuilderStep::DeclarativeFilter { .. } => "filter",
588        BuilderStep::DeclarativeChoice { .. } => "choice",
589        BuilderStep::DeclarativeScript { .. } => "script",
590        BuilderStep::DeclarativeSplit { .. } => "split",
591        BuilderStep::Split { .. } => "split",
592        BuilderStep::Aggregate { .. } => "aggregate",
593        BuilderStep::Filter { .. } => "filter",
594        BuilderStep::Choice { .. } => "choice",
595        BuilderStep::WireTap { .. } => "wire_tap",
596        BuilderStep::Multicast { .. } => "multicast",
597        BuilderStep::DeclarativeLog { .. } => "log",
598        BuilderStep::Bean { .. } => "bean",
599        BuilderStep::Script { .. } => "script",
600        BuilderStep::Throttle { .. } => "throttle",
601        BuilderStep::LoadBalance { .. } => "load_balancer",
602        BuilderStep::DynamicRouter { .. } => "dynamic_router",
603    }
604}
605
606impl StepAccumulator for RouteBuilder {
607    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
608        &mut self.steps
609    }
610}
611
612/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
613///
614/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
615/// but NOT `.build()` and NOT `.split()` (no nested splits).
616///
617/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
618/// and returns the parent `RouteBuilder`.
619pub struct SplitBuilder {
620    parent: RouteBuilder,
621    config: SplitterConfig,
622    steps: Vec<BuilderStep>,
623}
624
625impl SplitBuilder {
626    /// Open a filter scope within the split sub-pipeline.
627    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
628    where
629        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
630    {
631        FilterInSplitBuilder {
632            parent: self,
633            predicate: std::sync::Arc::new(predicate),
634            steps: vec![],
635        }
636    }
637
638    /// Close the split scope. Packages the accumulated sub-steps into a
639    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
640    pub fn end_split(mut self) -> RouteBuilder {
641        let split_step = BuilderStep::Split {
642            config: self.config,
643            steps: self.steps,
644        };
645        self.parent.steps.push(split_step);
646        self.parent
647    }
648}
649
650impl StepAccumulator for SplitBuilder {
651    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
652        &mut self.steps
653    }
654}
655
656/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
657pub struct FilterBuilder {
658    parent: RouteBuilder,
659    predicate: FilterPredicate,
660    steps: Vec<BuilderStep>,
661}
662
663impl FilterBuilder {
664    /// Close the filter scope. Packages the accumulated sub-steps into a
665    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
666    pub fn end_filter(mut self) -> RouteBuilder {
667        let step = BuilderStep::Filter {
668            predicate: self.predicate,
669            steps: self.steps,
670        };
671        self.parent.steps.push(step);
672        self.parent
673    }
674}
675
676impl StepAccumulator for FilterBuilder {
677    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
678        &mut self.steps
679    }
680}
681
682/// Builder for a filter scope nested inside a `.split()` block.
683pub struct FilterInSplitBuilder {
684    parent: SplitBuilder,
685    predicate: FilterPredicate,
686    steps: Vec<BuilderStep>,
687}
688
689impl FilterInSplitBuilder {
690    /// Close the filter scope and return the parent `SplitBuilder`.
691    pub fn end_filter(mut self) -> SplitBuilder {
692        let step = BuilderStep::Filter {
693            predicate: self.predicate,
694            steps: self.steps,
695        };
696        self.parent.steps.push(step);
697        self.parent
698    }
699}
700
701impl StepAccumulator for FilterInSplitBuilder {
702    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
703        &mut self.steps
704    }
705}
706
707// ── Choice/When/Otherwise builders ─────────────────────────────────────────
708
709/// Builder for a `.choice()` ... `.end_choice()` block.
710///
711/// Accumulates `when` clauses and an optional `otherwise` clause.
712/// Cannot call `.build()` until `.end_choice()` is called.
713pub struct ChoiceBuilder {
714    parent: RouteBuilder,
715    whens: Vec<WhenStep>,
716    _otherwise: Option<Vec<BuilderStep>>,
717}
718
719impl ChoiceBuilder {
720    /// Open a `when` clause. Only exchanges matching `predicate` will be
721    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
722    pub fn when<F>(self, predicate: F) -> WhenBuilder
723    where
724        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
725    {
726        WhenBuilder {
727            parent: self,
728            predicate: std::sync::Arc::new(predicate),
729            steps: vec![],
730        }
731    }
732
733    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
734    ///
735    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
736    pub fn otherwise(self) -> OtherwiseBuilder {
737        OtherwiseBuilder {
738            parent: self,
739            steps: vec![],
740        }
741    }
742
743    /// Close the choice scope. Packages all accumulated `when` clauses and
744    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
745    /// parent `RouteBuilder`.
746    pub fn end_choice(mut self) -> RouteBuilder {
747        let step = BuilderStep::Choice {
748            whens: self.whens,
749            otherwise: self._otherwise,
750        };
751        self.parent.steps.push(step);
752        self.parent
753    }
754}
755
756/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
757pub struct WhenBuilder {
758    parent: ChoiceBuilder,
759    predicate: camel_api::FilterPredicate,
760    steps: Vec<BuilderStep>,
761}
762
763impl WhenBuilder {
764    /// Close the when scope. Packages the accumulated sub-steps into a
765    /// `WhenStep` and returns the parent `ChoiceBuilder`.
766    pub fn end_when(mut self) -> ChoiceBuilder {
767        self.parent.whens.push(WhenStep {
768            predicate: self.predicate,
769            steps: self.steps,
770        });
771        self.parent
772    }
773}
774
775impl StepAccumulator for WhenBuilder {
776    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
777        &mut self.steps
778    }
779}
780
781/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
782pub struct OtherwiseBuilder {
783    parent: ChoiceBuilder,
784    steps: Vec<BuilderStep>,
785}
786
787impl OtherwiseBuilder {
788    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
789    pub fn end_otherwise(self) -> ChoiceBuilder {
790        let OtherwiseBuilder { mut parent, steps } = self;
791        parent._otherwise = Some(steps);
792        parent
793    }
794}
795
796impl StepAccumulator for OtherwiseBuilder {
797    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
798        &mut self.steps
799    }
800}
801
802/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
803///
804/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
805/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
806///
807/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
808/// and returns the parent `RouteBuilder`.
809pub struct MulticastBuilder {
810    parent: RouteBuilder,
811    steps: Vec<BuilderStep>,
812    config: MulticastConfig,
813}
814
815impl MulticastBuilder {
816    pub fn parallel(mut self, parallel: bool) -> Self {
817        self.config = self.config.parallel(parallel);
818        self
819    }
820
821    pub fn parallel_limit(mut self, limit: usize) -> Self {
822        self.config = self.config.parallel_limit(limit);
823        self
824    }
825
826    pub fn stop_on_exception(mut self, stop: bool) -> Self {
827        self.config = self.config.stop_on_exception(stop);
828        self
829    }
830
831    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
832        self.config = self.config.timeout(duration);
833        self
834    }
835
836    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
837        self.config = self.config.aggregation(strategy);
838        self
839    }
840
841    pub fn end_multicast(mut self) -> RouteBuilder {
842        let step = BuilderStep::Multicast {
843            steps: self.steps,
844            config: self.config,
845        };
846        self.parent.steps.push(step);
847        self.parent
848    }
849}
850
851impl StepAccumulator for MulticastBuilder {
852    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
853        &mut self.steps
854    }
855}
856
857/// Builder for the sub-pipeline within a `.throttle()` ... `.end_throttle()` block.
858///
859/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
860/// but NOT `.build()` and NOT `.throttle()` (no nested throttles).
861///
862/// Calling `.end_throttle()` packages the sub-steps into a `BuilderStep::Throttle`
863/// and returns the parent `RouteBuilder`.
864pub struct ThrottleBuilder {
865    parent: RouteBuilder,
866    config: ThrottlerConfig,
867    steps: Vec<BuilderStep>,
868}
869
870impl ThrottleBuilder {
871    /// Set the throttle strategy. Default is `Delay`.
872    ///
873    /// - `Delay`: Queue messages until capacity available
874    /// - `Reject`: Return error immediately when throttled
875    /// - `Drop`: Silently discard excess messages
876    pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
877        self.config = self.config.strategy(strategy);
878        self
879    }
880
881    /// Close the throttle scope. Packages the accumulated sub-steps into a
882    /// `BuilderStep::Throttle` and returns the parent `RouteBuilder`.
883    pub fn end_throttle(mut self) -> RouteBuilder {
884        let step = BuilderStep::Throttle {
885            config: self.config,
886            steps: self.steps,
887        };
888        self.parent.steps.push(step);
889        self.parent
890    }
891}
892
893impl StepAccumulator for ThrottleBuilder {
894    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
895        &mut self.steps
896    }
897}
898
899/// Builder for the sub-pipeline within a `.load_balance()` ... `.end_load_balance()` block.
900///
901/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
902/// but NOT `.build()` and NOT `.load_balance()` (no nested load balancers).
903///
904/// Calling `.end_load_balance()` packages the sub-steps into a `BuilderStep::LoadBalance`
905/// and returns the parent `RouteBuilder`.
906pub struct LoadBalancerBuilder {
907    parent: RouteBuilder,
908    config: LoadBalancerConfig,
909    steps: Vec<BuilderStep>,
910}
911
912impl LoadBalancerBuilder {
913    /// Set the load balance strategy to round-robin (default).
914    pub fn round_robin(mut self) -> Self {
915        self.config = LoadBalancerConfig::round_robin();
916        self
917    }
918
919    /// Set the load balance strategy to random selection.
920    pub fn random(mut self) -> Self {
921        self.config = LoadBalancerConfig::random();
922        self
923    }
924
925    /// Set the load balance strategy to weighted selection.
926    ///
927    /// Each endpoint is assigned a weight that determines its probability
928    /// of being selected.
929    pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
930        self.config = LoadBalancerConfig::weighted(weights);
931        self
932    }
933
934    /// Set the load balance strategy to failover.
935    ///
936    /// Exchanges are sent to the first endpoint; on failure, the next endpoint
937    /// is tried.
938    pub fn failover(mut self) -> Self {
939        self.config = LoadBalancerConfig::failover();
940        self
941    }
942
943    /// Enable or disable parallel execution of endpoints.
944    ///
945    /// When enabled, all endpoints receive the exchange simultaneously.
946    /// When disabled (default), only one endpoint is selected per exchange.
947    pub fn parallel(mut self, parallel: bool) -> Self {
948        self.config = self.config.parallel(parallel);
949        self
950    }
951
952    /// Close the load balance scope. Packages the accumulated sub-steps into a
953    /// `BuilderStep::LoadBalance` and returns the parent `RouteBuilder`.
954    pub fn end_load_balance(mut self) -> RouteBuilder {
955        let step = BuilderStep::LoadBalance {
956            config: self.config,
957            steps: self.steps,
958        };
959        self.parent.steps.push(step);
960        self.parent
961    }
962}
963
964impl StepAccumulator for LoadBalancerBuilder {
965    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
966        &mut self.steps
967    }
968}
969
970// ---------------------------------------------------------------------------
971// Tests
972// ---------------------------------------------------------------------------
973
974#[cfg(test)]
975mod tests {
976    use super::*;
977    use camel_api::load_balancer::LoadBalanceStrategy;
978    use camel_api::{Exchange, Message};
979    use camel_core::route::BuilderStep;
980    use std::sync::Arc;
981    use tower::{Service, ServiceExt};
982
983    #[test]
984    fn test_builder_from_creates_definition() {
985        let definition = RouteBuilder::from("timer:tick")
986            .route_id("test-route")
987            .build()
988            .unwrap();
989        assert_eq!(definition.from_uri(), "timer:tick");
990    }
991
992    #[test]
993    fn test_builder_empty_from_uri_errors() {
994        let result = RouteBuilder::from("").route_id("test-route").build();
995        assert!(result.is_err());
996    }
997
998    #[test]
999    fn test_builder_to_adds_step() {
1000        let definition = RouteBuilder::from("timer:tick")
1001            .route_id("test-route")
1002            .to("log:info")
1003            .build()
1004            .unwrap();
1005
1006        assert_eq!(definition.from_uri(), "timer:tick");
1007        // We can verify steps were added by checking the structure
1008        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1009    }
1010
1011    #[test]
1012    fn test_builder_filter_adds_filter_step() {
1013        let definition = RouteBuilder::from("timer:tick")
1014            .route_id("test-route")
1015            .filter(|_ex| true)
1016            .to("mock:result")
1017            .end_filter()
1018            .build()
1019            .unwrap();
1020
1021        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1022    }
1023
1024    #[test]
1025    fn test_builder_set_header_adds_processor_step() {
1026        let definition = RouteBuilder::from("timer:tick")
1027            .route_id("test-route")
1028            .set_header("key", Value::String("value".into()))
1029            .build()
1030            .unwrap();
1031
1032        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1033    }
1034
1035    #[test]
1036    fn test_builder_map_body_adds_processor_step() {
1037        let definition = RouteBuilder::from("timer:tick")
1038            .route_id("test-route")
1039            .map_body(|body| body)
1040            .build()
1041            .unwrap();
1042
1043        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1044    }
1045
1046    #[test]
1047    fn test_builder_process_adds_processor_step() {
1048        let definition = RouteBuilder::from("timer:tick")
1049            .route_id("test-route")
1050            .process(|ex| async move { Ok(ex) })
1051            .build()
1052            .unwrap();
1053
1054        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1055    }
1056
1057    #[test]
1058    fn test_builder_chain_multiple_steps() {
1059        let definition = RouteBuilder::from("timer:tick")
1060            .route_id("test-route")
1061            .set_header("source", Value::String("timer".into()))
1062            .filter(|ex| ex.input.header("source").is_some())
1063            .to("log:info")
1064            .end_filter()
1065            .to("mock:result")
1066            .build()
1067            .unwrap();
1068
1069        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
1070        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
1071        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
1072        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1073    }
1074
1075    // -----------------------------------------------------------------------
1076    // Processor behavior tests — exercise the real Tower services directly
1077    // -----------------------------------------------------------------------
1078
1079    #[tokio::test]
1080    async fn test_set_header_processor_works() {
1081        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1082        let exchange = Exchange::new(Message::new("test"));
1083        let result = svc.call(exchange).await.unwrap();
1084        assert_eq!(
1085            result.input.header("greeting"),
1086            Some(&Value::String("hello".into()))
1087        );
1088    }
1089
1090    #[tokio::test]
1091    async fn test_filter_processor_passes() {
1092        use camel_api::BoxProcessorExt;
1093        use camel_processor::FilterService;
1094
1095        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1096        let mut svc =
1097            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1098        let exchange = Exchange::new(Message::new("pass"));
1099        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1100        assert_eq!(result.input.body.as_text(), Some("pass"));
1101    }
1102
1103    #[tokio::test]
1104    async fn test_filter_processor_blocks() {
1105        use camel_api::BoxProcessorExt;
1106        use camel_processor::FilterService;
1107
1108        let sub = BoxProcessor::from_fn(|_ex| {
1109            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1110        });
1111        let mut svc =
1112            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1113        let exchange = Exchange::new(Message::new("reject"));
1114        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1115        assert_eq!(result.input.body.as_text(), Some("reject"));
1116    }
1117
1118    #[tokio::test]
1119    async fn test_map_body_processor_works() {
1120        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1121            if let Some(text) = body.as_text() {
1122                Body::Text(text.to_uppercase())
1123            } else {
1124                body
1125            }
1126        });
1127        let exchange = Exchange::new(Message::new("hello"));
1128        let result = mapper.oneshot(exchange).await.unwrap();
1129        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1130    }
1131
1132    #[tokio::test]
1133    async fn test_process_custom_processor_works() {
1134        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1135            ex.set_property("custom", Value::Bool(true));
1136            Ok(ex)
1137        });
1138        let exchange = Exchange::new(Message::default());
1139        let result = processor.oneshot(exchange).await.unwrap();
1140        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1141    }
1142
1143    // -----------------------------------------------------------------------
1144    // Sequential pipeline test
1145    // -----------------------------------------------------------------------
1146
1147    #[tokio::test]
1148    async fn test_compose_pipeline_runs_steps_in_order() {
1149        use camel_core::route::compose_pipeline;
1150
1151        let processors = vec![
1152            BoxProcessor::new(SetHeader::new(
1153                IdentityProcessor,
1154                "step",
1155                Value::String("one".into()),
1156            )),
1157            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1158                if let Some(text) = body.as_text() {
1159                    Body::Text(format!("{}-processed", text))
1160                } else {
1161                    body
1162                }
1163            })),
1164        ];
1165
1166        let pipeline = compose_pipeline(processors);
1167        let exchange = Exchange::new(Message::new("hello"));
1168        let result = pipeline.oneshot(exchange).await.unwrap();
1169
1170        assert_eq!(
1171            result.input.header("step"),
1172            Some(&Value::String("one".into()))
1173        );
1174        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1175    }
1176
1177    #[tokio::test]
1178    async fn test_compose_pipeline_empty_is_identity() {
1179        use camel_core::route::compose_pipeline;
1180
1181        let pipeline = compose_pipeline(vec![]);
1182        let exchange = Exchange::new(Message::new("unchanged"));
1183        let result = pipeline.oneshot(exchange).await.unwrap();
1184        assert_eq!(result.input.body.as_text(), Some("unchanged"));
1185    }
1186
1187    // -----------------------------------------------------------------------
1188    // Circuit breaker builder tests
1189    // -----------------------------------------------------------------------
1190
1191    #[test]
1192    fn test_builder_circuit_breaker_sets_config() {
1193        use camel_api::circuit_breaker::CircuitBreakerConfig;
1194
1195        let config = CircuitBreakerConfig::new().failure_threshold(5);
1196        let definition = RouteBuilder::from("timer:tick")
1197            .route_id("test-route")
1198            .circuit_breaker(config)
1199            .build()
1200            .unwrap();
1201
1202        let cb = definition
1203            .circuit_breaker_config()
1204            .expect("circuit breaker should be set");
1205        assert_eq!(cb.failure_threshold, 5);
1206    }
1207
1208    #[test]
1209    fn test_builder_circuit_breaker_with_error_handler() {
1210        use camel_api::circuit_breaker::CircuitBreakerConfig;
1211        use camel_api::error_handler::ErrorHandlerConfig;
1212
1213        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1214        let eh_config = ErrorHandlerConfig::log_only();
1215
1216        let definition = RouteBuilder::from("timer:tick")
1217            .route_id("test-route")
1218            .to("log:info")
1219            .circuit_breaker(cb_config)
1220            .error_handler(eh_config)
1221            .build()
1222            .unwrap();
1223
1224        assert!(
1225            definition.circuit_breaker_config().is_some(),
1226            "circuit breaker config should be set"
1227        );
1228        // Route definition was built successfully with both configs.
1229    }
1230
1231    // --- Splitter builder tests ---
1232
1233    #[test]
1234    fn test_split_builder_typestate() {
1235        use camel_api::splitter::{SplitterConfig, split_body_lines};
1236
1237        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
1238        let definition = RouteBuilder::from("timer:test?period=1000")
1239            .route_id("test-route")
1240            .split(SplitterConfig::new(split_body_lines()))
1241            .to("mock:per-fragment")
1242            .end_split()
1243            .to("mock:final")
1244            .build()
1245            .unwrap();
1246
1247        // Should have 2 top-level steps: Split + To("mock:final")
1248        assert_eq!(definition.steps().len(), 2);
1249    }
1250
1251    #[test]
1252    fn test_split_builder_steps_collected() {
1253        use camel_api::splitter::{SplitterConfig, split_body_lines};
1254
1255        let definition = RouteBuilder::from("timer:test?period=1000")
1256            .route_id("test-route")
1257            .split(SplitterConfig::new(split_body_lines()))
1258            .set_header("fragment", Value::String("yes".into()))
1259            .to("mock:per-fragment")
1260            .end_split()
1261            .build()
1262            .unwrap();
1263
1264        // Should have 1 top-level step: Split (containing 2 sub-steps)
1265        assert_eq!(definition.steps().len(), 1);
1266        match &definition.steps()[0] {
1267            BuilderStep::Split { steps, .. } => {
1268                assert_eq!(steps.len(), 2); // SetHeader + To
1269            }
1270            other => panic!("Expected Split, got {:?}", other),
1271        }
1272    }
1273
1274    #[test]
1275    fn test_split_builder_config_propagated() {
1276        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1277
1278        let definition = RouteBuilder::from("timer:test?period=1000")
1279            .route_id("test-route")
1280            .split(
1281                SplitterConfig::new(split_body_lines())
1282                    .parallel(true)
1283                    .parallel_limit(4)
1284                    .aggregation(AggregationStrategy::CollectAll),
1285            )
1286            .to("mock:per-fragment")
1287            .end_split()
1288            .build()
1289            .unwrap();
1290
1291        match &definition.steps()[0] {
1292            BuilderStep::Split { config, .. } => {
1293                assert!(config.parallel);
1294                assert_eq!(config.parallel_limit, Some(4));
1295                assert!(matches!(
1296                    config.aggregation,
1297                    AggregationStrategy::CollectAll
1298                ));
1299            }
1300            other => panic!("Expected Split, got {:?}", other),
1301        }
1302    }
1303
1304    #[test]
1305    fn test_aggregate_builder_adds_step() {
1306        use camel_api::aggregator::AggregatorConfig;
1307        use camel_core::route::BuilderStep;
1308
1309        let definition = RouteBuilder::from("timer:tick")
1310            .route_id("test-route")
1311            .aggregate(
1312                AggregatorConfig::correlate_by("key")
1313                    .complete_when_size(2)
1314                    .build(),
1315            )
1316            .build()
1317            .unwrap();
1318
1319        assert_eq!(definition.steps().len(), 1);
1320        assert!(matches!(
1321            definition.steps()[0],
1322            BuilderStep::Aggregate { .. }
1323        ));
1324    }
1325
1326    #[test]
1327    fn test_aggregate_in_split_builder() {
1328        use camel_api::aggregator::AggregatorConfig;
1329        use camel_api::splitter::{SplitterConfig, split_body_lines};
1330        use camel_core::route::BuilderStep;
1331
1332        let definition = RouteBuilder::from("timer:tick")
1333            .route_id("test-route")
1334            .split(SplitterConfig::new(split_body_lines()))
1335            .aggregate(
1336                AggregatorConfig::correlate_by("key")
1337                    .complete_when_size(1)
1338                    .build(),
1339            )
1340            .end_split()
1341            .build()
1342            .unwrap();
1343
1344        assert_eq!(definition.steps().len(), 1);
1345        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
1346            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
1347        } else {
1348            panic!("expected Split step");
1349        }
1350    }
1351
1352    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
1353
1354    #[test]
1355    fn test_builder_set_body_static_adds_processor() {
1356        let definition = RouteBuilder::from("timer:tick")
1357            .route_id("test-route")
1358            .set_body("fixed")
1359            .build()
1360            .unwrap();
1361        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1362    }
1363
1364    #[test]
1365    fn test_builder_set_body_fn_adds_processor() {
1366        let definition = RouteBuilder::from("timer:tick")
1367            .route_id("test-route")
1368            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
1369            .build()
1370            .unwrap();
1371        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1372    }
1373
1374    #[test]
1375    fn test_builder_set_header_fn_adds_processor() {
1376        let definition = RouteBuilder::from("timer:tick")
1377            .route_id("test-route")
1378            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
1379            .build()
1380            .unwrap();
1381        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1382    }
1383
1384    #[tokio::test]
1385    async fn test_set_body_static_processor_works() {
1386        use camel_core::route::compose_pipeline;
1387        let def = RouteBuilder::from("t:t")
1388            .route_id("test-route")
1389            .set_body("replaced")
1390            .build()
1391            .unwrap();
1392        let pipeline = compose_pipeline(
1393            def.steps()
1394                .iter()
1395                .filter_map(|s| {
1396                    if let BuilderStep::Processor(p) = s {
1397                        Some(p.clone())
1398                    } else {
1399                        None
1400                    }
1401                })
1402                .collect(),
1403        );
1404        let exchange = Exchange::new(Message::new("original"));
1405        let result = pipeline.oneshot(exchange).await.unwrap();
1406        assert_eq!(result.input.body.as_text(), Some("replaced"));
1407    }
1408
1409    #[tokio::test]
1410    async fn test_set_body_fn_processor_works() {
1411        use camel_core::route::compose_pipeline;
1412        let def = RouteBuilder::from("t:t")
1413            .route_id("test-route")
1414            .set_body_fn(|ex: &Exchange| {
1415                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
1416            })
1417            .build()
1418            .unwrap();
1419        let pipeline = compose_pipeline(
1420            def.steps()
1421                .iter()
1422                .filter_map(|s| {
1423                    if let BuilderStep::Processor(p) = s {
1424                        Some(p.clone())
1425                    } else {
1426                        None
1427                    }
1428                })
1429                .collect(),
1430        );
1431        let exchange = Exchange::new(Message::new("hello"));
1432        let result = pipeline.oneshot(exchange).await.unwrap();
1433        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1434    }
1435
1436    #[tokio::test]
1437    async fn test_set_header_fn_processor_works() {
1438        use camel_core::route::compose_pipeline;
1439        let def = RouteBuilder::from("t:t")
1440            .route_id("test-route")
1441            .set_header_fn("echo", |ex: &Exchange| {
1442                ex.input
1443                    .body
1444                    .as_text()
1445                    .map(|t| Value::String(t.into()))
1446                    .unwrap_or(Value::Null)
1447            })
1448            .build()
1449            .unwrap();
1450        let pipeline = compose_pipeline(
1451            def.steps()
1452                .iter()
1453                .filter_map(|s| {
1454                    if let BuilderStep::Processor(p) = s {
1455                        Some(p.clone())
1456                    } else {
1457                        None
1458                    }
1459                })
1460                .collect(),
1461        );
1462        let exchange = Exchange::new(Message::new("ping"));
1463        let result = pipeline.oneshot(exchange).await.unwrap();
1464        assert_eq!(
1465            result.input.header("echo"),
1466            Some(&Value::String("ping".into()))
1467        );
1468    }
1469
1470    // ── FilterBuilder typestate tests ─────────────────────────────────────
1471
1472    #[test]
1473    fn test_filter_builder_typestate() {
1474        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1475            .route_id("test-route")
1476            .filter(|_ex| true)
1477            .to("mock:inner")
1478            .end_filter()
1479            .to("mock:outer")
1480            .build();
1481        assert!(result.is_ok());
1482    }
1483
1484    #[test]
1485    fn test_filter_builder_steps_collected() {
1486        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1487            .route_id("test-route")
1488            .filter(|_ex| true)
1489            .to("mock:inner")
1490            .end_filter()
1491            .build()
1492            .unwrap();
1493
1494        assert_eq!(definition.steps().len(), 1);
1495        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1496    }
1497
1498    #[test]
1499    fn test_wire_tap_builder_adds_step() {
1500        let definition = RouteBuilder::from("timer:tick")
1501            .route_id("test-route")
1502            .wire_tap("mock:tap")
1503            .to("mock:result")
1504            .build()
1505            .unwrap();
1506
1507        assert_eq!(definition.steps().len(), 2);
1508        assert!(
1509            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
1510        );
1511        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
1512    }
1513
1514    // ── MulticastBuilder typestate tests ─────────────────────────────────────
1515
1516    #[test]
1517    fn test_multicast_builder_typestate() {
1518        let definition = RouteBuilder::from("timer:tick")
1519            .route_id("test-route")
1520            .multicast()
1521            .to("direct:a")
1522            .to("direct:b")
1523            .end_multicast()
1524            .to("mock:result")
1525            .build()
1526            .unwrap();
1527
1528        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
1529    }
1530
1531    #[test]
1532    fn test_multicast_builder_steps_collected() {
1533        let definition = RouteBuilder::from("timer:tick")
1534            .route_id("test-route")
1535            .multicast()
1536            .to("direct:a")
1537            .to("direct:b")
1538            .end_multicast()
1539            .build()
1540            .unwrap();
1541
1542        match &definition.steps()[0] {
1543            BuilderStep::Multicast { steps, .. } => {
1544                assert_eq!(steps.len(), 2);
1545            }
1546            other => panic!("Expected Multicast, got {:?}", other),
1547        }
1548    }
1549
1550    // ── Concurrency builder tests ─────────────────────────────────────
1551
1552    #[test]
1553    fn test_builder_concurrent_sets_concurrency() {
1554        use camel_component::ConcurrencyModel;
1555
1556        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1557            .route_id("test-route")
1558            .concurrent(16)
1559            .to("log:info")
1560            .build()
1561            .unwrap();
1562
1563        assert_eq!(
1564            definition.concurrency_override(),
1565            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
1566        );
1567    }
1568
1569    #[test]
1570    fn test_builder_concurrent_zero_means_unbounded() {
1571        use camel_component::ConcurrencyModel;
1572
1573        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1574            .route_id("test-route")
1575            .concurrent(0)
1576            .to("log:info")
1577            .build()
1578            .unwrap();
1579
1580        assert_eq!(
1581            definition.concurrency_override(),
1582            Some(&ConcurrencyModel::Concurrent { max: None })
1583        );
1584    }
1585
1586    #[test]
1587    fn test_builder_sequential_sets_concurrency() {
1588        use camel_component::ConcurrencyModel;
1589
1590        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1591            .route_id("test-route")
1592            .sequential()
1593            .to("log:info")
1594            .build()
1595            .unwrap();
1596
1597        assert_eq!(
1598            definition.concurrency_override(),
1599            Some(&ConcurrencyModel::Sequential)
1600        );
1601    }
1602
1603    #[test]
1604    fn test_builder_default_concurrency_is_none() {
1605        let definition = RouteBuilder::from("timer:tick")
1606            .route_id("test-route")
1607            .to("log:info")
1608            .build()
1609            .unwrap();
1610
1611        assert_eq!(definition.concurrency_override(), None);
1612    }
1613
1614    // ── Route lifecycle builder tests ─────────────────────────────────────
1615
1616    #[test]
1617    fn test_builder_route_id_sets_id() {
1618        let definition = RouteBuilder::from("timer:tick")
1619            .route_id("my-route")
1620            .build()
1621            .unwrap();
1622
1623        assert_eq!(definition.route_id(), "my-route");
1624    }
1625
1626    #[test]
1627    fn test_build_without_route_id_fails() {
1628        let result = RouteBuilder::from("timer:tick?period=1000")
1629            .to("log:info")
1630            .build();
1631        let err = match result {
1632            Err(e) => e.to_string(),
1633            Ok(_) => panic!("build() should fail without route_id"),
1634        };
1635        assert!(
1636            err.contains("route_id"),
1637            "error should mention route_id, got: {}",
1638            err
1639        );
1640    }
1641
1642    #[test]
1643    fn test_builder_auto_startup_false() {
1644        let definition = RouteBuilder::from("timer:tick")
1645            .route_id("test-route")
1646            .auto_startup(false)
1647            .build()
1648            .unwrap();
1649
1650        assert!(!definition.auto_startup());
1651    }
1652
1653    #[test]
1654    fn test_builder_startup_order_custom() {
1655        let definition = RouteBuilder::from("timer:tick")
1656            .route_id("test-route")
1657            .startup_order(50)
1658            .build()
1659            .unwrap();
1660
1661        assert_eq!(definition.startup_order(), 50);
1662    }
1663
1664    #[test]
1665    fn test_builder_defaults() {
1666        let definition = RouteBuilder::from("timer:tick")
1667            .route_id("test-route")
1668            .build()
1669            .unwrap();
1670
1671        assert_eq!(definition.route_id(), "test-route");
1672        assert!(definition.auto_startup());
1673        assert_eq!(definition.startup_order(), 1000);
1674    }
1675
1676    // ── Choice typestate tests ──────────────────────────────────────────────────
1677
1678    #[test]
1679    fn test_choice_builder_single_when() {
1680        let definition = RouteBuilder::from("timer:tick")
1681            .route_id("test-route")
1682            .choice()
1683            .when(|ex: &Exchange| ex.input.header("type").is_some())
1684            .to("mock:typed")
1685            .end_when()
1686            .end_choice()
1687            .build()
1688            .unwrap();
1689        assert_eq!(definition.steps().len(), 1);
1690        assert!(
1691            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
1692            if whens.len() == 1 && otherwise.is_none())
1693        );
1694    }
1695
1696    #[test]
1697    fn test_choice_builder_when_otherwise() {
1698        let definition = RouteBuilder::from("timer:tick")
1699            .route_id("test-route")
1700            .choice()
1701            .when(|ex: &Exchange| ex.input.header("a").is_some())
1702            .to("mock:a")
1703            .end_when()
1704            .otherwise()
1705            .to("mock:fallback")
1706            .end_otherwise()
1707            .end_choice()
1708            .build()
1709            .unwrap();
1710        assert!(
1711            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
1712            if whens.len() == 1 && otherwise.is_some())
1713        );
1714    }
1715
1716    #[test]
1717    fn test_choice_builder_multiple_whens() {
1718        let definition = RouteBuilder::from("timer:tick")
1719            .route_id("test-route")
1720            .choice()
1721            .when(|ex: &Exchange| ex.input.header("a").is_some())
1722            .to("mock:a")
1723            .end_when()
1724            .when(|ex: &Exchange| ex.input.header("b").is_some())
1725            .to("mock:b")
1726            .end_when()
1727            .end_choice()
1728            .build()
1729            .unwrap();
1730        assert!(
1731            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
1732            if whens.len() == 2)
1733        );
1734    }
1735
1736    #[test]
1737    fn test_choice_step_after_choice() {
1738        // Steps after end_choice() are added to the outer pipeline, not inside choice.
1739        let definition = RouteBuilder::from("timer:tick")
1740            .route_id("test-route")
1741            .choice()
1742            .when(|_ex: &Exchange| true)
1743            .to("mock:inner")
1744            .end_when()
1745            .end_choice()
1746            .to("mock:outer") // must be step[1], not inside choice
1747            .build()
1748            .unwrap();
1749        assert_eq!(definition.steps().len(), 2);
1750        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
1751    }
1752
1753    // ── Throttle typestate tests ──────────────────────────────────────────────────
1754
1755    #[test]
1756    fn test_throttle_builder_typestate() {
1757        let definition = RouteBuilder::from("timer:tick")
1758            .route_id("test-route")
1759            .throttle(10, std::time::Duration::from_secs(1))
1760            .to("mock:result")
1761            .end_throttle()
1762            .build()
1763            .unwrap();
1764
1765        assert_eq!(definition.steps().len(), 1);
1766        assert!(matches!(
1767            &definition.steps()[0],
1768            BuilderStep::Throttle { .. }
1769        ));
1770    }
1771
1772    #[test]
1773    fn test_throttle_builder_with_strategy() {
1774        let definition = RouteBuilder::from("timer:tick")
1775            .route_id("test-route")
1776            .throttle(10, std::time::Duration::from_secs(1))
1777            .strategy(ThrottleStrategy::Reject)
1778            .to("mock:result")
1779            .end_throttle()
1780            .build()
1781            .unwrap();
1782
1783        if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
1784            assert_eq!(config.strategy, ThrottleStrategy::Reject);
1785        } else {
1786            panic!("Expected Throttle step");
1787        }
1788    }
1789
1790    #[test]
1791    fn test_throttle_builder_steps_collected() {
1792        let definition = RouteBuilder::from("timer:tick")
1793            .route_id("test-route")
1794            .throttle(5, std::time::Duration::from_secs(1))
1795            .set_header("throttled", Value::Bool(true))
1796            .to("mock:throttled")
1797            .end_throttle()
1798            .build()
1799            .unwrap();
1800
1801        match &definition.steps()[0] {
1802            BuilderStep::Throttle { steps, .. } => {
1803                assert_eq!(steps.len(), 2); // SetHeader + To
1804            }
1805            other => panic!("Expected Throttle, got {:?}", other),
1806        }
1807    }
1808
1809    #[test]
1810    fn test_throttle_step_after_throttle() {
1811        // Steps after end_throttle() are added to the outer pipeline, not inside throttle.
1812        let definition = RouteBuilder::from("timer:tick")
1813            .route_id("test-route")
1814            .throttle(10, std::time::Duration::from_secs(1))
1815            .to("mock:inner")
1816            .end_throttle()
1817            .to("mock:outer")
1818            .build()
1819            .unwrap();
1820
1821        assert_eq!(definition.steps().len(), 2);
1822        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
1823    }
1824
1825    // ── LoadBalance typestate tests ──────────────────────────────────────────────────
1826
1827    #[test]
1828    fn test_load_balance_builder_typestate() {
1829        let definition = RouteBuilder::from("timer:tick")
1830            .route_id("test-route")
1831            .load_balance()
1832            .round_robin()
1833            .to("mock:a")
1834            .to("mock:b")
1835            .end_load_balance()
1836            .build()
1837            .unwrap();
1838
1839        assert_eq!(definition.steps().len(), 1);
1840        assert!(matches!(
1841            &definition.steps()[0],
1842            BuilderStep::LoadBalance { .. }
1843        ));
1844    }
1845
1846    #[test]
1847    fn test_load_balance_builder_with_strategy() {
1848        let definition = RouteBuilder::from("timer:tick")
1849            .route_id("test-route")
1850            .load_balance()
1851            .random()
1852            .to("mock:result")
1853            .end_load_balance()
1854            .build()
1855            .unwrap();
1856
1857        if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
1858            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
1859        } else {
1860            panic!("Expected LoadBalance step");
1861        }
1862    }
1863
1864    #[test]
1865    fn test_load_balance_builder_steps_collected() {
1866        let definition = RouteBuilder::from("timer:tick")
1867            .route_id("test-route")
1868            .load_balance()
1869            .set_header("lb", Value::Bool(true))
1870            .to("mock:a")
1871            .end_load_balance()
1872            .build()
1873            .unwrap();
1874
1875        match &definition.steps()[0] {
1876            BuilderStep::LoadBalance { steps, .. } => {
1877                assert_eq!(steps.len(), 2); // SetHeader + To
1878            }
1879            other => panic!("Expected LoadBalance, got {:?}", other),
1880        }
1881    }
1882
1883    #[test]
1884    fn test_load_balance_step_after_load_balance() {
1885        // Steps after end_load_balance() are added to the outer pipeline, not inside load_balance.
1886        let definition = RouteBuilder::from("timer:tick")
1887            .route_id("test-route")
1888            .load_balance()
1889            .to("mock:inner")
1890            .end_load_balance()
1891            .to("mock:outer")
1892            .build()
1893            .unwrap();
1894
1895        assert_eq!(definition.steps().len(), 2);
1896        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
1897    }
1898
1899    // ── DynamicRouter typestate tests ──────────────────────────────────────────────────
1900
1901    #[test]
1902    fn test_dynamic_router_builder() {
1903        let definition = RouteBuilder::from("timer:tick")
1904            .route_id("test-route")
1905            .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
1906            .build()
1907            .unwrap();
1908
1909        assert_eq!(definition.steps().len(), 1);
1910        assert!(matches!(
1911            &definition.steps()[0],
1912            BuilderStep::DynamicRouter { .. }
1913        ));
1914    }
1915
1916    #[test]
1917    fn test_dynamic_router_builder_with_config() {
1918        let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
1919            .max_iterations(100)
1920            .cache_size(500);
1921
1922        let definition = RouteBuilder::from("timer:tick")
1923            .route_id("test-route")
1924            .dynamic_router_with_config(config)
1925            .build()
1926            .unwrap();
1927
1928        assert_eq!(definition.steps().len(), 1);
1929        if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
1930            assert_eq!(config.max_iterations, 100);
1931            assert_eq!(config.cache_size, 500);
1932        } else {
1933            panic!("Expected DynamicRouter step");
1934        }
1935    }
1936
1937    #[test]
1938    fn test_dynamic_router_step_after_router() {
1939        // Steps after dynamic_router() are added to the outer pipeline.
1940        let definition = RouteBuilder::from("timer:tick")
1941            .route_id("test-route")
1942            .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
1943            .to("mock:outer")
1944            .build()
1945            .unwrap();
1946
1947        assert_eq!(definition.steps().len(), 2);
1948        assert!(matches!(
1949            &definition.steps()[0],
1950            BuilderStep::DynamicRouter { .. }
1951        ));
1952        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
1953    }
1954}