1use camel_api::aggregator::AggregatorConfig;
2use camel_api::body::Body;
3use camel_api::body_converter::BodyType;
4use camel_api::circuit_breaker::CircuitBreakerConfig;
5use camel_api::error_handler::ErrorHandlerConfig;
6use camel_api::multicast::{MulticastConfig, MulticastStrategy};
7use camel_api::splitter::SplitterConfig;
8use camel_api::{
9 BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProcessorFn, Value,
10};
11use camel_component::ConcurrencyModel;
12use camel_core::route::{BuilderStep, RouteDefinition, WhenStep};
13use camel_processor::{
14 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, SetBody, SetHeader, StopService,
15};
16
17pub trait StepAccumulator: Sized {
23 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
24
25 fn to(mut self, endpoint: impl Into<String>) -> Self {
26 self.steps_mut().push(BuilderStep::To(endpoint.into()));
27 self
28 }
29
30 fn process<F, Fut>(mut self, f: F) -> Self
31 where
32 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
33 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
34 {
35 let svc = ProcessorFn::new(f);
36 self.steps_mut()
37 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
38 self
39 }
40
41 fn process_fn(mut self, processor: BoxProcessor) -> Self {
42 self.steps_mut().push(BuilderStep::Processor(processor));
43 self
44 }
45
46 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
47 let svc = SetHeader::new(IdentityProcessor, key, value);
48 self.steps_mut()
49 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
50 self
51 }
52
53 fn map_body<F>(mut self, mapper: F) -> Self
54 where
55 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
56 {
57 let svc = MapBody::new(IdentityProcessor, mapper);
58 self.steps_mut()
59 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
60 self
61 }
62
63 fn set_body<B>(mut self, body: B) -> Self
64 where
65 B: Into<Body> + Clone + Send + Sync + 'static,
66 {
67 let body: Body = body.into();
68 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
69 self.steps_mut()
70 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71 self
72 }
73
74 fn set_body_fn<F>(mut self, expr: F) -> Self
75 where
76 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
77 {
78 let svc = SetBody::new(IdentityProcessor, expr);
79 self.steps_mut()
80 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
81 self
82 }
83
84 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
85 where
86 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
87 {
88 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
89 self.steps_mut()
90 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
91 self
92 }
93
94 fn aggregate(mut self, config: AggregatorConfig) -> Self {
95 self.steps_mut().push(BuilderStep::Aggregate { config });
96 self
97 }
98
99 fn stop(mut self) -> Self {
105 self.steps_mut()
106 .push(BuilderStep::Processor(BoxProcessor::new(StopService)));
107 self
108 }
109
110 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
114 use camel_processor::LogProcessor;
115 let svc = LogProcessor::new(level, message.into());
116 self.steps_mut()
117 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
118 self
119 }
120
121 fn convert_body_to(mut self, target: BodyType) -> Self {
133 let svc = ConvertBodyTo::new(IdentityProcessor, target);
134 self.steps_mut()
135 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
136 self
137 }
138}
139
140pub struct RouteBuilder {
152 from_uri: String,
153 steps: Vec<BuilderStep>,
154 error_handler: Option<ErrorHandlerConfig>,
155 circuit_breaker_config: Option<CircuitBreakerConfig>,
156 concurrency: Option<ConcurrencyModel>,
157 route_id: Option<String>,
158 auto_startup: Option<bool>,
159 startup_order: Option<i32>,
160}
161
162impl RouteBuilder {
163 pub fn from(endpoint: &str) -> Self {
165 Self {
166 from_uri: endpoint.to_string(),
167 steps: Vec::new(),
168 error_handler: None,
169 circuit_breaker_config: None,
170 concurrency: None,
171 route_id: None,
172 auto_startup: None,
173 startup_order: None,
174 }
175 }
176
177 pub fn filter<F>(self, predicate: F) -> FilterBuilder
181 where
182 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
183 {
184 FilterBuilder {
185 parent: self,
186 predicate: std::sync::Arc::new(predicate),
187 steps: vec![],
188 }
189 }
190
191 pub fn choice(self) -> ChoiceBuilder {
197 ChoiceBuilder {
198 parent: self,
199 whens: vec![],
200 _otherwise: None,
201 }
202 }
203
204 pub fn wire_tap(mut self, endpoint: &str) -> Self {
208 self.steps.push(BuilderStep::WireTap {
209 uri: endpoint.to_string(),
210 });
211 self
212 }
213
214 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
216 self.error_handler = Some(config);
217 self
218 }
219
220 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
222 self.circuit_breaker_config = Some(config);
223 self
224 }
225
226 pub fn concurrent(mut self, max: usize) -> Self {
240 let max = if max == 0 { None } else { Some(max) };
241 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
242 self
243 }
244
245 pub fn sequential(mut self) -> Self {
250 self.concurrency = Some(ConcurrencyModel::Sequential);
251 self
252 }
253
254 pub fn route_id(mut self, id: impl Into<String>) -> Self {
258 self.route_id = Some(id.into());
259 self
260 }
261
262 pub fn auto_startup(mut self, auto: bool) -> Self {
266 self.auto_startup = Some(auto);
267 self
268 }
269
270 pub fn startup_order(mut self, order: i32) -> Self {
274 self.startup_order = Some(order);
275 self
276 }
277
278 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
284 SplitBuilder {
285 parent: self,
286 config,
287 steps: Vec::new(),
288 }
289 }
290
291 pub fn multicast(self) -> MulticastBuilder {
297 MulticastBuilder {
298 parent: self,
299 steps: Vec::new(),
300 config: MulticastConfig::new(),
301 }
302 }
303
304 pub fn build(self) -> Result<RouteDefinition, CamelError> {
306 if self.from_uri.is_empty() {
307 return Err(CamelError::RouteError(
308 "route must have a 'from' URI".to_string(),
309 ));
310 }
311 let route_id = self.route_id.ok_or_else(|| {
312 CamelError::RouteError(
313 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
314 .to_string(),
315 )
316 })?;
317 let definition = RouteDefinition::new(self.from_uri, self.steps);
318 let definition = if let Some(eh) = self.error_handler {
319 definition.with_error_handler(eh)
320 } else {
321 definition
322 };
323 let definition = if let Some(cb) = self.circuit_breaker_config {
324 definition.with_circuit_breaker(cb)
325 } else {
326 definition
327 };
328 let definition = if let Some(concurrency) = self.concurrency {
329 definition.with_concurrency(concurrency)
330 } else {
331 definition
332 };
333 let definition = definition.with_route_id(route_id);
334 let definition = if let Some(auto) = self.auto_startup {
335 definition.with_auto_startup(auto)
336 } else {
337 definition
338 };
339 let definition = if let Some(order) = self.startup_order {
340 definition.with_startup_order(order)
341 } else {
342 definition
343 };
344 Ok(definition)
345 }
346}
347
348impl StepAccumulator for RouteBuilder {
349 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
350 &mut self.steps
351 }
352}
353
354pub struct SplitBuilder {
362 parent: RouteBuilder,
363 config: SplitterConfig,
364 steps: Vec<BuilderStep>,
365}
366
367impl SplitBuilder {
368 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
370 where
371 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
372 {
373 FilterInSplitBuilder {
374 parent: self,
375 predicate: std::sync::Arc::new(predicate),
376 steps: vec![],
377 }
378 }
379
380 pub fn end_split(mut self) -> RouteBuilder {
383 let split_step = BuilderStep::Split {
384 config: self.config,
385 steps: self.steps,
386 };
387 self.parent.steps.push(split_step);
388 self.parent
389 }
390}
391
392impl StepAccumulator for SplitBuilder {
393 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
394 &mut self.steps
395 }
396}
397
398pub struct FilterBuilder {
400 parent: RouteBuilder,
401 predicate: FilterPredicate,
402 steps: Vec<BuilderStep>,
403}
404
405impl FilterBuilder {
406 pub fn end_filter(mut self) -> RouteBuilder {
409 let step = BuilderStep::Filter {
410 predicate: self.predicate,
411 steps: self.steps,
412 };
413 self.parent.steps.push(step);
414 self.parent
415 }
416}
417
418impl StepAccumulator for FilterBuilder {
419 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
420 &mut self.steps
421 }
422}
423
424pub struct FilterInSplitBuilder {
426 parent: SplitBuilder,
427 predicate: FilterPredicate,
428 steps: Vec<BuilderStep>,
429}
430
431impl FilterInSplitBuilder {
432 pub fn end_filter(mut self) -> SplitBuilder {
434 let step = BuilderStep::Filter {
435 predicate: self.predicate,
436 steps: self.steps,
437 };
438 self.parent.steps.push(step);
439 self.parent
440 }
441}
442
443impl StepAccumulator for FilterInSplitBuilder {
444 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
445 &mut self.steps
446 }
447}
448
449pub struct ChoiceBuilder {
456 parent: RouteBuilder,
457 whens: Vec<WhenStep>,
458 _otherwise: Option<Vec<BuilderStep>>,
459}
460
461impl ChoiceBuilder {
462 pub fn when<F>(self, predicate: F) -> WhenBuilder
465 where
466 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
467 {
468 WhenBuilder {
469 parent: self,
470 predicate: std::sync::Arc::new(predicate),
471 steps: vec![],
472 }
473 }
474
475 pub fn otherwise(self) -> OtherwiseBuilder {
479 OtherwiseBuilder {
480 parent: self,
481 steps: vec![],
482 }
483 }
484
485 pub fn end_choice(mut self) -> RouteBuilder {
489 let step = BuilderStep::Choice {
490 whens: self.whens,
491 otherwise: self._otherwise,
492 };
493 self.parent.steps.push(step);
494 self.parent
495 }
496}
497
498pub struct WhenBuilder {
500 parent: ChoiceBuilder,
501 predicate: camel_api::FilterPredicate,
502 steps: Vec<BuilderStep>,
503}
504
505impl WhenBuilder {
506 pub fn end_when(mut self) -> ChoiceBuilder {
509 self.parent.whens.push(WhenStep {
510 predicate: self.predicate,
511 steps: self.steps,
512 });
513 self.parent
514 }
515}
516
517impl StepAccumulator for WhenBuilder {
518 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
519 &mut self.steps
520 }
521}
522
523pub struct OtherwiseBuilder {
525 parent: ChoiceBuilder,
526 steps: Vec<BuilderStep>,
527}
528
529impl OtherwiseBuilder {
530 pub fn end_otherwise(self) -> ChoiceBuilder {
532 let OtherwiseBuilder { mut parent, steps } = self;
533 parent._otherwise = Some(steps);
534 parent
535 }
536}
537
538impl StepAccumulator for OtherwiseBuilder {
539 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
540 &mut self.steps
541 }
542}
543
544pub struct MulticastBuilder {
552 parent: RouteBuilder,
553 steps: Vec<BuilderStep>,
554 config: MulticastConfig,
555}
556
557impl MulticastBuilder {
558 pub fn parallel(mut self, parallel: bool) -> Self {
559 self.config = self.config.parallel(parallel);
560 self
561 }
562
563 pub fn parallel_limit(mut self, limit: usize) -> Self {
564 self.config = self.config.parallel_limit(limit);
565 self
566 }
567
568 pub fn stop_on_exception(mut self, stop: bool) -> Self {
569 self.config = self.config.stop_on_exception(stop);
570 self
571 }
572
573 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
574 self.config = self.config.timeout(duration);
575 self
576 }
577
578 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
579 self.config = self.config.aggregation(strategy);
580 self
581 }
582
583 pub fn end_multicast(mut self) -> RouteBuilder {
584 let step = BuilderStep::Multicast {
585 steps: self.steps,
586 config: self.config,
587 };
588 self.parent.steps.push(step);
589 self.parent
590 }
591}
592
593impl StepAccumulator for MulticastBuilder {
594 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
595 &mut self.steps
596 }
597}
598
599#[cfg(test)]
604mod tests {
605 use super::*;
606 use camel_api::{Exchange, Message};
607 use camel_core::route::BuilderStep;
608 use tower::{Service, ServiceExt};
609
610 #[test]
611 fn test_builder_from_creates_definition() {
612 let definition = RouteBuilder::from("timer:tick")
613 .route_id("test-route")
614 .build()
615 .unwrap();
616 assert_eq!(definition.from_uri(), "timer:tick");
617 }
618
619 #[test]
620 fn test_builder_empty_from_uri_errors() {
621 let result = RouteBuilder::from("").route_id("test-route").build();
622 assert!(result.is_err());
623 }
624
625 #[test]
626 fn test_builder_to_adds_step() {
627 let definition = RouteBuilder::from("timer:tick")
628 .route_id("test-route")
629 .to("log:info")
630 .build()
631 .unwrap();
632
633 assert_eq!(definition.from_uri(), "timer:tick");
634 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
636 }
637
638 #[test]
639 fn test_builder_filter_adds_filter_step() {
640 let definition = RouteBuilder::from("timer:tick")
641 .route_id("test-route")
642 .filter(|_ex| true)
643 .to("mock:result")
644 .end_filter()
645 .build()
646 .unwrap();
647
648 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
649 }
650
651 #[test]
652 fn test_builder_set_header_adds_processor_step() {
653 let definition = RouteBuilder::from("timer:tick")
654 .route_id("test-route")
655 .set_header("key", Value::String("value".into()))
656 .build()
657 .unwrap();
658
659 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
660 }
661
662 #[test]
663 fn test_builder_map_body_adds_processor_step() {
664 let definition = RouteBuilder::from("timer:tick")
665 .route_id("test-route")
666 .map_body(|body| body)
667 .build()
668 .unwrap();
669
670 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
671 }
672
673 #[test]
674 fn test_builder_process_adds_processor_step() {
675 let definition = RouteBuilder::from("timer:tick")
676 .route_id("test-route")
677 .process(|ex| async move { Ok(ex) })
678 .build()
679 .unwrap();
680
681 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
682 }
683
684 #[test]
685 fn test_builder_chain_multiple_steps() {
686 let definition = RouteBuilder::from("timer:tick")
687 .route_id("test-route")
688 .set_header("source", Value::String("timer".into()))
689 .filter(|ex| ex.input.header("source").is_some())
690 .to("log:info")
691 .end_filter()
692 .to("mock:result")
693 .build()
694 .unwrap();
695
696 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
700 }
701
702 #[tokio::test]
707 async fn test_set_header_processor_works() {
708 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
709 let exchange = Exchange::new(Message::new("test"));
710 let result = svc.call(exchange).await.unwrap();
711 assert_eq!(
712 result.input.header("greeting"),
713 Some(&Value::String("hello".into()))
714 );
715 }
716
717 #[tokio::test]
718 async fn test_filter_processor_passes() {
719 use camel_api::BoxProcessorExt;
720 use camel_processor::FilterService;
721
722 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
723 let mut svc =
724 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
725 let exchange = Exchange::new(Message::new("pass"));
726 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
727 assert_eq!(result.input.body.as_text(), Some("pass"));
728 }
729
730 #[tokio::test]
731 async fn test_filter_processor_blocks() {
732 use camel_api::BoxProcessorExt;
733 use camel_processor::FilterService;
734
735 let sub = BoxProcessor::from_fn(|_ex| {
736 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
737 });
738 let mut svc =
739 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
740 let exchange = Exchange::new(Message::new("reject"));
741 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
742 assert_eq!(result.input.body.as_text(), Some("reject"));
743 }
744
745 #[tokio::test]
746 async fn test_map_body_processor_works() {
747 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
748 if let Some(text) = body.as_text() {
749 Body::Text(text.to_uppercase())
750 } else {
751 body
752 }
753 });
754 let exchange = Exchange::new(Message::new("hello"));
755 let result = mapper.oneshot(exchange).await.unwrap();
756 assert_eq!(result.input.body.as_text(), Some("HELLO"));
757 }
758
759 #[tokio::test]
760 async fn test_process_custom_processor_works() {
761 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
762 ex.set_property("custom", Value::Bool(true));
763 Ok(ex)
764 });
765 let exchange = Exchange::new(Message::default());
766 let result = processor.oneshot(exchange).await.unwrap();
767 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
768 }
769
770 #[tokio::test]
775 async fn test_compose_pipeline_runs_steps_in_order() {
776 use camel_core::route::compose_pipeline;
777
778 let processors = vec![
779 BoxProcessor::new(SetHeader::new(
780 IdentityProcessor,
781 "step",
782 Value::String("one".into()),
783 )),
784 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
785 if let Some(text) = body.as_text() {
786 Body::Text(format!("{}-processed", text))
787 } else {
788 body
789 }
790 })),
791 ];
792
793 let pipeline = compose_pipeline(processors);
794 let exchange = Exchange::new(Message::new("hello"));
795 let result = pipeline.oneshot(exchange).await.unwrap();
796
797 assert_eq!(
798 result.input.header("step"),
799 Some(&Value::String("one".into()))
800 );
801 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
802 }
803
804 #[tokio::test]
805 async fn test_compose_pipeline_empty_is_identity() {
806 use camel_core::route::compose_pipeline;
807
808 let pipeline = compose_pipeline(vec![]);
809 let exchange = Exchange::new(Message::new("unchanged"));
810 let result = pipeline.oneshot(exchange).await.unwrap();
811 assert_eq!(result.input.body.as_text(), Some("unchanged"));
812 }
813
814 #[test]
819 fn test_builder_circuit_breaker_sets_config() {
820 use camel_api::circuit_breaker::CircuitBreakerConfig;
821
822 let config = CircuitBreakerConfig::new().failure_threshold(5);
823 let definition = RouteBuilder::from("timer:tick")
824 .route_id("test-route")
825 .circuit_breaker(config)
826 .build()
827 .unwrap();
828
829 let cb = definition
830 .circuit_breaker_config()
831 .expect("circuit breaker should be set");
832 assert_eq!(cb.failure_threshold, 5);
833 }
834
835 #[test]
836 fn test_builder_circuit_breaker_with_error_handler() {
837 use camel_api::circuit_breaker::CircuitBreakerConfig;
838 use camel_api::error_handler::ErrorHandlerConfig;
839
840 let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
841 let eh_config = ErrorHandlerConfig::log_only();
842
843 let definition = RouteBuilder::from("timer:tick")
844 .route_id("test-route")
845 .to("log:info")
846 .circuit_breaker(cb_config)
847 .error_handler(eh_config)
848 .build()
849 .unwrap();
850
851 assert!(
852 definition.circuit_breaker_config().is_some(),
853 "circuit breaker config should be set"
854 );
855 }
857
858 #[test]
861 fn test_split_builder_typestate() {
862 use camel_api::splitter::{SplitterConfig, split_body_lines};
863
864 let definition = RouteBuilder::from("timer:test?period=1000")
866 .route_id("test-route")
867 .split(SplitterConfig::new(split_body_lines()))
868 .to("mock:per-fragment")
869 .end_split()
870 .to("mock:final")
871 .build()
872 .unwrap();
873
874 assert_eq!(definition.steps().len(), 2);
876 }
877
878 #[test]
879 fn test_split_builder_steps_collected() {
880 use camel_api::splitter::{SplitterConfig, split_body_lines};
881
882 let definition = RouteBuilder::from("timer:test?period=1000")
883 .route_id("test-route")
884 .split(SplitterConfig::new(split_body_lines()))
885 .set_header("fragment", Value::String("yes".into()))
886 .to("mock:per-fragment")
887 .end_split()
888 .build()
889 .unwrap();
890
891 assert_eq!(definition.steps().len(), 1);
893 match &definition.steps()[0] {
894 BuilderStep::Split { steps, .. } => {
895 assert_eq!(steps.len(), 2); }
897 other => panic!("Expected Split, got {:?}", other),
898 }
899 }
900
901 #[test]
902 fn test_split_builder_config_propagated() {
903 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
904
905 let definition = RouteBuilder::from("timer:test?period=1000")
906 .route_id("test-route")
907 .split(
908 SplitterConfig::new(split_body_lines())
909 .parallel(true)
910 .parallel_limit(4)
911 .aggregation(AggregationStrategy::CollectAll),
912 )
913 .to("mock:per-fragment")
914 .end_split()
915 .build()
916 .unwrap();
917
918 match &definition.steps()[0] {
919 BuilderStep::Split { config, .. } => {
920 assert!(config.parallel);
921 assert_eq!(config.parallel_limit, Some(4));
922 assert!(matches!(
923 config.aggregation,
924 AggregationStrategy::CollectAll
925 ));
926 }
927 other => panic!("Expected Split, got {:?}", other),
928 }
929 }
930
931 #[test]
932 fn test_aggregate_builder_adds_step() {
933 use camel_api::aggregator::AggregatorConfig;
934 use camel_core::route::BuilderStep;
935
936 let definition = RouteBuilder::from("timer:tick")
937 .route_id("test-route")
938 .aggregate(
939 AggregatorConfig::correlate_by("key")
940 .complete_when_size(2)
941 .build(),
942 )
943 .build()
944 .unwrap();
945
946 assert_eq!(definition.steps().len(), 1);
947 assert!(matches!(
948 definition.steps()[0],
949 BuilderStep::Aggregate { .. }
950 ));
951 }
952
953 #[test]
954 fn test_aggregate_in_split_builder() {
955 use camel_api::aggregator::AggregatorConfig;
956 use camel_api::splitter::{SplitterConfig, split_body_lines};
957 use camel_core::route::BuilderStep;
958
959 let definition = RouteBuilder::from("timer:tick")
960 .route_id("test-route")
961 .split(SplitterConfig::new(split_body_lines()))
962 .aggregate(
963 AggregatorConfig::correlate_by("key")
964 .complete_when_size(1)
965 .build(),
966 )
967 .end_split()
968 .build()
969 .unwrap();
970
971 assert_eq!(definition.steps().len(), 1);
972 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
973 assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
974 } else {
975 panic!("expected Split step");
976 }
977 }
978
979 #[test]
982 fn test_builder_set_body_static_adds_processor() {
983 let definition = RouteBuilder::from("timer:tick")
984 .route_id("test-route")
985 .set_body("fixed")
986 .build()
987 .unwrap();
988 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
989 }
990
991 #[test]
992 fn test_builder_set_body_fn_adds_processor() {
993 let definition = RouteBuilder::from("timer:tick")
994 .route_id("test-route")
995 .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
996 .build()
997 .unwrap();
998 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
999 }
1000
1001 #[test]
1002 fn test_builder_set_header_fn_adds_processor() {
1003 let definition = RouteBuilder::from("timer:tick")
1004 .route_id("test-route")
1005 .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
1006 .build()
1007 .unwrap();
1008 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1009 }
1010
1011 #[tokio::test]
1012 async fn test_set_body_static_processor_works() {
1013 use camel_core::route::compose_pipeline;
1014 let def = RouteBuilder::from("t:t")
1015 .route_id("test-route")
1016 .set_body("replaced")
1017 .build()
1018 .unwrap();
1019 let pipeline = compose_pipeline(
1020 def.steps()
1021 .iter()
1022 .filter_map(|s| {
1023 if let BuilderStep::Processor(p) = s {
1024 Some(p.clone())
1025 } else {
1026 None
1027 }
1028 })
1029 .collect(),
1030 );
1031 let exchange = Exchange::new(Message::new("original"));
1032 let result = pipeline.oneshot(exchange).await.unwrap();
1033 assert_eq!(result.input.body.as_text(), Some("replaced"));
1034 }
1035
1036 #[tokio::test]
1037 async fn test_set_body_fn_processor_works() {
1038 use camel_core::route::compose_pipeline;
1039 let def = RouteBuilder::from("t:t")
1040 .route_id("test-route")
1041 .set_body_fn(|ex: &Exchange| {
1042 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
1043 })
1044 .build()
1045 .unwrap();
1046 let pipeline = compose_pipeline(
1047 def.steps()
1048 .iter()
1049 .filter_map(|s| {
1050 if let BuilderStep::Processor(p) = s {
1051 Some(p.clone())
1052 } else {
1053 None
1054 }
1055 })
1056 .collect(),
1057 );
1058 let exchange = Exchange::new(Message::new("hello"));
1059 let result = pipeline.oneshot(exchange).await.unwrap();
1060 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1061 }
1062
1063 #[tokio::test]
1064 async fn test_set_header_fn_processor_works() {
1065 use camel_core::route::compose_pipeline;
1066 let def = RouteBuilder::from("t:t")
1067 .route_id("test-route")
1068 .set_header_fn("echo", |ex: &Exchange| {
1069 ex.input
1070 .body
1071 .as_text()
1072 .map(|t| Value::String(t.into()))
1073 .unwrap_or(Value::Null)
1074 })
1075 .build()
1076 .unwrap();
1077 let pipeline = compose_pipeline(
1078 def.steps()
1079 .iter()
1080 .filter_map(|s| {
1081 if let BuilderStep::Processor(p) = s {
1082 Some(p.clone())
1083 } else {
1084 None
1085 }
1086 })
1087 .collect(),
1088 );
1089 let exchange = Exchange::new(Message::new("ping"));
1090 let result = pipeline.oneshot(exchange).await.unwrap();
1091 assert_eq!(
1092 result.input.header("echo"),
1093 Some(&Value::String("ping".into()))
1094 );
1095 }
1096
1097 #[test]
1100 fn test_filter_builder_typestate() {
1101 let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1102 .route_id("test-route")
1103 .filter(|_ex| true)
1104 .to("mock:inner")
1105 .end_filter()
1106 .to("mock:outer")
1107 .build();
1108 assert!(result.is_ok());
1109 }
1110
1111 #[test]
1112 fn test_filter_builder_steps_collected() {
1113 let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1114 .route_id("test-route")
1115 .filter(|_ex| true)
1116 .to("mock:inner")
1117 .end_filter()
1118 .build()
1119 .unwrap();
1120
1121 assert_eq!(definition.steps().len(), 1);
1122 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1123 }
1124
1125 #[test]
1126 fn test_wire_tap_builder_adds_step() {
1127 let definition = RouteBuilder::from("timer:tick")
1128 .route_id("test-route")
1129 .wire_tap("mock:tap")
1130 .to("mock:result")
1131 .build()
1132 .unwrap();
1133
1134 assert_eq!(definition.steps().len(), 2);
1135 assert!(
1136 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
1137 );
1138 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
1139 }
1140
1141 #[test]
1144 fn test_multicast_builder_typestate() {
1145 let definition = RouteBuilder::from("timer:tick")
1146 .route_id("test-route")
1147 .multicast()
1148 .to("direct:a")
1149 .to("direct:b")
1150 .end_multicast()
1151 .to("mock:result")
1152 .build()
1153 .unwrap();
1154
1155 assert_eq!(definition.steps().len(), 2); }
1157
1158 #[test]
1159 fn test_multicast_builder_steps_collected() {
1160 let definition = RouteBuilder::from("timer:tick")
1161 .route_id("test-route")
1162 .multicast()
1163 .to("direct:a")
1164 .to("direct:b")
1165 .end_multicast()
1166 .build()
1167 .unwrap();
1168
1169 match &definition.steps()[0] {
1170 BuilderStep::Multicast { steps, .. } => {
1171 assert_eq!(steps.len(), 2);
1172 }
1173 other => panic!("Expected Multicast, got {:?}", other),
1174 }
1175 }
1176
1177 #[test]
1180 fn test_builder_concurrent_sets_concurrency() {
1181 use camel_component::ConcurrencyModel;
1182
1183 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1184 .route_id("test-route")
1185 .concurrent(16)
1186 .to("log:info")
1187 .build()
1188 .unwrap();
1189
1190 assert_eq!(
1191 definition.concurrency_override(),
1192 Some(&ConcurrencyModel::Concurrent { max: Some(16) })
1193 );
1194 }
1195
1196 #[test]
1197 fn test_builder_concurrent_zero_means_unbounded() {
1198 use camel_component::ConcurrencyModel;
1199
1200 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1201 .route_id("test-route")
1202 .concurrent(0)
1203 .to("log:info")
1204 .build()
1205 .unwrap();
1206
1207 assert_eq!(
1208 definition.concurrency_override(),
1209 Some(&ConcurrencyModel::Concurrent { max: None })
1210 );
1211 }
1212
1213 #[test]
1214 fn test_builder_sequential_sets_concurrency() {
1215 use camel_component::ConcurrencyModel;
1216
1217 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1218 .route_id("test-route")
1219 .sequential()
1220 .to("log:info")
1221 .build()
1222 .unwrap();
1223
1224 assert_eq!(
1225 definition.concurrency_override(),
1226 Some(&ConcurrencyModel::Sequential)
1227 );
1228 }
1229
1230 #[test]
1231 fn test_builder_default_concurrency_is_none() {
1232 let definition = RouteBuilder::from("timer:tick")
1233 .route_id("test-route")
1234 .to("log:info")
1235 .build()
1236 .unwrap();
1237
1238 assert_eq!(definition.concurrency_override(), None);
1239 }
1240
1241 #[test]
1244 fn test_builder_route_id_sets_id() {
1245 let definition = RouteBuilder::from("timer:tick")
1246 .route_id("my-route")
1247 .build()
1248 .unwrap();
1249
1250 assert_eq!(definition.route_id(), "my-route");
1251 }
1252
1253 #[test]
1254 fn test_build_without_route_id_fails() {
1255 let result = RouteBuilder::from("timer:tick?period=1000")
1256 .to("log:info")
1257 .build();
1258 let err = match result {
1259 Err(e) => e.to_string(),
1260 Ok(_) => panic!("build() should fail without route_id"),
1261 };
1262 assert!(
1263 err.contains("route_id"),
1264 "error should mention route_id, got: {}",
1265 err
1266 );
1267 }
1268
1269 #[test]
1270 fn test_builder_auto_startup_false() {
1271 let definition = RouteBuilder::from("timer:tick")
1272 .route_id("test-route")
1273 .auto_startup(false)
1274 .build()
1275 .unwrap();
1276
1277 assert!(!definition.auto_startup());
1278 }
1279
1280 #[test]
1281 fn test_builder_startup_order_custom() {
1282 let definition = RouteBuilder::from("timer:tick")
1283 .route_id("test-route")
1284 .startup_order(50)
1285 .build()
1286 .unwrap();
1287
1288 assert_eq!(definition.startup_order(), 50);
1289 }
1290
1291 #[test]
1292 fn test_builder_defaults() {
1293 let definition = RouteBuilder::from("timer:tick")
1294 .route_id("test-route")
1295 .build()
1296 .unwrap();
1297
1298 assert_eq!(definition.route_id(), "test-route");
1299 assert!(definition.auto_startup());
1300 assert_eq!(definition.startup_order(), 1000);
1301 }
1302
1303 #[test]
1306 fn test_choice_builder_single_when() {
1307 let definition = RouteBuilder::from("timer:tick")
1308 .route_id("test-route")
1309 .choice()
1310 .when(|ex: &Exchange| ex.input.header("type").is_some())
1311 .to("mock:typed")
1312 .end_when()
1313 .end_choice()
1314 .build()
1315 .unwrap();
1316 assert_eq!(definition.steps().len(), 1);
1317 assert!(
1318 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
1319 if whens.len() == 1 && otherwise.is_none())
1320 );
1321 }
1322
1323 #[test]
1324 fn test_choice_builder_when_otherwise() {
1325 let definition = RouteBuilder::from("timer:tick")
1326 .route_id("test-route")
1327 .choice()
1328 .when(|ex: &Exchange| ex.input.header("a").is_some())
1329 .to("mock:a")
1330 .end_when()
1331 .otherwise()
1332 .to("mock:fallback")
1333 .end_otherwise()
1334 .end_choice()
1335 .build()
1336 .unwrap();
1337 assert!(
1338 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
1339 if whens.len() == 1 && otherwise.is_some())
1340 );
1341 }
1342
1343 #[test]
1344 fn test_choice_builder_multiple_whens() {
1345 let definition = RouteBuilder::from("timer:tick")
1346 .route_id("test-route")
1347 .choice()
1348 .when(|ex: &Exchange| ex.input.header("a").is_some())
1349 .to("mock:a")
1350 .end_when()
1351 .when(|ex: &Exchange| ex.input.header("b").is_some())
1352 .to("mock:b")
1353 .end_when()
1354 .end_choice()
1355 .build()
1356 .unwrap();
1357 assert!(
1358 matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
1359 if whens.len() == 2)
1360 );
1361 }
1362
1363 #[test]
1364 fn test_choice_step_after_choice() {
1365 let definition = RouteBuilder::from("timer:tick")
1367 .route_id("test-route")
1368 .choice()
1369 .when(|_ex: &Exchange| true)
1370 .to("mock:inner")
1371 .end_when()
1372 .end_choice()
1373 .to("mock:outer") .build()
1375 .unwrap();
1376 assert_eq!(definition.steps().len(), 2);
1377 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
1378 }
1379}