Skip to main content

camel_builder/
lib.rs

1use camel_api::aggregator::AggregatorConfig;
2use camel_api::body::Body;
3use camel_api::circuit_breaker::CircuitBreakerConfig;
4use camel_api::error_handler::ErrorHandlerConfig;
5use camel_api::multicast::{MulticastConfig, MulticastStrategy};
6use camel_api::splitter::SplitterConfig;
7use camel_api::{
8    BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProcessorFn, Value,
9};
10use camel_component::ConcurrencyModel;
11use camel_core::route::{BuilderStep, RouteDefinition, WhenStep};
12use camel_processor::{DynamicSetHeader, LogLevel, MapBody, SetBody, SetHeader, StopService};
13
14/// Shared step-accumulation methods for all builder types.
15///
16/// Implementors provide `steps_mut()` and get step-adding methods for free.
17/// `filter()` and other branching methods are NOT included — they return
18/// different types per builder and stay as per-builder methods.
19pub trait StepAccumulator: Sized {
20    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
21
22    fn to(mut self, endpoint: impl Into<String>) -> Self {
23        self.steps_mut().push(BuilderStep::To(endpoint.into()));
24        self
25    }
26
27    fn process<F, Fut>(mut self, f: F) -> Self
28    where
29        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
30        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
31    {
32        let svc = ProcessorFn::new(f);
33        self.steps_mut()
34            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
35        self
36    }
37
38    fn process_fn(mut self, processor: BoxProcessor) -> Self {
39        self.steps_mut().push(BuilderStep::Processor(processor));
40        self
41    }
42
43    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
44        let svc = SetHeader::new(IdentityProcessor, key, value);
45        self.steps_mut()
46            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
47        self
48    }
49
50    fn map_body<F>(mut self, mapper: F) -> Self
51    where
52        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
53    {
54        let svc = MapBody::new(IdentityProcessor, mapper);
55        self.steps_mut()
56            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
57        self
58    }
59
60    fn set_body<B>(mut self, body: B) -> Self
61    where
62        B: Into<Body> + Clone + Send + Sync + 'static,
63    {
64        let body: Body = body.into();
65        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
66        self.steps_mut()
67            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
68        self
69    }
70
71    fn set_body_fn<F>(mut self, expr: F) -> Self
72    where
73        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
74    {
75        let svc = SetBody::new(IdentityProcessor, expr);
76        self.steps_mut()
77            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
78        self
79    }
80
81    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
82    where
83        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
84    {
85        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
86        self.steps_mut()
87            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
88        self
89    }
90
91    fn aggregate(mut self, config: AggregatorConfig) -> Self {
92        self.steps_mut().push(BuilderStep::Aggregate { config });
93        self
94    }
95
96    /// Stop processing this exchange immediately. No further steps in the
97    /// current pipeline will run.
98    ///
99    /// Can be used at any point in the route: directly on RouteBuilder,
100    /// inside `.filter()`, inside `.split()`, etc.
101    fn stop(mut self) -> Self {
102        self.steps_mut()
103            .push(BuilderStep::Processor(BoxProcessor::new(StopService)));
104        self
105    }
106
107    /// Log a message at the specified level.
108    ///
109    /// The message will be logged when an exchange passes through this step.
110    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
111        use camel_processor::LogProcessor;
112        let svc = LogProcessor::new(level, message.into());
113        self.steps_mut()
114            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
115        self
116    }
117}
118
119/// A fluent builder for constructing routes.
120///
121/// # Example
122///
123/// ```ignore
124/// let definition = RouteBuilder::from("timer:tick?period=1000")
125///     .set_header("source", Value::String("timer".into()))
126///     .filter(|ex| ex.input.body.as_text().is_some())
127///     .to("log:info?showHeaders=true")
128///     .build()?;
129/// ```
130pub struct RouteBuilder {
131    from_uri: String,
132    steps: Vec<BuilderStep>,
133    error_handler: Option<ErrorHandlerConfig>,
134    circuit_breaker_config: Option<CircuitBreakerConfig>,
135    concurrency: Option<ConcurrencyModel>,
136    route_id: Option<String>,
137    auto_startup: Option<bool>,
138    startup_order: Option<i32>,
139}
140
141impl RouteBuilder {
142    /// Start building a route from the given source endpoint URI.
143    pub fn from(endpoint: &str) -> Self {
144        Self {
145            from_uri: endpoint.to_string(),
146            steps: Vec::new(),
147            error_handler: None,
148            circuit_breaker_config: None,
149            concurrency: None,
150            route_id: None,
151            auto_startup: None,
152            startup_order: None,
153        }
154    }
155
156    /// Open a filter scope. Only exchanges matching `predicate` will be processed
157    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
158    /// and continue to steps after `.end_filter()`.
159    pub fn filter<F>(self, predicate: F) -> FilterBuilder
160    where
161        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
162    {
163        FilterBuilder {
164            parent: self,
165            predicate: std::sync::Arc::new(predicate),
166            steps: vec![],
167        }
168    }
169
170    /// Open a choice scope for content-based routing.
171    ///
172    /// Within the choice, you can define multiple `.when()` clauses and an
173    /// optional `.otherwise()` clause. The first matching `when` predicate
174    /// determines which sub-pipeline executes.
175    pub fn choice(self) -> ChoiceBuilder {
176        ChoiceBuilder {
177            parent: self,
178            whens: vec![],
179            _otherwise: None,
180        }
181    }
182
183    /// Add a WireTap step that sends a clone of the exchange to the given
184    /// endpoint URI (fire-and-forget). The original exchange continues
185    /// downstream unchanged.
186    pub fn wire_tap(mut self, endpoint: &str) -> Self {
187        self.steps.push(BuilderStep::WireTap {
188            uri: endpoint.to_string(),
189        });
190        self
191    }
192
193    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
194    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
195        self.error_handler = Some(config);
196        self
197    }
198
199    /// Set a circuit breaker for this route.
200    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
201        self.circuit_breaker_config = Some(config);
202        self
203    }
204
205    /// Override the consumer's default concurrency model.
206    ///
207    /// When set, the pipeline spawns a task per exchange, processing them
208    /// concurrently. `max` limits the number of simultaneously active
209    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
210    ///
211    /// # Example
212    /// ```ignore
213    /// RouteBuilder::from("http://0.0.0.0:8080/api")
214    ///     .concurrent(16)  // max 16 in-flight pipeline executions
215    ///     .process(handle_request)
216    ///     .build()
217    /// ```
218    pub fn concurrent(mut self, max: usize) -> Self {
219        let max = if max == 0 { None } else { Some(max) };
220        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
221        self
222    }
223
224    /// Force sequential processing, overriding a concurrent-capable consumer.
225    ///
226    /// Useful for HTTP routes that mutate shared state and need ordering
227    /// guarantees.
228    pub fn sequential(mut self) -> Self {
229        self.concurrency = Some(ConcurrencyModel::Sequential);
230        self
231    }
232
233    /// Set the route ID for this route.
234    ///
235    /// If not set, the route will be assigned an auto-generated ID.
236    pub fn route_id(mut self, id: impl Into<String>) -> Self {
237        self.route_id = Some(id.into());
238        self
239    }
240
241    /// Set whether this route should automatically start when the context starts.
242    ///
243    /// Default is `true`.
244    pub fn auto_startup(mut self, auto: bool) -> Self {
245        self.auto_startup = Some(auto);
246        self
247    }
248
249    /// Set the startup order for this route.
250    ///
251    /// Routes with lower values start first. Default is 1000.
252    pub fn startup_order(mut self, order: i32) -> Self {
253        self.startup_order = Some(order);
254        self
255    }
256
257    /// Begin a Splitter sub-pipeline. Steps added after this call (until
258    /// `.end_split()`) will be executed per-fragment.
259    ///
260    /// Returns a `SplitBuilder` — you cannot call `.build()` until
261    /// `.end_split()` closes the split scope (enforced by the type system).
262    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
263        SplitBuilder {
264            parent: self,
265            config,
266            steps: Vec::new(),
267        }
268    }
269
270    /// Begin a Multicast sub-pipeline. Steps added after this call (until
271    /// `.end_multicast()`) will each receive a copy of the exchange.
272    ///
273    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
274    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
275    pub fn multicast(self) -> MulticastBuilder {
276        MulticastBuilder {
277            parent: self,
278            steps: Vec::new(),
279            config: MulticastConfig::new(),
280        }
281    }
282
283    /// Consume the builder and produce a [`RouteDefinition`].
284    pub fn build(self) -> Result<RouteDefinition, CamelError> {
285        if self.from_uri.is_empty() {
286            return Err(CamelError::RouteError(
287                "route must have a 'from' URI".to_string(),
288            ));
289        }
290        let route_id = self.route_id.ok_or_else(|| {
291            CamelError::RouteError(
292                "route must have a 'route_id' — call .route_id(\"name\") on the builder"
293                    .to_string(),
294            )
295        })?;
296        let definition = RouteDefinition::new(self.from_uri, self.steps);
297        let definition = if let Some(eh) = self.error_handler {
298            definition.with_error_handler(eh)
299        } else {
300            definition
301        };
302        let definition = if let Some(cb) = self.circuit_breaker_config {
303            definition.with_circuit_breaker(cb)
304        } else {
305            definition
306        };
307        let definition = if let Some(concurrency) = self.concurrency {
308            definition.with_concurrency(concurrency)
309        } else {
310            definition
311        };
312        let definition = definition.with_route_id(route_id);
313        let definition = if let Some(auto) = self.auto_startup {
314            definition.with_auto_startup(auto)
315        } else {
316            definition
317        };
318        let definition = if let Some(order) = self.startup_order {
319            definition.with_startup_order(order)
320        } else {
321            definition
322        };
323        Ok(definition)
324    }
325}
326
327impl StepAccumulator for RouteBuilder {
328    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
329        &mut self.steps
330    }
331}
332
333/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
334///
335/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
336/// but NOT `.build()` and NOT `.split()` (no nested splits).
337///
338/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
339/// and returns the parent `RouteBuilder`.
340pub struct SplitBuilder {
341    parent: RouteBuilder,
342    config: SplitterConfig,
343    steps: Vec<BuilderStep>,
344}
345
346impl SplitBuilder {
347    /// Open a filter scope within the split sub-pipeline.
348    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
349    where
350        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
351    {
352        FilterInSplitBuilder {
353            parent: self,
354            predicate: std::sync::Arc::new(predicate),
355            steps: vec![],
356        }
357    }
358
359    /// Close the split scope. Packages the accumulated sub-steps into a
360    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
361    pub fn end_split(mut self) -> RouteBuilder {
362        let split_step = BuilderStep::Split {
363            config: self.config,
364            steps: self.steps,
365        };
366        self.parent.steps.push(split_step);
367        self.parent
368    }
369}
370
371impl StepAccumulator for SplitBuilder {
372    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
373        &mut self.steps
374    }
375}
376
377/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
378pub struct FilterBuilder {
379    parent: RouteBuilder,
380    predicate: FilterPredicate,
381    steps: Vec<BuilderStep>,
382}
383
384impl FilterBuilder {
385    /// Close the filter scope. Packages the accumulated sub-steps into a
386    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
387    pub fn end_filter(mut self) -> RouteBuilder {
388        let step = BuilderStep::Filter {
389            predicate: self.predicate,
390            steps: self.steps,
391        };
392        self.parent.steps.push(step);
393        self.parent
394    }
395}
396
397impl StepAccumulator for FilterBuilder {
398    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
399        &mut self.steps
400    }
401}
402
403/// Builder for a filter scope nested inside a `.split()` block.
404pub struct FilterInSplitBuilder {
405    parent: SplitBuilder,
406    predicate: FilterPredicate,
407    steps: Vec<BuilderStep>,
408}
409
410impl FilterInSplitBuilder {
411    /// Close the filter scope and return the parent `SplitBuilder`.
412    pub fn end_filter(mut self) -> SplitBuilder {
413        let step = BuilderStep::Filter {
414            predicate: self.predicate,
415            steps: self.steps,
416        };
417        self.parent.steps.push(step);
418        self.parent
419    }
420}
421
422impl StepAccumulator for FilterInSplitBuilder {
423    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
424        &mut self.steps
425    }
426}
427
428// ── Choice/When/Otherwise builders ─────────────────────────────────────────
429
430/// Builder for a `.choice()` ... `.end_choice()` block.
431///
432/// Accumulates `when` clauses and an optional `otherwise` clause.
433/// Cannot call `.build()` until `.end_choice()` is called.
434pub struct ChoiceBuilder {
435    parent: RouteBuilder,
436    whens: Vec<WhenStep>,
437    _otherwise: Option<Vec<BuilderStep>>,
438}
439
440impl ChoiceBuilder {
441    /// Open a `when` clause. Only exchanges matching `predicate` will be
442    /// processed by the steps inside the `.when()` ... `.end_when()` scope.
443    pub fn when<F>(self, predicate: F) -> WhenBuilder
444    where
445        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
446    {
447        WhenBuilder {
448            parent: self,
449            predicate: std::sync::Arc::new(predicate),
450            steps: vec![],
451        }
452    }
453
454    /// Open an `otherwise` clause. Executed when no `when` predicate matched.
455    ///
456    /// Only one `otherwise` is allowed per `choice`. Call this after all `.when()` clauses.
457    pub fn otherwise(self) -> OtherwiseBuilder {
458        OtherwiseBuilder {
459            parent: self,
460            steps: vec![],
461        }
462    }
463
464    /// Close the choice scope. Packages all accumulated `when` clauses and
465    /// optional `otherwise` into a `BuilderStep::Choice` and returns the
466    /// parent `RouteBuilder`.
467    pub fn end_choice(mut self) -> RouteBuilder {
468        let step = BuilderStep::Choice {
469            whens: self.whens,
470            otherwise: self._otherwise,
471        };
472        self.parent.steps.push(step);
473        self.parent
474    }
475}
476
477/// Builder for the sub-pipeline within a `.when()` ... `.end_when()` block.
478pub struct WhenBuilder {
479    parent: ChoiceBuilder,
480    predicate: camel_api::FilterPredicate,
481    steps: Vec<BuilderStep>,
482}
483
484impl WhenBuilder {
485    /// Close the when scope. Packages the accumulated sub-steps into a
486    /// `WhenStep` and returns the parent `ChoiceBuilder`.
487    pub fn end_when(mut self) -> ChoiceBuilder {
488        self.parent.whens.push(WhenStep {
489            predicate: self.predicate,
490            steps: self.steps,
491        });
492        self.parent
493    }
494}
495
496impl StepAccumulator for WhenBuilder {
497    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
498        &mut self.steps
499    }
500}
501
502/// Builder for the sub-pipeline within an `.otherwise()` ... `.end_otherwise()` block.
503pub struct OtherwiseBuilder {
504    parent: ChoiceBuilder,
505    steps: Vec<BuilderStep>,
506}
507
508impl OtherwiseBuilder {
509    /// Close the otherwise scope and return the parent `ChoiceBuilder`.
510    pub fn end_otherwise(self) -> ChoiceBuilder {
511        let OtherwiseBuilder { mut parent, steps } = self;
512        parent._otherwise = Some(steps);
513        parent
514    }
515}
516
517impl StepAccumulator for OtherwiseBuilder {
518    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
519        &mut self.steps
520    }
521}
522
523/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
524///
525/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
526/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
527///
528/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
529/// and returns the parent `RouteBuilder`.
530pub struct MulticastBuilder {
531    parent: RouteBuilder,
532    steps: Vec<BuilderStep>,
533    config: MulticastConfig,
534}
535
536impl MulticastBuilder {
537    pub fn parallel(mut self, parallel: bool) -> Self {
538        self.config = self.config.parallel(parallel);
539        self
540    }
541
542    pub fn parallel_limit(mut self, limit: usize) -> Self {
543        self.config = self.config.parallel_limit(limit);
544        self
545    }
546
547    pub fn stop_on_exception(mut self, stop: bool) -> Self {
548        self.config = self.config.stop_on_exception(stop);
549        self
550    }
551
552    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
553        self.config = self.config.timeout(duration);
554        self
555    }
556
557    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
558        self.config = self.config.aggregation(strategy);
559        self
560    }
561
562    pub fn end_multicast(mut self) -> RouteBuilder {
563        let step = BuilderStep::Multicast {
564            steps: self.steps,
565            config: self.config,
566        };
567        self.parent.steps.push(step);
568        self.parent
569    }
570}
571
572impl StepAccumulator for MulticastBuilder {
573    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
574        &mut self.steps
575    }
576}
577
578// ---------------------------------------------------------------------------
579// Tests
580// ---------------------------------------------------------------------------
581
582#[cfg(test)]
583mod tests {
584    use super::*;
585    use camel_api::{Exchange, Message};
586    use camel_core::route::BuilderStep;
587    use tower::{Service, ServiceExt};
588
589    #[test]
590    fn test_builder_from_creates_definition() {
591        let definition = RouteBuilder::from("timer:tick")
592            .route_id("test-route")
593            .build()
594            .unwrap();
595        assert_eq!(definition.from_uri(), "timer:tick");
596    }
597
598    #[test]
599    fn test_builder_empty_from_uri_errors() {
600        let result = RouteBuilder::from("").route_id("test-route").build();
601        assert!(result.is_err());
602    }
603
604    #[test]
605    fn test_builder_to_adds_step() {
606        let definition = RouteBuilder::from("timer:tick")
607            .route_id("test-route")
608            .to("log:info")
609            .build()
610            .unwrap();
611
612        assert_eq!(definition.from_uri(), "timer:tick");
613        // We can verify steps were added by checking the structure
614        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
615    }
616
617    #[test]
618    fn test_builder_filter_adds_filter_step() {
619        let definition = RouteBuilder::from("timer:tick")
620            .route_id("test-route")
621            .filter(|_ex| true)
622            .to("mock:result")
623            .end_filter()
624            .build()
625            .unwrap();
626
627        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
628    }
629
630    #[test]
631    fn test_builder_set_header_adds_processor_step() {
632        let definition = RouteBuilder::from("timer:tick")
633            .route_id("test-route")
634            .set_header("key", Value::String("value".into()))
635            .build()
636            .unwrap();
637
638        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
639    }
640
641    #[test]
642    fn test_builder_map_body_adds_processor_step() {
643        let definition = RouteBuilder::from("timer:tick")
644            .route_id("test-route")
645            .map_body(|body| body)
646            .build()
647            .unwrap();
648
649        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
650    }
651
652    #[test]
653    fn test_builder_process_adds_processor_step() {
654        let definition = RouteBuilder::from("timer:tick")
655            .route_id("test-route")
656            .process(|ex| async move { Ok(ex) })
657            .build()
658            .unwrap();
659
660        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
661    }
662
663    #[test]
664    fn test_builder_chain_multiple_steps() {
665        let definition = RouteBuilder::from("timer:tick")
666            .route_id("test-route")
667            .set_header("source", Value::String("timer".into()))
668            .filter(|ex| ex.input.header("source").is_some())
669            .to("log:info")
670            .end_filter()
671            .to("mock:result")
672            .build()
673            .unwrap();
674
675        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
676        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
677        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
678        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
679    }
680
681    // -----------------------------------------------------------------------
682    // Processor behavior tests — exercise the real Tower services directly
683    // -----------------------------------------------------------------------
684
685    #[tokio::test]
686    async fn test_set_header_processor_works() {
687        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
688        let exchange = Exchange::new(Message::new("test"));
689        let result = svc.call(exchange).await.unwrap();
690        assert_eq!(
691            result.input.header("greeting"),
692            Some(&Value::String("hello".into()))
693        );
694    }
695
696    #[tokio::test]
697    async fn test_filter_processor_passes() {
698        use camel_api::BoxProcessorExt;
699        use camel_processor::FilterService;
700
701        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
702        let mut svc =
703            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
704        let exchange = Exchange::new(Message::new("pass"));
705        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
706        assert_eq!(result.input.body.as_text(), Some("pass"));
707    }
708
709    #[tokio::test]
710    async fn test_filter_processor_blocks() {
711        use camel_api::BoxProcessorExt;
712        use camel_processor::FilterService;
713
714        let sub = BoxProcessor::from_fn(|_ex| {
715            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
716        });
717        let mut svc =
718            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
719        let exchange = Exchange::new(Message::new("reject"));
720        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
721        assert_eq!(result.input.body.as_text(), Some("reject"));
722    }
723
724    #[tokio::test]
725    async fn test_map_body_processor_works() {
726        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
727            if let Some(text) = body.as_text() {
728                Body::Text(text.to_uppercase())
729            } else {
730                body
731            }
732        });
733        let exchange = Exchange::new(Message::new("hello"));
734        let result = mapper.oneshot(exchange).await.unwrap();
735        assert_eq!(result.input.body.as_text(), Some("HELLO"));
736    }
737
738    #[tokio::test]
739    async fn test_process_custom_processor_works() {
740        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
741            ex.set_property("custom", Value::Bool(true));
742            Ok(ex)
743        });
744        let exchange = Exchange::new(Message::default());
745        let result = processor.oneshot(exchange).await.unwrap();
746        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
747    }
748
749    // -----------------------------------------------------------------------
750    // Sequential pipeline test
751    // -----------------------------------------------------------------------
752
753    #[tokio::test]
754    async fn test_compose_pipeline_runs_steps_in_order() {
755        use camel_core::route::compose_pipeline;
756
757        let processors = vec![
758            BoxProcessor::new(SetHeader::new(
759                IdentityProcessor,
760                "step",
761                Value::String("one".into()),
762            )),
763            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
764                if let Some(text) = body.as_text() {
765                    Body::Text(format!("{}-processed", text))
766                } else {
767                    body
768                }
769            })),
770        ];
771
772        let pipeline = compose_pipeline(processors);
773        let exchange = Exchange::new(Message::new("hello"));
774        let result = pipeline.oneshot(exchange).await.unwrap();
775
776        assert_eq!(
777            result.input.header("step"),
778            Some(&Value::String("one".into()))
779        );
780        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
781    }
782
783    #[tokio::test]
784    async fn test_compose_pipeline_empty_is_identity() {
785        use camel_core::route::compose_pipeline;
786
787        let pipeline = compose_pipeline(vec![]);
788        let exchange = Exchange::new(Message::new("unchanged"));
789        let result = pipeline.oneshot(exchange).await.unwrap();
790        assert_eq!(result.input.body.as_text(), Some("unchanged"));
791    }
792
793    // -----------------------------------------------------------------------
794    // Circuit breaker builder tests
795    // -----------------------------------------------------------------------
796
797    #[test]
798    fn test_builder_circuit_breaker_sets_config() {
799        use camel_api::circuit_breaker::CircuitBreakerConfig;
800
801        let config = CircuitBreakerConfig::new().failure_threshold(5);
802        let definition = RouteBuilder::from("timer:tick")
803            .route_id("test-route")
804            .circuit_breaker(config)
805            .build()
806            .unwrap();
807
808        let cb = definition
809            .circuit_breaker_config()
810            .expect("circuit breaker should be set");
811        assert_eq!(cb.failure_threshold, 5);
812    }
813
814    #[test]
815    fn test_builder_circuit_breaker_with_error_handler() {
816        use camel_api::circuit_breaker::CircuitBreakerConfig;
817        use camel_api::error_handler::ErrorHandlerConfig;
818
819        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
820        let eh_config = ErrorHandlerConfig::log_only();
821
822        let definition = RouteBuilder::from("timer:tick")
823            .route_id("test-route")
824            .to("log:info")
825            .circuit_breaker(cb_config)
826            .error_handler(eh_config)
827            .build()
828            .unwrap();
829
830        assert!(
831            definition.circuit_breaker_config().is_some(),
832            "circuit breaker config should be set"
833        );
834        // Route definition was built successfully with both configs.
835    }
836
837    // --- Splitter builder tests ---
838
839    #[test]
840    fn test_split_builder_typestate() {
841        use camel_api::splitter::{SplitterConfig, split_body_lines};
842
843        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
844        let definition = RouteBuilder::from("timer:test?period=1000")
845            .route_id("test-route")
846            .split(SplitterConfig::new(split_body_lines()))
847            .to("mock:per-fragment")
848            .end_split()
849            .to("mock:final")
850            .build()
851            .unwrap();
852
853        // Should have 2 top-level steps: Split + To("mock:final")
854        assert_eq!(definition.steps().len(), 2);
855    }
856
857    #[test]
858    fn test_split_builder_steps_collected() {
859        use camel_api::splitter::{SplitterConfig, split_body_lines};
860
861        let definition = RouteBuilder::from("timer:test?period=1000")
862            .route_id("test-route")
863            .split(SplitterConfig::new(split_body_lines()))
864            .set_header("fragment", Value::String("yes".into()))
865            .to("mock:per-fragment")
866            .end_split()
867            .build()
868            .unwrap();
869
870        // Should have 1 top-level step: Split (containing 2 sub-steps)
871        assert_eq!(definition.steps().len(), 1);
872        match &definition.steps()[0] {
873            BuilderStep::Split { steps, .. } => {
874                assert_eq!(steps.len(), 2); // SetHeader + To
875            }
876            other => panic!("Expected Split, got {:?}", other),
877        }
878    }
879
880    #[test]
881    fn test_split_builder_config_propagated() {
882        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
883
884        let definition = RouteBuilder::from("timer:test?period=1000")
885            .route_id("test-route")
886            .split(
887                SplitterConfig::new(split_body_lines())
888                    .parallel(true)
889                    .parallel_limit(4)
890                    .aggregation(AggregationStrategy::CollectAll),
891            )
892            .to("mock:per-fragment")
893            .end_split()
894            .build()
895            .unwrap();
896
897        match &definition.steps()[0] {
898            BuilderStep::Split { config, .. } => {
899                assert!(config.parallel);
900                assert_eq!(config.parallel_limit, Some(4));
901                assert!(matches!(
902                    config.aggregation,
903                    AggregationStrategy::CollectAll
904                ));
905            }
906            other => panic!("Expected Split, got {:?}", other),
907        }
908    }
909
910    #[test]
911    fn test_aggregate_builder_adds_step() {
912        use camel_api::aggregator::AggregatorConfig;
913        use camel_core::route::BuilderStep;
914
915        let definition = RouteBuilder::from("timer:tick")
916            .route_id("test-route")
917            .aggregate(
918                AggregatorConfig::correlate_by("key")
919                    .complete_when_size(2)
920                    .build(),
921            )
922            .build()
923            .unwrap();
924
925        assert_eq!(definition.steps().len(), 1);
926        assert!(matches!(
927            definition.steps()[0],
928            BuilderStep::Aggregate { .. }
929        ));
930    }
931
932    #[test]
933    fn test_aggregate_in_split_builder() {
934        use camel_api::aggregator::AggregatorConfig;
935        use camel_api::splitter::{SplitterConfig, split_body_lines};
936        use camel_core::route::BuilderStep;
937
938        let definition = RouteBuilder::from("timer:tick")
939            .route_id("test-route")
940            .split(SplitterConfig::new(split_body_lines()))
941            .aggregate(
942                AggregatorConfig::correlate_by("key")
943                    .complete_when_size(1)
944                    .build(),
945            )
946            .end_split()
947            .build()
948            .unwrap();
949
950        assert_eq!(definition.steps().len(), 1);
951        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
952            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
953        } else {
954            panic!("expected Split step");
955        }
956    }
957
958    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
959
960    #[test]
961    fn test_builder_set_body_static_adds_processor() {
962        let definition = RouteBuilder::from("timer:tick")
963            .route_id("test-route")
964            .set_body("fixed")
965            .build()
966            .unwrap();
967        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
968    }
969
970    #[test]
971    fn test_builder_set_body_fn_adds_processor() {
972        let definition = RouteBuilder::from("timer:tick")
973            .route_id("test-route")
974            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
975            .build()
976            .unwrap();
977        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
978    }
979
980    #[test]
981    fn test_builder_set_header_fn_adds_processor() {
982        let definition = RouteBuilder::from("timer:tick")
983            .route_id("test-route")
984            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
985            .build()
986            .unwrap();
987        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
988    }
989
990    #[tokio::test]
991    async fn test_set_body_static_processor_works() {
992        use camel_core::route::compose_pipeline;
993        let def = RouteBuilder::from("t:t")
994            .route_id("test-route")
995            .set_body("replaced")
996            .build()
997            .unwrap();
998        let pipeline = compose_pipeline(
999            def.steps()
1000                .iter()
1001                .filter_map(|s| {
1002                    if let BuilderStep::Processor(p) = s {
1003                        Some(p.clone())
1004                    } else {
1005                        None
1006                    }
1007                })
1008                .collect(),
1009        );
1010        let exchange = Exchange::new(Message::new("original"));
1011        let result = pipeline.oneshot(exchange).await.unwrap();
1012        assert_eq!(result.input.body.as_text(), Some("replaced"));
1013    }
1014
1015    #[tokio::test]
1016    async fn test_set_body_fn_processor_works() {
1017        use camel_core::route::compose_pipeline;
1018        let def = RouteBuilder::from("t:t")
1019            .route_id("test-route")
1020            .set_body_fn(|ex: &Exchange| {
1021                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
1022            })
1023            .build()
1024            .unwrap();
1025        let pipeline = compose_pipeline(
1026            def.steps()
1027                .iter()
1028                .filter_map(|s| {
1029                    if let BuilderStep::Processor(p) = s {
1030                        Some(p.clone())
1031                    } else {
1032                        None
1033                    }
1034                })
1035                .collect(),
1036        );
1037        let exchange = Exchange::new(Message::new("hello"));
1038        let result = pipeline.oneshot(exchange).await.unwrap();
1039        assert_eq!(result.input.body.as_text(), Some("HELLO"));
1040    }
1041
1042    #[tokio::test]
1043    async fn test_set_header_fn_processor_works() {
1044        use camel_core::route::compose_pipeline;
1045        let def = RouteBuilder::from("t:t")
1046            .route_id("test-route")
1047            .set_header_fn("echo", |ex: &Exchange| {
1048                ex.input
1049                    .body
1050                    .as_text()
1051                    .map(|t| Value::String(t.into()))
1052                    .unwrap_or(Value::Null)
1053            })
1054            .build()
1055            .unwrap();
1056        let pipeline = compose_pipeline(
1057            def.steps()
1058                .iter()
1059                .filter_map(|s| {
1060                    if let BuilderStep::Processor(p) = s {
1061                        Some(p.clone())
1062                    } else {
1063                        None
1064                    }
1065                })
1066                .collect(),
1067        );
1068        let exchange = Exchange::new(Message::new("ping"));
1069        let result = pipeline.oneshot(exchange).await.unwrap();
1070        assert_eq!(
1071            result.input.header("echo"),
1072            Some(&Value::String("ping".into()))
1073        );
1074    }
1075
1076    // ── FilterBuilder typestate tests ─────────────────────────────────────
1077
1078    #[test]
1079    fn test_filter_builder_typestate() {
1080        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1081            .route_id("test-route")
1082            .filter(|_ex| true)
1083            .to("mock:inner")
1084            .end_filter()
1085            .to("mock:outer")
1086            .build();
1087        assert!(result.is_ok());
1088    }
1089
1090    #[test]
1091    fn test_filter_builder_steps_collected() {
1092        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1093            .route_id("test-route")
1094            .filter(|_ex| true)
1095            .to("mock:inner")
1096            .end_filter()
1097            .build()
1098            .unwrap();
1099
1100        assert_eq!(definition.steps().len(), 1);
1101        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1102    }
1103
1104    #[test]
1105    fn test_wire_tap_builder_adds_step() {
1106        let definition = RouteBuilder::from("timer:tick")
1107            .route_id("test-route")
1108            .wire_tap("mock:tap")
1109            .to("mock:result")
1110            .build()
1111            .unwrap();
1112
1113        assert_eq!(definition.steps().len(), 2);
1114        assert!(
1115            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
1116        );
1117        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
1118    }
1119
1120    // ── MulticastBuilder typestate tests ─────────────────────────────────────
1121
1122    #[test]
1123    fn test_multicast_builder_typestate() {
1124        let definition = RouteBuilder::from("timer:tick")
1125            .route_id("test-route")
1126            .multicast()
1127            .to("direct:a")
1128            .to("direct:b")
1129            .end_multicast()
1130            .to("mock:result")
1131            .build()
1132            .unwrap();
1133
1134        assert_eq!(definition.steps().len(), 2); // Multicast + To("mock:result")
1135    }
1136
1137    #[test]
1138    fn test_multicast_builder_steps_collected() {
1139        let definition = RouteBuilder::from("timer:tick")
1140            .route_id("test-route")
1141            .multicast()
1142            .to("direct:a")
1143            .to("direct:b")
1144            .end_multicast()
1145            .build()
1146            .unwrap();
1147
1148        match &definition.steps()[0] {
1149            BuilderStep::Multicast { steps, .. } => {
1150                assert_eq!(steps.len(), 2);
1151            }
1152            other => panic!("Expected Multicast, got {:?}", other),
1153        }
1154    }
1155
1156    // ── Concurrency builder tests ─────────────────────────────────────
1157
1158    #[test]
1159    fn test_builder_concurrent_sets_concurrency() {
1160        use camel_component::ConcurrencyModel;
1161
1162        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1163            .route_id("test-route")
1164            .concurrent(16)
1165            .to("log:info")
1166            .build()
1167            .unwrap();
1168
1169        assert_eq!(
1170            definition.concurrency_override(),
1171            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
1172        );
1173    }
1174
1175    #[test]
1176    fn test_builder_concurrent_zero_means_unbounded() {
1177        use camel_component::ConcurrencyModel;
1178
1179        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1180            .route_id("test-route")
1181            .concurrent(0)
1182            .to("log:info")
1183            .build()
1184            .unwrap();
1185
1186        assert_eq!(
1187            definition.concurrency_override(),
1188            Some(&ConcurrencyModel::Concurrent { max: None })
1189        );
1190    }
1191
1192    #[test]
1193    fn test_builder_sequential_sets_concurrency() {
1194        use camel_component::ConcurrencyModel;
1195
1196        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1197            .route_id("test-route")
1198            .sequential()
1199            .to("log:info")
1200            .build()
1201            .unwrap();
1202
1203        assert_eq!(
1204            definition.concurrency_override(),
1205            Some(&ConcurrencyModel::Sequential)
1206        );
1207    }
1208
1209    #[test]
1210    fn test_builder_default_concurrency_is_none() {
1211        let definition = RouteBuilder::from("timer:tick")
1212            .route_id("test-route")
1213            .to("log:info")
1214            .build()
1215            .unwrap();
1216
1217        assert_eq!(definition.concurrency_override(), None);
1218    }
1219
1220    // ── Route lifecycle builder tests ─────────────────────────────────────
1221
1222    #[test]
1223    fn test_builder_route_id_sets_id() {
1224        let definition = RouteBuilder::from("timer:tick")
1225            .route_id("my-route")
1226            .build()
1227            .unwrap();
1228
1229        assert_eq!(definition.route_id(), "my-route");
1230    }
1231
1232    #[test]
1233    fn test_build_without_route_id_fails() {
1234        let result = RouteBuilder::from("timer:tick?period=1000")
1235            .to("log:info")
1236            .build();
1237        let err = match result {
1238            Err(e) => e.to_string(),
1239            Ok(_) => panic!("build() should fail without route_id"),
1240        };
1241        assert!(
1242            err.contains("route_id"),
1243            "error should mention route_id, got: {}",
1244            err
1245        );
1246    }
1247
1248    #[test]
1249    fn test_builder_auto_startup_false() {
1250        let definition = RouteBuilder::from("timer:tick")
1251            .route_id("test-route")
1252            .auto_startup(false)
1253            .build()
1254            .unwrap();
1255
1256        assert!(!definition.auto_startup());
1257    }
1258
1259    #[test]
1260    fn test_builder_startup_order_custom() {
1261        let definition = RouteBuilder::from("timer:tick")
1262            .route_id("test-route")
1263            .startup_order(50)
1264            .build()
1265            .unwrap();
1266
1267        assert_eq!(definition.startup_order(), 50);
1268    }
1269
1270    #[test]
1271    fn test_builder_defaults() {
1272        let definition = RouteBuilder::from("timer:tick")
1273            .route_id("test-route")
1274            .build()
1275            .unwrap();
1276
1277        assert_eq!(definition.route_id(), "test-route");
1278        assert!(definition.auto_startup());
1279        assert_eq!(definition.startup_order(), 1000);
1280    }
1281
1282    // ── Choice typestate tests ──────────────────────────────────────────────────
1283
1284    #[test]
1285    fn test_choice_builder_single_when() {
1286        let definition = RouteBuilder::from("timer:tick")
1287            .route_id("test-route")
1288            .choice()
1289            .when(|ex: &Exchange| ex.input.header("type").is_some())
1290            .to("mock:typed")
1291            .end_when()
1292            .end_choice()
1293            .build()
1294            .unwrap();
1295        assert_eq!(definition.steps().len(), 1);
1296        assert!(
1297            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
1298            if whens.len() == 1 && otherwise.is_none())
1299        );
1300    }
1301
1302    #[test]
1303    fn test_choice_builder_when_otherwise() {
1304        let definition = RouteBuilder::from("timer:tick")
1305            .route_id("test-route")
1306            .choice()
1307            .when(|ex: &Exchange| ex.input.header("a").is_some())
1308            .to("mock:a")
1309            .end_when()
1310            .otherwise()
1311            .to("mock:fallback")
1312            .end_otherwise()
1313            .end_choice()
1314            .build()
1315            .unwrap();
1316        assert!(
1317            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
1318            if whens.len() == 1 && otherwise.is_some())
1319        );
1320    }
1321
1322    #[test]
1323    fn test_choice_builder_multiple_whens() {
1324        let definition = RouteBuilder::from("timer:tick")
1325            .route_id("test-route")
1326            .choice()
1327            .when(|ex: &Exchange| ex.input.header("a").is_some())
1328            .to("mock:a")
1329            .end_when()
1330            .when(|ex: &Exchange| ex.input.header("b").is_some())
1331            .to("mock:b")
1332            .end_when()
1333            .end_choice()
1334            .build()
1335            .unwrap();
1336        assert!(
1337            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
1338            if whens.len() == 2)
1339        );
1340    }
1341
1342    #[test]
1343    fn test_choice_step_after_choice() {
1344        // Steps after end_choice() are added to the outer pipeline, not inside choice.
1345        let definition = RouteBuilder::from("timer:tick")
1346            .route_id("test-route")
1347            .choice()
1348            .when(|_ex: &Exchange| true)
1349            .to("mock:inner")
1350            .end_when()
1351            .end_choice()
1352            .to("mock:outer") // must be step[1], not inside choice
1353            .build()
1354            .unwrap();
1355        assert_eq!(definition.steps().len(), 2);
1356        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
1357    }
1358}