Skip to main content

camel_builder/
lib.rs

1use camel_api::aggregator::AggregatorConfig;
2use camel_api::body::Body;
3use camel_api::circuit_breaker::CircuitBreakerConfig;
4use camel_api::error_handler::ErrorHandlerConfig;
5use camel_api::multicast::{MulticastConfig, MulticastStrategy};
6use camel_api::splitter::SplitterConfig;
7use camel_api::{
8    BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProcessorFn, Value,
9};
10use camel_component::ConcurrencyModel;
11use camel_core::route::{BuilderStep, RouteDefinition};
12use camel_processor::{DynamicSetHeader, LogLevel, MapBody, SetBody, SetHeader, StopService};
13
14/// Shared step-accumulation methods for all builder types.
15///
16/// Implementors provide `steps_mut()` and get step-adding methods for free.
17/// `filter()` and other branching methods are NOT included — they return
18/// different types per builder and stay as per-builder methods.
19pub trait StepAccumulator: Sized {
20    fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
21
22    fn to(mut self, endpoint: impl Into<String>) -> Self {
23        self.steps_mut().push(BuilderStep::To(endpoint.into()));
24        self
25    }
26
27    fn process<F, Fut>(mut self, f: F) -> Self
28    where
29        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
30        Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
31    {
32        let svc = ProcessorFn::new(f);
33        self.steps_mut()
34            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
35        self
36    }
37
38    fn process_fn(mut self, processor: BoxProcessor) -> Self {
39        self.steps_mut().push(BuilderStep::Processor(processor));
40        self
41    }
42
43    fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
44        let svc = SetHeader::new(IdentityProcessor, key, value);
45        self.steps_mut()
46            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
47        self
48    }
49
50    fn map_body<F>(mut self, mapper: F) -> Self
51    where
52        F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
53    {
54        let svc = MapBody::new(IdentityProcessor, mapper);
55        self.steps_mut()
56            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
57        self
58    }
59
60    fn set_body<B>(mut self, body: B) -> Self
61    where
62        B: Into<Body> + Clone + Send + Sync + 'static,
63    {
64        let body: Body = body.into();
65        let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
66        self.steps_mut()
67            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
68        self
69    }
70
71    fn set_body_fn<F>(mut self, expr: F) -> Self
72    where
73        F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
74    {
75        let svc = SetBody::new(IdentityProcessor, expr);
76        self.steps_mut()
77            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
78        self
79    }
80
81    fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
82    where
83        F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
84    {
85        let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
86        self.steps_mut()
87            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
88        self
89    }
90
91    fn aggregate(mut self, config: AggregatorConfig) -> Self {
92        self.steps_mut().push(BuilderStep::Aggregate { config });
93        self
94    }
95
96    /// Stop processing this exchange immediately. No further steps in the
97    /// current pipeline will run.
98    ///
99    /// Can be used at any point in the route: directly on RouteBuilder,
100    /// inside `.filter()`, inside `.split()`, etc.
101    fn stop(mut self) -> Self {
102        self.steps_mut()
103            .push(BuilderStep::Processor(BoxProcessor::new(StopService)));
104        self
105    }
106
107    /// Log a message at the specified level.
108    ///
109    /// The message will be logged when an exchange passes through this step.
110    fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
111        use camel_processor::LogProcessor;
112        let svc = LogProcessor::new(level, message.into());
113        self.steps_mut()
114            .push(BuilderStep::Processor(BoxProcessor::new(svc)));
115        self
116    }
117}
118
119/// A fluent builder for constructing routes.
120///
121/// # Example
122///
123/// ```ignore
124/// let definition = RouteBuilder::from("timer:tick?period=1000")
125///     .set_header("source", Value::String("timer".into()))
126///     .filter(|ex| ex.input.body.as_text().is_some())
127///     .to("log:info?showHeaders=true")
128///     .build()?;
129/// ```
130pub struct RouteBuilder {
131    from_uri: String,
132    steps: Vec<BuilderStep>,
133    error_handler: Option<ErrorHandlerConfig>,
134    circuit_breaker_config: Option<CircuitBreakerConfig>,
135    concurrency: Option<ConcurrencyModel>,
136    route_id: Option<String>,
137    auto_startup: Option<bool>,
138    startup_order: Option<i32>,
139}
140
141impl RouteBuilder {
142    /// Start building a route from the given source endpoint URI.
143    pub fn from(endpoint: &str) -> Self {
144        Self {
145            from_uri: endpoint.to_string(),
146            steps: Vec::new(),
147            error_handler: None,
148            circuit_breaker_config: None,
149            concurrency: None,
150            route_id: None,
151            auto_startup: None,
152            startup_order: None,
153        }
154    }
155
156    /// Open a filter scope. Only exchanges matching `predicate` will be processed
157    /// by the steps inside the scope. Non-matching exchanges skip the scope entirely
158    /// and continue to steps after `.end_filter()`.
159    pub fn filter<F>(self, predicate: F) -> FilterBuilder
160    where
161        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
162    {
163        FilterBuilder {
164            parent: self,
165            predicate: std::sync::Arc::new(predicate),
166            steps: vec![],
167        }
168    }
169
170    /// Add a WireTap step that sends a clone of the exchange to the given
171    /// endpoint URI (fire-and-forget). The original exchange continues
172    /// downstream unchanged.
173    pub fn wire_tap(mut self, endpoint: &str) -> Self {
174        self.steps.push(BuilderStep::WireTap {
175            uri: endpoint.to_string(),
176        });
177        self
178    }
179
180    /// Set a per-route error handler. Overrides the global error handler on `CamelContext`.
181    pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
182        self.error_handler = Some(config);
183        self
184    }
185
186    /// Set a circuit breaker for this route.
187    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
188        self.circuit_breaker_config = Some(config);
189        self
190    }
191
192    /// Override the consumer's default concurrency model.
193    ///
194    /// When set, the pipeline spawns a task per exchange, processing them
195    /// concurrently. `max` limits the number of simultaneously active
196    /// pipeline executions (0 = unbounded, channel buffer is backpressure).
197    ///
198    /// # Example
199    /// ```ignore
200    /// RouteBuilder::from("http://0.0.0.0:8080/api")
201    ///     .concurrent(16)  // max 16 in-flight pipeline executions
202    ///     .process(handle_request)
203    ///     .build()
204    /// ```
205    pub fn concurrent(mut self, max: usize) -> Self {
206        let max = if max == 0 { None } else { Some(max) };
207        self.concurrency = Some(ConcurrencyModel::Concurrent { max });
208        self
209    }
210
211    /// Force sequential processing, overriding a concurrent-capable consumer.
212    ///
213    /// Useful for HTTP routes that mutate shared state and need ordering
214    /// guarantees.
215    pub fn sequential(mut self) -> Self {
216        self.concurrency = Some(ConcurrencyModel::Sequential);
217        self
218    }
219
220    /// Set the route ID for this route.
221    ///
222    /// If not set, the route will be assigned an auto-generated ID.
223    pub fn route_id(mut self, id: impl Into<String>) -> Self {
224        self.route_id = Some(id.into());
225        self
226    }
227
228    /// Set whether this route should automatically start when the context starts.
229    ///
230    /// Default is `true`.
231    pub fn auto_startup(mut self, auto: bool) -> Self {
232        self.auto_startup = Some(auto);
233        self
234    }
235
236    /// Set the startup order for this route.
237    ///
238    /// Routes with lower values start first. Default is 1000.
239    pub fn startup_order(mut self, order: i32) -> Self {
240        self.startup_order = Some(order);
241        self
242    }
243
244    /// Begin a Splitter sub-pipeline. Steps added after this call (until
245    /// `.end_split()`) will be executed per-fragment.
246    ///
247    /// Returns a `SplitBuilder` — you cannot call `.build()` until
248    /// `.end_split()` closes the split scope (enforced by the type system).
249    pub fn split(self, config: SplitterConfig) -> SplitBuilder {
250        SplitBuilder {
251            parent: self,
252            config,
253            steps: Vec::new(),
254        }
255    }
256
257    /// Begin a Multicast sub-pipeline. Steps added after this call (until
258    /// `.end_multicast()`) will each receive a copy of the exchange.
259    ///
260    /// Returns a `MulticastBuilder` — you cannot call `.build()` until
261    /// `.end_multicast()` closes the multicast scope (enforced by the type system).
262    pub fn multicast(self) -> MulticastBuilder {
263        MulticastBuilder {
264            parent: self,
265            steps: Vec::new(),
266            config: MulticastConfig::new(),
267        }
268    }
269
270    /// Consume the builder and produce a [`RouteDefinition`].
271    pub fn build(self) -> Result<RouteDefinition, CamelError> {
272        if self.from_uri.is_empty() {
273            return Err(CamelError::RouteError(
274                "route must have a 'from' URI".to_string(),
275            ));
276        }
277        let definition = RouteDefinition::new(self.from_uri, self.steps);
278        let definition = if let Some(eh) = self.error_handler {
279            definition.with_error_handler(eh)
280        } else {
281            definition
282        };
283        let definition = if let Some(cb) = self.circuit_breaker_config {
284            definition.with_circuit_breaker(cb)
285        } else {
286            definition
287        };
288        let definition = if let Some(concurrency) = self.concurrency {
289            definition.with_concurrency(concurrency)
290        } else {
291            definition
292        };
293        let definition = if let Some(id) = self.route_id {
294            definition.with_route_id(id)
295        } else {
296            definition
297        };
298        let definition = if let Some(auto) = self.auto_startup {
299            definition.with_auto_startup(auto)
300        } else {
301            definition
302        };
303        let definition = if let Some(order) = self.startup_order {
304            definition.with_startup_order(order)
305        } else {
306            definition
307        };
308        Ok(definition)
309    }
310}
311
312impl StepAccumulator for RouteBuilder {
313    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
314        &mut self.steps
315    }
316}
317
318/// Builder for the sub-pipeline within a `.split()` ... `.end_split()` block.
319///
320/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
321/// but NOT `.build()` and NOT `.split()` (no nested splits).
322///
323/// Calling `.end_split()` packages the sub-steps into a `BuilderStep::Split`
324/// and returns the parent `RouteBuilder`.
325pub struct SplitBuilder {
326    parent: RouteBuilder,
327    config: SplitterConfig,
328    steps: Vec<BuilderStep>,
329}
330
331impl SplitBuilder {
332    /// Open a filter scope within the split sub-pipeline.
333    pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
334    where
335        F: Fn(&Exchange) -> bool + Send + Sync + 'static,
336    {
337        FilterInSplitBuilder {
338            parent: self,
339            predicate: std::sync::Arc::new(predicate),
340            steps: vec![],
341        }
342    }
343
344    /// Close the split scope. Packages the accumulated sub-steps into a
345    /// `BuilderStep::Split` and returns the parent `RouteBuilder`.
346    pub fn end_split(mut self) -> RouteBuilder {
347        let split_step = BuilderStep::Split {
348            config: self.config,
349            steps: self.steps,
350        };
351        self.parent.steps.push(split_step);
352        self.parent
353    }
354}
355
356impl StepAccumulator for SplitBuilder {
357    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
358        &mut self.steps
359    }
360}
361
362/// Builder for the sub-pipeline within a `.filter()` ... `.end_filter()` block.
363pub struct FilterBuilder {
364    parent: RouteBuilder,
365    predicate: FilterPredicate,
366    steps: Vec<BuilderStep>,
367}
368
369impl FilterBuilder {
370    /// Close the filter scope. Packages the accumulated sub-steps into a
371    /// `BuilderStep::Filter` and returns the parent `RouteBuilder`.
372    pub fn end_filter(mut self) -> RouteBuilder {
373        let step = BuilderStep::Filter {
374            predicate: self.predicate,
375            steps: self.steps,
376        };
377        self.parent.steps.push(step);
378        self.parent
379    }
380}
381
382impl StepAccumulator for FilterBuilder {
383    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
384        &mut self.steps
385    }
386}
387
388/// Builder for a filter scope nested inside a `.split()` block.
389pub struct FilterInSplitBuilder {
390    parent: SplitBuilder,
391    predicate: FilterPredicate,
392    steps: Vec<BuilderStep>,
393}
394
395impl FilterInSplitBuilder {
396    /// Close the filter scope and return the parent `SplitBuilder`.
397    pub fn end_filter(mut self) -> SplitBuilder {
398        let step = BuilderStep::Filter {
399            predicate: self.predicate,
400            steps: self.steps,
401        };
402        self.parent.steps.push(step);
403        self.parent
404    }
405}
406
407impl StepAccumulator for FilterInSplitBuilder {
408    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
409        &mut self.steps
410    }
411}
412
413/// Builder for the sub-pipeline within a `.multicast()` ... `.end_multicast()` block.
414///
415/// Exposes the same step methods as `RouteBuilder` (to, process, filter, etc.)
416/// but NOT `.build()` and NOT `.multicast()` (no nested multicasts).
417///
418/// Calling `.end_multicast()` packages the sub-steps into a `BuilderStep::Multicast`
419/// and returns the parent `RouteBuilder`.
420pub struct MulticastBuilder {
421    parent: RouteBuilder,
422    steps: Vec<BuilderStep>,
423    config: MulticastConfig,
424}
425
426impl MulticastBuilder {
427    pub fn parallel(mut self, parallel: bool) -> Self {
428        self.config = self.config.parallel(parallel);
429        self
430    }
431
432    pub fn parallel_limit(mut self, limit: usize) -> Self {
433        self.config = self.config.parallel_limit(limit);
434        self
435    }
436
437    pub fn stop_on_exception(mut self, stop: bool) -> Self {
438        self.config = self.config.stop_on_exception(stop);
439        self
440    }
441
442    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
443        self.config = self.config.timeout(duration);
444        self
445    }
446
447    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
448        self.config = self.config.aggregation(strategy);
449        self
450    }
451
452    pub fn end_multicast(mut self) -> RouteBuilder {
453        let step = BuilderStep::Multicast {
454            steps: self.steps,
455            config: self.config,
456        };
457        self.parent.steps.push(step);
458        self.parent
459    }
460}
461
462impl StepAccumulator for MulticastBuilder {
463    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
464        &mut self.steps
465    }
466}
467
468// ---------------------------------------------------------------------------
469// Tests
470// ---------------------------------------------------------------------------
471
472#[cfg(test)]
473mod tests {
474    use super::*;
475    use camel_api::{Exchange, Message};
476    use camel_core::route::BuilderStep;
477    use tower::{Service, ServiceExt};
478
479    #[test]
480    fn test_builder_from_creates_definition() {
481        let definition = RouteBuilder::from("timer:tick").build().unwrap();
482        assert_eq!(definition.from_uri(), "timer:tick");
483    }
484
485    #[test]
486    fn test_builder_empty_from_uri_errors() {
487        let result = RouteBuilder::from("").build();
488        assert!(result.is_err());
489    }
490
491    #[test]
492    fn test_builder_to_adds_step() {
493        let definition = RouteBuilder::from("timer:tick")
494            .to("log:info")
495            .build()
496            .unwrap();
497
498        assert_eq!(definition.from_uri(), "timer:tick");
499        // We can verify steps were added by checking the structure
500        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
501    }
502
503    #[test]
504    fn test_builder_filter_adds_filter_step() {
505        let definition = RouteBuilder::from("timer:tick")
506            .filter(|_ex| true)
507            .to("mock:result")
508            .end_filter()
509            .build()
510            .unwrap();
511
512        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
513    }
514
515    #[test]
516    fn test_builder_set_header_adds_processor_step() {
517        let definition = RouteBuilder::from("timer:tick")
518            .set_header("key", Value::String("value".into()))
519            .build()
520            .unwrap();
521
522        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
523    }
524
525    #[test]
526    fn test_builder_map_body_adds_processor_step() {
527        let definition = RouteBuilder::from("timer:tick")
528            .map_body(|body| body)
529            .build()
530            .unwrap();
531
532        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
533    }
534
535    #[test]
536    fn test_builder_process_adds_processor_step() {
537        let definition = RouteBuilder::from("timer:tick")
538            .process(|ex| async move { Ok(ex) })
539            .build()
540            .unwrap();
541
542        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
543    }
544
545    #[test]
546    fn test_builder_chain_multiple_steps() {
547        let definition = RouteBuilder::from("timer:tick")
548            .set_header("source", Value::String("timer".into()))
549            .filter(|ex| ex.input.header("source").is_some())
550            .to("log:info")
551            .end_filter()
552            .to("mock:result")
553            .build()
554            .unwrap();
555
556        assert_eq!(definition.steps().len(), 3); // set_header + Filter + To("mock:result")
557        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); // set_header
558        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); // filter
559        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
560    }
561
562    // -----------------------------------------------------------------------
563    // Processor behavior tests — exercise the real Tower services directly
564    // -----------------------------------------------------------------------
565
566    #[tokio::test]
567    async fn test_set_header_processor_works() {
568        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
569        let exchange = Exchange::new(Message::new("test"));
570        let result = svc.call(exchange).await.unwrap();
571        assert_eq!(
572            result.input.header("greeting"),
573            Some(&Value::String("hello".into()))
574        );
575    }
576
577    #[tokio::test]
578    async fn test_filter_processor_passes() {
579        use camel_api::BoxProcessorExt;
580        use camel_processor::FilterService;
581
582        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
583        let mut svc =
584            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
585        let exchange = Exchange::new(Message::new("pass"));
586        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
587        assert_eq!(result.input.body.as_text(), Some("pass"));
588    }
589
590    #[tokio::test]
591    async fn test_filter_processor_blocks() {
592        use camel_api::BoxProcessorExt;
593        use camel_processor::FilterService;
594
595        let sub = BoxProcessor::from_fn(|_ex| {
596            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
597        });
598        let mut svc =
599            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
600        let exchange = Exchange::new(Message::new("reject"));
601        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
602        assert_eq!(result.input.body.as_text(), Some("reject"));
603    }
604
605    #[tokio::test]
606    async fn test_map_body_processor_works() {
607        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
608            if let Some(text) = body.as_text() {
609                Body::Text(text.to_uppercase())
610            } else {
611                body
612            }
613        });
614        let exchange = Exchange::new(Message::new("hello"));
615        let result = mapper.oneshot(exchange).await.unwrap();
616        assert_eq!(result.input.body.as_text(), Some("HELLO"));
617    }
618
619    #[tokio::test]
620    async fn test_process_custom_processor_works() {
621        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
622            ex.set_property("custom", Value::Bool(true));
623            Ok(ex)
624        });
625        let exchange = Exchange::new(Message::default());
626        let result = processor.oneshot(exchange).await.unwrap();
627        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
628    }
629
630    // -----------------------------------------------------------------------
631    // Sequential pipeline test
632    // -----------------------------------------------------------------------
633
634    #[tokio::test]
635    async fn test_compose_pipeline_runs_steps_in_order() {
636        use camel_core::route::compose_pipeline;
637
638        let processors = vec![
639            BoxProcessor::new(SetHeader::new(
640                IdentityProcessor,
641                "step",
642                Value::String("one".into()),
643            )),
644            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
645                if let Some(text) = body.as_text() {
646                    Body::Text(format!("{}-processed", text))
647                } else {
648                    body
649                }
650            })),
651        ];
652
653        let pipeline = compose_pipeline(processors);
654        let exchange = Exchange::new(Message::new("hello"));
655        let result = pipeline.oneshot(exchange).await.unwrap();
656
657        assert_eq!(
658            result.input.header("step"),
659            Some(&Value::String("one".into()))
660        );
661        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
662    }
663
664    #[tokio::test]
665    async fn test_compose_pipeline_empty_is_identity() {
666        use camel_core::route::compose_pipeline;
667
668        let pipeline = compose_pipeline(vec![]);
669        let exchange = Exchange::new(Message::new("unchanged"));
670        let result = pipeline.oneshot(exchange).await.unwrap();
671        assert_eq!(result.input.body.as_text(), Some("unchanged"));
672    }
673
674    // -----------------------------------------------------------------------
675    // Circuit breaker builder tests
676    // -----------------------------------------------------------------------
677
678    #[test]
679    fn test_builder_circuit_breaker_sets_config() {
680        use camel_api::circuit_breaker::CircuitBreakerConfig;
681
682        let config = CircuitBreakerConfig::new().failure_threshold(5);
683        let definition = RouteBuilder::from("timer:tick")
684            .circuit_breaker(config)
685            .build()
686            .unwrap();
687
688        let cb = definition
689            .circuit_breaker_config()
690            .expect("circuit breaker should be set");
691        assert_eq!(cb.failure_threshold, 5);
692    }
693
694    #[test]
695    fn test_builder_circuit_breaker_with_error_handler() {
696        use camel_api::circuit_breaker::CircuitBreakerConfig;
697        use camel_api::error_handler::ErrorHandlerConfig;
698
699        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
700        let eh_config = ErrorHandlerConfig::log_only();
701
702        let definition = RouteBuilder::from("timer:tick")
703            .to("log:info")
704            .circuit_breaker(cb_config)
705            .error_handler(eh_config)
706            .build()
707            .unwrap();
708
709        assert!(
710            definition.circuit_breaker_config().is_some(),
711            "circuit breaker config should be set"
712        );
713        // Route definition was built successfully with both configs.
714    }
715
716    // --- Splitter builder tests ---
717
718    #[test]
719    fn test_split_builder_typestate() {
720        use camel_api::splitter::{SplitterConfig, split_body_lines};
721
722        // .split() returns SplitBuilder, .end_split() returns RouteBuilder
723        let definition = RouteBuilder::from("timer:test?period=1000")
724            .split(SplitterConfig::new(split_body_lines()))
725            .to("mock:per-fragment")
726            .end_split()
727            .to("mock:final")
728            .build()
729            .unwrap();
730
731        // Should have 2 top-level steps: Split + To("mock:final")
732        assert_eq!(definition.steps().len(), 2);
733    }
734
735    #[test]
736    fn test_split_builder_steps_collected() {
737        use camel_api::splitter::{SplitterConfig, split_body_lines};
738
739        let definition = RouteBuilder::from("timer:test?period=1000")
740            .split(SplitterConfig::new(split_body_lines()))
741            .set_header("fragment", Value::String("yes".into()))
742            .to("mock:per-fragment")
743            .end_split()
744            .build()
745            .unwrap();
746
747        // Should have 1 top-level step: Split (containing 2 sub-steps)
748        assert_eq!(definition.steps().len(), 1);
749        match &definition.steps()[0] {
750            BuilderStep::Split { steps, .. } => {
751                assert_eq!(steps.len(), 2); // SetHeader + To
752            }
753            other => panic!("Expected Split, got {:?}", other),
754        }
755    }
756
757    #[test]
758    fn test_split_builder_config_propagated() {
759        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
760
761        let definition = RouteBuilder::from("timer:test?period=1000")
762            .split(
763                SplitterConfig::new(split_body_lines())
764                    .parallel(true)
765                    .parallel_limit(4)
766                    .aggregation(AggregationStrategy::CollectAll),
767            )
768            .to("mock:per-fragment")
769            .end_split()
770            .build()
771            .unwrap();
772
773        match &definition.steps()[0] {
774            BuilderStep::Split { config, .. } => {
775                assert!(config.parallel);
776                assert_eq!(config.parallel_limit, Some(4));
777                assert!(matches!(
778                    config.aggregation,
779                    AggregationStrategy::CollectAll
780                ));
781            }
782            other => panic!("Expected Split, got {:?}", other),
783        }
784    }
785
786    #[test]
787    fn test_aggregate_builder_adds_step() {
788        use camel_api::aggregator::AggregatorConfig;
789        use camel_core::route::BuilderStep;
790
791        let definition = RouteBuilder::from("timer:tick")
792            .aggregate(
793                AggregatorConfig::correlate_by("key")
794                    .complete_when_size(2)
795                    .build(),
796            )
797            .build()
798            .unwrap();
799
800        assert_eq!(definition.steps().len(), 1);
801        assert!(matches!(
802            definition.steps()[0],
803            BuilderStep::Aggregate { .. }
804        ));
805    }
806
807    #[test]
808    fn test_aggregate_in_split_builder() {
809        use camel_api::aggregator::AggregatorConfig;
810        use camel_api::splitter::{SplitterConfig, split_body_lines};
811        use camel_core::route::BuilderStep;
812
813        let definition = RouteBuilder::from("timer:tick")
814            .split(SplitterConfig::new(split_body_lines()))
815            .aggregate(
816                AggregatorConfig::correlate_by("key")
817                    .complete_when_size(1)
818                    .build(),
819            )
820            .end_split()
821            .build()
822            .unwrap();
823
824        assert_eq!(definition.steps().len(), 1);
825        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
826            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
827        } else {
828            panic!("expected Split step");
829        }
830    }
831
832    // ── set_body / set_body_fn / set_header_fn builder tests ────────────────────
833
834    #[test]
835    fn test_builder_set_body_static_adds_processor() {
836        let definition = RouteBuilder::from("timer:tick")
837            .set_body("fixed")
838            .build()
839            .unwrap();
840        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
841    }
842
843    #[test]
844    fn test_builder_set_body_fn_adds_processor() {
845        let definition = RouteBuilder::from("timer:tick")
846            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
847            .build()
848            .unwrap();
849        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
850    }
851
852    #[test]
853    fn test_builder_set_header_fn_adds_processor() {
854        let definition = RouteBuilder::from("timer:tick")
855            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
856            .build()
857            .unwrap();
858        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
859    }
860
861    #[tokio::test]
862    async fn test_set_body_static_processor_works() {
863        use camel_core::route::compose_pipeline;
864        let def = RouteBuilder::from("t:t")
865            .set_body("replaced")
866            .build()
867            .unwrap();
868        let pipeline = compose_pipeline(
869            def.steps()
870                .iter()
871                .filter_map(|s| {
872                    if let BuilderStep::Processor(p) = s {
873                        Some(p.clone())
874                    } else {
875                        None
876                    }
877                })
878                .collect(),
879        );
880        let exchange = Exchange::new(Message::new("original"));
881        let result = pipeline.oneshot(exchange).await.unwrap();
882        assert_eq!(result.input.body.as_text(), Some("replaced"));
883    }
884
885    #[tokio::test]
886    async fn test_set_body_fn_processor_works() {
887        use camel_core::route::compose_pipeline;
888        let def = RouteBuilder::from("t:t")
889            .set_body_fn(|ex: &Exchange| {
890                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
891            })
892            .build()
893            .unwrap();
894        let pipeline = compose_pipeline(
895            def.steps()
896                .iter()
897                .filter_map(|s| {
898                    if let BuilderStep::Processor(p) = s {
899                        Some(p.clone())
900                    } else {
901                        None
902                    }
903                })
904                .collect(),
905        );
906        let exchange = Exchange::new(Message::new("hello"));
907        let result = pipeline.oneshot(exchange).await.unwrap();
908        assert_eq!(result.input.body.as_text(), Some("HELLO"));
909    }
910
911    #[tokio::test]
912    async fn test_set_header_fn_processor_works() {
913        use camel_core::route::compose_pipeline;
914        let def = RouteBuilder::from("t:t")
915            .set_header_fn("echo", |ex: &Exchange| {
916                ex.input
917                    .body
918                    .as_text()
919                    .map(|t| Value::String(t.into()))
920                    .unwrap_or(Value::Null)
921            })
922            .build()
923            .unwrap();
924        let pipeline = compose_pipeline(
925            def.steps()
926                .iter()
927                .filter_map(|s| {
928                    if let BuilderStep::Processor(p) = s {
929                        Some(p.clone())
930                    } else {
931                        None
932                    }
933                })
934                .collect(),
935        );
936        let exchange = Exchange::new(Message::new("ping"));
937        let result = pipeline.oneshot(exchange).await.unwrap();
938        assert_eq!(
939            result.input.header("echo"),
940            Some(&Value::String("ping".into()))
941        );
942    }
943
944    // ── FilterBuilder typestate tests ─────────────────────────────────────
945
/// Walks through the filter typestate: open a filter scope, add a step inside
/// it, close it with `end_filter`, keep building on the outer route, and check
/// that `build` succeeds.
#[test]
fn test_filter_builder_typestate() {
    // Steps added here land inside the filter scope.
    let inside_filter = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .filter(|_ex| true)
        .to("mock:inner");

    // After `end_filter` the chain continues on the outer route.
    let outer_route = inside_filter.end_filter().to("mock:outer");

    let built = outer_route.build();
    assert!(built.is_ok());
}
956
/// A closed filter scope must show up on the route as exactly one
/// `BuilderStep::Filter` step.
#[test]
fn test_filter_builder_steps_collected() {
    let filter_scope = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .filter(|_ex| true)
        .to("mock:inner");
    let route = filter_scope.end_filter().build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
}
969
/// `wire_tap` must add a `WireTap` step carrying the tap URI, followed by the
/// regular `To` step added afterwards.
#[test]
fn test_wire_tap_builder_adds_step() {
    let builder = RouteBuilder::from("timer:tick")
        .wire_tap("mock:tap")
        .to("mock:result");
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);

    let tap_recorded = matches!(&steps[0], BuilderStep::WireTap { uri } if uri == "mock:tap");
    assert!(tap_recorded);

    let target_recorded = matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:result");
    assert!(target_recorded);
}
984
985    // ── MulticastBuilder typestate tests ─────────────────────────────────────
986
/// Walks through the multicast typestate: open a multicast scope, add two
/// endpoints inside it, close it with `end_multicast`, and keep building on
/// the outer route.
///
/// Besides the step count, the kind of each top-level step is checked so the
/// test cannot pass with two unrelated steps.
#[test]
fn test_multicast_builder_typestate() {
    let definition = RouteBuilder::from("timer:tick")
        .multicast()
        .to("direct:a")
        .to("direct:b")
        .end_multicast()
        .to("mock:result")
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2); // Multicast + To("mock:result")

    // The inner endpoints must be folded into the multicast step, and the
    // trailing `to` must sit on the outer route.
    assert!(matches!(&steps[0], BuilderStep::Multicast { .. }));
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:result"));
}
1000
/// Both endpoints added inside a multicast scope must be collected into the
/// resulting `BuilderStep::Multicast` step.
#[test]
fn test_multicast_builder_steps_collected() {
    let multicast_scope = RouteBuilder::from("timer:tick")
        .multicast()
        .to("direct:a")
        .to("direct:b");
    let route = multicast_scope.end_multicast().build().unwrap();

    let first = &route.steps()[0];
    if let BuilderStep::Multicast { steps, .. } = first {
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected Multicast, got {:?}", first);
    }
}
1018
1019    // ── Concurrency builder tests ─────────────────────────────────────
1020
/// `concurrent(16)` must record a concurrency override of
/// `Concurrent { max: Some(16) }` on the built route.
#[test]
fn test_builder_concurrent_sets_concurrency() {
    use camel_component::ConcurrencyModel;

    let builder = RouteBuilder::from("http://0.0.0.0:8080/test")
        .concurrent(16)
        .to("log:info");
    let route = builder.build().unwrap();

    let expected = ConcurrencyModel::Concurrent { max: Some(16) };
    assert_eq!(route.concurrency_override(), Some(&expected));
}
1036
/// `concurrent(0)` must be recorded as `Concurrent { max: None }`, i.e. without
/// an upper bound.
#[test]
fn test_builder_concurrent_zero_means_unbounded() {
    use camel_component::ConcurrencyModel;

    let builder = RouteBuilder::from("http://0.0.0.0:8080/test")
        .concurrent(0)
        .to("log:info");
    let route = builder.build().unwrap();

    let unbounded = ConcurrencyModel::Concurrent { max: None };
    assert_eq!(route.concurrency_override(), Some(&unbounded));
}
1052
/// `sequential()` must record a `ConcurrencyModel::Sequential` override on the
/// built route.
#[test]
fn test_builder_sequential_sets_concurrency() {
    use camel_component::ConcurrencyModel;

    let builder = RouteBuilder::from("http://0.0.0.0:8080/test")
        .sequential()
        .to("log:info");
    let route = builder.build().unwrap();

    let expected = ConcurrencyModel::Sequential;
    assert_eq!(route.concurrency_override(), Some(&expected));
}
1068
/// A route built without `concurrent`/`sequential` must carry no concurrency
/// override.
#[test]
fn test_builder_default_concurrency_is_none() {
    let builder = RouteBuilder::from("timer:tick").to("log:info");
    let route = builder.build().unwrap();

    let override_model = route.concurrency_override();
    assert_eq!(override_model, None);
}
1078
1079    // ── Route lifecycle builder tests ─────────────────────────────────────
1080
/// `route_id` must be reflected by `route_id()` on the built definition.
#[test]
fn test_builder_route_id_sets_id() {
    let builder = RouteBuilder::from("timer:tick").route_id("my-route");
    let route = builder.build().unwrap();

    let id = route.route_id();
    assert_eq!(id, Some("my-route"));
}
1090
/// `auto_startup(false)` must be reflected by `auto_startup()` on the built
/// definition.
#[test]
fn test_builder_auto_startup_false() {
    let builder = RouteBuilder::from("timer:tick").auto_startup(false);
    let route = builder.build().unwrap();

    let starts_automatically = route.auto_startup();
    assert!(!starts_automatically);
}
1100
/// `startup_order(50)` must be reflected by `startup_order()` on the built
/// definition.
#[test]
fn test_builder_startup_order_custom() {
    let builder = RouteBuilder::from("timer:tick").startup_order(50);
    let route = builder.build().unwrap();

    let order = route.startup_order();
    assert_eq!(order, 50);
}
1110
/// A route built with no lifecycle options must have no route id, auto-startup
/// enabled, and a startup order of 1000.
#[test]
fn test_builder_defaults() {
    let builder = RouteBuilder::from("timer:tick");
    let route = builder.build().unwrap();

    let id = route.route_id();
    let starts_automatically = route.auto_startup();
    let order = route.startup_order();

    assert_eq!(id, None);
    assert!(starts_automatically);
    assert_eq!(order, 1000);
}
1119}