1use camel_api::aggregator::AggregatorConfig;
2use camel_api::body::Body;
3use camel_api::circuit_breaker::CircuitBreakerConfig;
4use camel_api::error_handler::ErrorHandlerConfig;
5use camel_api::multicast::{MulticastConfig, MulticastStrategy};
6use camel_api::splitter::SplitterConfig;
7use camel_api::{
8 BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProcessorFn, Value,
9};
10use camel_component::ConcurrencyModel;
11use camel_core::route::{BuilderStep, RouteDefinition, WhenStep};
12use camel_processor::{DynamicSetHeader, LogLevel, MapBody, SetBody, SetHeader, StopService};
13
14pub trait StepAccumulator: Sized {
20 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
21
22 fn to(mut self, endpoint: impl Into<String>) -> Self {
23 self.steps_mut().push(BuilderStep::To(endpoint.into()));
24 self
25 }
26
27 fn process<F, Fut>(mut self, f: F) -> Self
28 where
29 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
30 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
31 {
32 let svc = ProcessorFn::new(f);
33 self.steps_mut()
34 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
35 self
36 }
37
38 fn process_fn(mut self, processor: BoxProcessor) -> Self {
39 self.steps_mut().push(BuilderStep::Processor(processor));
40 self
41 }
42
43 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
44 let svc = SetHeader::new(IdentityProcessor, key, value);
45 self.steps_mut()
46 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
47 self
48 }
49
50 fn map_body<F>(mut self, mapper: F) -> Self
51 where
52 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
53 {
54 let svc = MapBody::new(IdentityProcessor, mapper);
55 self.steps_mut()
56 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
57 self
58 }
59
60 fn set_body<B>(mut self, body: B) -> Self
61 where
62 B: Into<Body> + Clone + Send + Sync + 'static,
63 {
64 let body: Body = body.into();
65 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
66 self.steps_mut()
67 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
68 self
69 }
70
71 fn set_body_fn<F>(mut self, expr: F) -> Self
72 where
73 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
74 {
75 let svc = SetBody::new(IdentityProcessor, expr);
76 self.steps_mut()
77 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
78 self
79 }
80
81 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
82 where
83 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
84 {
85 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
86 self.steps_mut()
87 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
88 self
89 }
90
91 fn aggregate(mut self, config: AggregatorConfig) -> Self {
92 self.steps_mut().push(BuilderStep::Aggregate { config });
93 self
94 }
95
96 fn stop(mut self) -> Self {
102 self.steps_mut()
103 .push(BuilderStep::Processor(BoxProcessor::new(StopService)));
104 self
105 }
106
107 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
111 use camel_processor::LogProcessor;
112 let svc = LogProcessor::new(level, message.into());
113 self.steps_mut()
114 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
115 self
116 }
117}
118
/// Fluent builder that collects a route's source URI, its steps and optional
/// settings, and turns them into a [`RouteDefinition`] via [`RouteBuilder::build`].
pub struct RouteBuilder {
    /// Source endpoint URI given to [`RouteBuilder::from`].
    from_uri: String,
    /// Top-level steps in the order they were added.
    steps: Vec<BuilderStep>,
    /// Error handler configuration, if one was set.
    error_handler: Option<ErrorHandlerConfig>,
    /// Circuit breaker configuration, if one was set.
    circuit_breaker_config: Option<CircuitBreakerConfig>,
    /// Concurrency model override, if one was set.
    concurrency: Option<ConcurrencyModel>,
    /// Route identifier; `build` fails while this is `None`.
    route_id: Option<String>,
    /// Auto-startup override, if one was set.
    auto_startup: Option<bool>,
    /// Startup order override, if one was set.
    startup_order: Option<i32>,
}
140
141impl RouteBuilder {
142 pub fn from(endpoint: &str) -> Self {
144 Self {
145 from_uri: endpoint.to_string(),
146 steps: Vec::new(),
147 error_handler: None,
148 circuit_breaker_config: None,
149 concurrency: None,
150 route_id: None,
151 auto_startup: None,
152 startup_order: None,
153 }
154 }
155
156 pub fn filter<F>(self, predicate: F) -> FilterBuilder
160 where
161 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
162 {
163 FilterBuilder {
164 parent: self,
165 predicate: std::sync::Arc::new(predicate),
166 steps: vec![],
167 }
168 }
169
170 pub fn choice(self) -> ChoiceBuilder {
176 ChoiceBuilder {
177 parent: self,
178 whens: vec![],
179 _otherwise: None,
180 }
181 }
182
183 pub fn wire_tap(mut self, endpoint: &str) -> Self {
187 self.steps.push(BuilderStep::WireTap {
188 uri: endpoint.to_string(),
189 });
190 self
191 }
192
193 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
195 self.error_handler = Some(config);
196 self
197 }
198
199 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
201 self.circuit_breaker_config = Some(config);
202 self
203 }
204
205 pub fn concurrent(mut self, max: usize) -> Self {
219 let max = if max == 0 { None } else { Some(max) };
220 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
221 self
222 }
223
224 pub fn sequential(mut self) -> Self {
229 self.concurrency = Some(ConcurrencyModel::Sequential);
230 self
231 }
232
233 pub fn route_id(mut self, id: impl Into<String>) -> Self {
237 self.route_id = Some(id.into());
238 self
239 }
240
241 pub fn auto_startup(mut self, auto: bool) -> Self {
245 self.auto_startup = Some(auto);
246 self
247 }
248
249 pub fn startup_order(mut self, order: i32) -> Self {
253 self.startup_order = Some(order);
254 self
255 }
256
257 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
263 SplitBuilder {
264 parent: self,
265 config,
266 steps: Vec::new(),
267 }
268 }
269
270 pub fn multicast(self) -> MulticastBuilder {
276 MulticastBuilder {
277 parent: self,
278 steps: Vec::new(),
279 config: MulticastConfig::new(),
280 }
281 }
282
283 pub fn build(self) -> Result<RouteDefinition, CamelError> {
285 if self.from_uri.is_empty() {
286 return Err(CamelError::RouteError(
287 "route must have a 'from' URI".to_string(),
288 ));
289 }
290 let route_id = self.route_id.ok_or_else(|| {
291 CamelError::RouteError(
292 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
293 .to_string(),
294 )
295 })?;
296 let definition = RouteDefinition::new(self.from_uri, self.steps);
297 let definition = if let Some(eh) = self.error_handler {
298 definition.with_error_handler(eh)
299 } else {
300 definition
301 };
302 let definition = if let Some(cb) = self.circuit_breaker_config {
303 definition.with_circuit_breaker(cb)
304 } else {
305 definition
306 };
307 let definition = if let Some(concurrency) = self.concurrency {
308 definition.with_concurrency(concurrency)
309 } else {
310 definition
311 };
312 let definition = definition.with_route_id(route_id);
313 let definition = if let Some(auto) = self.auto_startup {
314 definition.with_auto_startup(auto)
315 } else {
316 definition
317 };
318 let definition = if let Some(order) = self.startup_order {
319 definition.with_startup_order(order)
320 } else {
321 definition
322 };
323 Ok(definition)
324 }
325}
326
/// Steps added through [`StepAccumulator`] go to the route's top-level step list.
impl StepAccumulator for RouteBuilder {
    /// Returns the route's top-level step list.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
332
/// Builder for the steps nested inside a split, opened by [`RouteBuilder::split`].
///
/// Steps added here are kept apart from the parent route until
/// [`SplitBuilder::end_split`] wraps them in a single [`BuilderStep::Split`].
pub struct SplitBuilder {
    /// Route builder to return to when the split scope is closed.
    parent: RouteBuilder,
    /// Splitter configuration stored in the resulting split step.
    config: SplitterConfig,
    /// Steps collected inside the split scope.
    steps: Vec<BuilderStep>,
}
345
346impl SplitBuilder {
347 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
349 where
350 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
351 {
352 FilterInSplitBuilder {
353 parent: self,
354 predicate: std::sync::Arc::new(predicate),
355 steps: vec![],
356 }
357 }
358
359 pub fn end_split(mut self) -> RouteBuilder {
362 let split_step = BuilderStep::Split {
363 config: self.config,
364 steps: self.steps,
365 };
366 self.parent.steps.push(split_step);
367 self.parent
368 }
369}
370
/// Steps added through [`StepAccumulator`] go to the split's nested step list.
impl StepAccumulator for SplitBuilder {
    /// Returns the step list nested inside the split.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
376
/// Builder for the steps nested inside a route-level filter, opened by
/// [`RouteBuilder::filter`].
pub struct FilterBuilder {
    /// Route builder to return to when the filter scope is closed.
    parent: RouteBuilder,
    /// Predicate stored in the resulting filter step.
    predicate: FilterPredicate,
    /// Steps collected inside the filter scope.
    steps: Vec<BuilderStep>,
}
383
384impl FilterBuilder {
385 pub fn end_filter(mut self) -> RouteBuilder {
388 let step = BuilderStep::Filter {
389 predicate: self.predicate,
390 steps: self.steps,
391 };
392 self.parent.steps.push(step);
393 self.parent
394 }
395}
396
/// Steps added through [`StepAccumulator`] go to the filter's nested step list.
impl StepAccumulator for FilterBuilder {
    /// Returns the step list nested inside the filter.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
402
/// Builder for the steps nested inside a filter that itself sits inside a
/// split, opened by [`SplitBuilder::filter`].
pub struct FilterInSplitBuilder {
    /// Split builder to return to when the filter scope is closed.
    parent: SplitBuilder,
    /// Predicate stored in the resulting filter step.
    predicate: FilterPredicate,
    /// Steps collected inside the filter scope.
    steps: Vec<BuilderStep>,
}
409
410impl FilterInSplitBuilder {
411 pub fn end_filter(mut self) -> SplitBuilder {
413 let step = BuilderStep::Filter {
414 predicate: self.predicate,
415 steps: self.steps,
416 };
417 self.parent.steps.push(step);
418 self.parent
419 }
420}
421
/// Steps added through [`StepAccumulator`] go to the filter's nested step list.
impl StepAccumulator for FilterInSplitBuilder {
    /// Returns the step list nested inside the filter.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
427
/// Builder for a choice step, opened by [`RouteBuilder::choice`].
///
/// It gathers `when` branches and an optional `otherwise` branch. It does not
/// implement [`StepAccumulator`], so steps can only be added inside a branch.
pub struct ChoiceBuilder {
    /// Route builder to return to when the choice is closed.
    parent: RouteBuilder,
    /// Completed `when` branches in the order they were closed.
    whens: Vec<WhenStep>,
    /// Steps of the fallback branch; `None` until an `otherwise` branch is closed.
    _otherwise: Option<Vec<BuilderStep>>,
}
439
440impl ChoiceBuilder {
441 pub fn when<F>(self, predicate: F) -> WhenBuilder
444 where
445 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
446 {
447 WhenBuilder {
448 parent: self,
449 predicate: std::sync::Arc::new(predicate),
450 steps: vec![],
451 }
452 }
453
454 pub fn otherwise(self) -> OtherwiseBuilder {
458 OtherwiseBuilder {
459 parent: self,
460 steps: vec![],
461 }
462 }
463
464 pub fn end_choice(mut self) -> RouteBuilder {
468 let step = BuilderStep::Choice {
469 whens: self.whens,
470 otherwise: self._otherwise,
471 };
472 self.parent.steps.push(step);
473 self.parent
474 }
475}
476
/// Builder for the steps of a single `when` branch, opened by
/// [`ChoiceBuilder::when`].
pub struct WhenBuilder {
    /// Choice builder to return to when the branch is closed.
    parent: ChoiceBuilder,
    /// Predicate stored in the resulting [`WhenStep`].
    predicate: camel_api::FilterPredicate,
    /// Steps collected inside the branch.
    steps: Vec<BuilderStep>,
}
483
484impl WhenBuilder {
485 pub fn end_when(mut self) -> ChoiceBuilder {
488 self.parent.whens.push(WhenStep {
489 predicate: self.predicate,
490 steps: self.steps,
491 });
492 self.parent
493 }
494}
495
/// Steps added through [`StepAccumulator`] go to the `when` branch's step list.
impl StepAccumulator for WhenBuilder {
    /// Returns the step list of this `when` branch.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
501
/// Builder for the steps of the fallback branch, opened by
/// [`ChoiceBuilder::otherwise`].
pub struct OtherwiseBuilder {
    /// Choice builder to return to when the branch is closed.
    parent: ChoiceBuilder,
    /// Steps collected inside the fallback branch.
    steps: Vec<BuilderStep>,
}
507
508impl OtherwiseBuilder {
509 pub fn end_otherwise(self) -> ChoiceBuilder {
511 let OtherwiseBuilder { mut parent, steps } = self;
512 parent._otherwise = Some(steps);
513 parent
514 }
515}
516
/// Steps added through [`StepAccumulator`] go to the fallback branch's step list.
impl StepAccumulator for OtherwiseBuilder {
    /// Returns the step list of the fallback branch.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
522
/// Builder for the steps nested inside a multicast, opened by
/// [`RouteBuilder::multicast`].
pub struct MulticastBuilder {
    /// Route builder to return to when the multicast scope is closed.
    parent: RouteBuilder,
    /// Steps collected inside the multicast scope.
    steps: Vec<BuilderStep>,
    /// Multicast configuration stored in the resulting multicast step.
    config: MulticastConfig,
}
535
536impl MulticastBuilder {
537 pub fn parallel(mut self, parallel: bool) -> Self {
538 self.config = self.config.parallel(parallel);
539 self
540 }
541
542 pub fn parallel_limit(mut self, limit: usize) -> Self {
543 self.config = self.config.parallel_limit(limit);
544 self
545 }
546
547 pub fn stop_on_exception(mut self, stop: bool) -> Self {
548 self.config = self.config.stop_on_exception(stop);
549 self
550 }
551
552 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
553 self.config = self.config.timeout(duration);
554 self
555 }
556
557 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
558 self.config = self.config.aggregation(strategy);
559 self
560 }
561
562 pub fn end_multicast(mut self) -> RouteBuilder {
563 let step = BuilderStep::Multicast {
564 steps: self.steps,
565 config: self.config,
566 };
567 self.parent.steps.push(step);
568 self.parent
569 }
570}
571
/// Steps added through [`StepAccumulator`] go to the multicast's nested step list.
impl StepAccumulator for MulticastBuilder {
    /// Returns the step list nested inside the multicast.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
577
// Unit tests for the route builder DSL: the shape of the steps each builder
// method records, and the behaviour of the processors those steps wrap.
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{Exchange, Message};
    use camel_core::route::BuilderStep;
    use tower::{Service, ServiceExt};

    // ---- Basic builder behaviour ----

    #[test]
    fn test_builder_from_creates_definition() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .build()
            .unwrap();
        assert_eq!(definition.from_uri(), "timer:tick");
    }

    #[test]
    fn test_builder_empty_from_uri_errors() {
        let result = RouteBuilder::from("").route_id("test-route").build();
        assert!(result.is_err());
    }

    #[test]
    fn test_builder_to_adds_step() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .to("log:info")
            .build()
            .unwrap();

        assert_eq!(definition.from_uri(), "timer:tick");
        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
    }

    #[test]
    fn test_builder_filter_adds_filter_step() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .filter(|_ex| true)
            .to("mock:result")
            .end_filter()
            .build()
            .unwrap();

        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
    }

    #[test]
    fn test_builder_set_header_adds_processor_step() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .set_header("key", Value::String("value".into()))
            .build()
            .unwrap();

        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
    }

    #[test]
    fn test_builder_map_body_adds_processor_step() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .map_body(|body| body)
            .build()
            .unwrap();

        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
    }

    #[test]
    fn test_builder_process_adds_processor_step() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .process(|ex| async move { Ok(ex) })
            .build()
            .unwrap();

        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
    }

    #[test]
    fn test_builder_chain_multiple_steps() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .set_header("source", Value::String("timer".into()))
            .filter(|ex| ex.input.header("source").is_some())
            .to("log:info")
            .end_filter()
            .to("mock:result")
            .build()
            .unwrap();

        // Expected top-level steps: set_header processor, filter, final `to`.
        // The `to("log:info")` lives inside the filter.
        assert_eq!(definition.steps().len(), 3);
        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. }));
        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
    }

    // ---- Processor behaviour ----

    #[tokio::test]
    async fn test_set_header_processor_works() {
        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
        let exchange = Exchange::new(Message::new("test"));
        let result = svc.call(exchange).await.unwrap();
        assert_eq!(
            result.input.header("greeting"),
            Some(&Value::String("hello".into()))
        );
    }

    #[tokio::test]
    async fn test_filter_processor_passes() {
        use camel_api::BoxProcessorExt;
        use camel_processor::FilterService;

        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
        let mut svc =
            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
        let exchange = Exchange::new(Message::new("pass"));
        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("pass"));
    }

    #[tokio::test]
    async fn test_filter_processor_blocks() {
        use camel_api::BoxProcessorExt;
        use camel_processor::FilterService;

        // The sub-processor always fails, so an `Ok` result below shows it
        // was not invoked for the non-matching exchange.
        let sub = BoxProcessor::from_fn(|_ex| {
            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
        });
        let mut svc =
            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
        let exchange = Exchange::new(Message::new("reject"));
        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("reject"));
    }

    #[tokio::test]
    async fn test_map_body_processor_works() {
        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
            if let Some(text) = body.as_text() {
                Body::Text(text.to_uppercase())
            } else {
                body
            }
        });
        let exchange = Exchange::new(Message::new("hello"));
        let result = mapper.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("HELLO"));
    }

    #[tokio::test]
    async fn test_process_custom_processor_works() {
        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
            ex.set_property("custom", Value::Bool(true));
            Ok(ex)
        });
        let exchange = Exchange::new(Message::default());
        let result = processor.oneshot(exchange).await.unwrap();
        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
    }

    // ---- Pipeline composition ----

    #[tokio::test]
    async fn test_compose_pipeline_runs_steps_in_order() {
        use camel_core::route::compose_pipeline;

        let processors = vec![
            BoxProcessor::new(SetHeader::new(
                IdentityProcessor,
                "step",
                Value::String("one".into()),
            )),
            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
                if let Some(text) = body.as_text() {
                    Body::Text(format!("{}-processed", text))
                } else {
                    body
                }
            })),
        ];

        let pipeline = compose_pipeline(processors);
        let exchange = Exchange::new(Message::new("hello"));
        let result = pipeline.oneshot(exchange).await.unwrap();

        assert_eq!(
            result.input.header("step"),
            Some(&Value::String("one".into()))
        );
        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
    }

    #[tokio::test]
    async fn test_compose_pipeline_empty_is_identity() {
        use camel_core::route::compose_pipeline;

        let pipeline = compose_pipeline(vec![]);
        let exchange = Exchange::new(Message::new("unchanged"));
        let result = pipeline.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("unchanged"));
    }

    // ---- Circuit breaker ----

    #[test]
    fn test_builder_circuit_breaker_sets_config() {
        use camel_api::circuit_breaker::CircuitBreakerConfig;

        let config = CircuitBreakerConfig::new().failure_threshold(5);
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .circuit_breaker(config)
            .build()
            .unwrap();

        let cb = definition
            .circuit_breaker_config()
            .expect("circuit breaker should be set");
        assert_eq!(cb.failure_threshold, 5);
    }

    #[test]
    fn test_builder_circuit_breaker_with_error_handler() {
        use camel_api::circuit_breaker::CircuitBreakerConfig;
        use camel_api::error_handler::ErrorHandlerConfig;

        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
        let eh_config = ErrorHandlerConfig::log_only();

        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .to("log:info")
            .circuit_breaker(cb_config)
            .error_handler(eh_config)
            .build()
            .unwrap();

        assert!(
            definition.circuit_breaker_config().is_some(),
            "circuit breaker config should be set"
        );
    }

    // ---- Split builder ----

    #[test]
    fn test_split_builder_typestate() {
        use camel_api::splitter::{SplitterConfig, split_body_lines};

        let definition = RouteBuilder::from("timer:test?period=1000")
            .route_id("test-route")
            .split(SplitterConfig::new(split_body_lines()))
            .to("mock:per-fragment")
            .end_split()
            .to("mock:final")
            .build()
            .unwrap();

        // One split step plus the trailing `to`.
        assert_eq!(definition.steps().len(), 2);
    }

    #[test]
    fn test_split_builder_steps_collected() {
        use camel_api::splitter::{SplitterConfig, split_body_lines};

        let definition = RouteBuilder::from("timer:test?period=1000")
            .route_id("test-route")
            .split(SplitterConfig::new(split_body_lines()))
            .set_header("fragment", Value::String("yes".into()))
            .to("mock:per-fragment")
            .end_split()
            .build()
            .unwrap();

        assert_eq!(definition.steps().len(), 1);
        match &definition.steps()[0] {
            BuilderStep::Split { steps, .. } => {
                // The set_header processor and the `to` are nested in the split.
                assert_eq!(steps.len(), 2);
            }
            other => panic!("Expected Split, got {:?}", other),
        }
    }

    #[test]
    fn test_split_builder_config_propagated() {
        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};

        let definition = RouteBuilder::from("timer:test?period=1000")
            .route_id("test-route")
            .split(
                SplitterConfig::new(split_body_lines())
                    .parallel(true)
                    .parallel_limit(4)
                    .aggregation(AggregationStrategy::CollectAll),
            )
            .to("mock:per-fragment")
            .end_split()
            .build()
            .unwrap();

        match &definition.steps()[0] {
            BuilderStep::Split { config, .. } => {
                assert!(config.parallel);
                assert_eq!(config.parallel_limit, Some(4));
                assert!(matches!(
                    config.aggregation,
                    AggregationStrategy::CollectAll
                ));
            }
            other => panic!("Expected Split, got {:?}", other),
        }
    }

    // ---- Aggregate ----

    #[test]
    fn test_aggregate_builder_adds_step() {
        use camel_api::aggregator::AggregatorConfig;
        use camel_core::route::BuilderStep;

        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .aggregate(
                AggregatorConfig::correlate_by("key")
                    .complete_when_size(2)
                    .build(),
            )
            .build()
            .unwrap();

        assert_eq!(definition.steps().len(), 1);
        assert!(matches!(
            definition.steps()[0],
            BuilderStep::Aggregate { .. }
        ));
    }

    #[test]
    fn test_aggregate_in_split_builder() {
        use camel_api::aggregator::AggregatorConfig;
        use camel_api::splitter::{SplitterConfig, split_body_lines};
        use camel_core::route::BuilderStep;

        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .split(SplitterConfig::new(split_body_lines()))
            .aggregate(
                AggregatorConfig::correlate_by("key")
                    .complete_when_size(1)
                    .build(),
            )
            .end_split()
            .build()
            .unwrap();

        assert_eq!(definition.steps().len(), 1);
        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
        } else {
            panic!("expected Split step");
        }
    }

    // ---- set_body / set_body_fn / set_header_fn ----

    #[test]
    fn test_builder_set_body_static_adds_processor() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .set_body("fixed")
            .build()
            .unwrap();
        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
    }

    #[test]
    fn test_builder_set_body_fn_adds_processor() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
            .build()
            .unwrap();
        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
    }

    #[test]
    fn test_builder_set_header_fn_adds_processor() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
            .build()
            .unwrap();
        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
    }

    #[tokio::test]
    async fn test_set_body_static_processor_works() {
        use camel_core::route::compose_pipeline;
        let def = RouteBuilder::from("t:t")
            .route_id("test-route")
            .set_body("replaced")
            .build()
            .unwrap();
        // Run only the processor steps of the built definition as a pipeline.
        let pipeline = compose_pipeline(
            def.steps()
                .iter()
                .filter_map(|s| {
                    if let BuilderStep::Processor(p) = s {
                        Some(p.clone())
                    } else {
                        None
                    }
                })
                .collect(),
        );
        let exchange = Exchange::new(Message::new("original"));
        let result = pipeline.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("replaced"));
    }

    #[tokio::test]
    async fn test_set_body_fn_processor_works() {
        use camel_core::route::compose_pipeline;
        let def = RouteBuilder::from("t:t")
            .route_id("test-route")
            .set_body_fn(|ex: &Exchange| {
                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
            })
            .build()
            .unwrap();
        // Run only the processor steps of the built definition as a pipeline.
        let pipeline = compose_pipeline(
            def.steps()
                .iter()
                .filter_map(|s| {
                    if let BuilderStep::Processor(p) = s {
                        Some(p.clone())
                    } else {
                        None
                    }
                })
                .collect(),
        );
        let exchange = Exchange::new(Message::new("hello"));
        let result = pipeline.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("HELLO"));
    }

    #[tokio::test]
    async fn test_set_header_fn_processor_works() {
        use camel_core::route::compose_pipeline;
        let def = RouteBuilder::from("t:t")
            .route_id("test-route")
            .set_header_fn("echo", |ex: &Exchange| {
                ex.input
                    .body
                    .as_text()
                    .map(|t| Value::String(t.into()))
                    .unwrap_or(Value::Null)
            })
            .build()
            .unwrap();
        // Run only the processor steps of the built definition as a pipeline.
        let pipeline = compose_pipeline(
            def.steps()
                .iter()
                .filter_map(|s| {
                    if let BuilderStep::Processor(p) = s {
                        Some(p.clone())
                    } else {
                        None
                    }
                })
                .collect(),
        );
        let exchange = Exchange::new(Message::new("ping"));
        let result = pipeline.oneshot(exchange).await.unwrap();
        assert_eq!(
            result.input.header("echo"),
            Some(&Value::String("ping".into()))
        );
    }

    // ---- Filter builder ----

    #[test]
    fn test_filter_builder_typestate() {
        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
            .route_id("test-route")
            .filter(|_ex| true)
            .to("mock:inner")
            .end_filter()
            .to("mock:outer")
            .build();
        assert!(result.is_ok());
    }

    #[test]
    fn test_filter_builder_steps_collected() {
        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
            .route_id("test-route")
            .filter(|_ex| true)
            .to("mock:inner")
            .end_filter()
            .build()
            .unwrap();

        assert_eq!(definition.steps().len(), 1);
        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
    }

    // ---- Wire tap ----

    #[test]
    fn test_wire_tap_builder_adds_step() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .wire_tap("mock:tap")
            .to("mock:result")
            .build()
            .unwrap();

        assert_eq!(definition.steps().len(), 2);
        assert!(
            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
        );
        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
    }

    // ---- Multicast builder ----

    #[test]
    fn test_multicast_builder_typestate() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .multicast()
            .to("direct:a")
            .to("direct:b")
            .end_multicast()
            .to("mock:result")
            .build()
            .unwrap();

        // One multicast step plus the trailing `to`.
        assert_eq!(definition.steps().len(), 2);
    }

    #[test]
    fn test_multicast_builder_steps_collected() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .multicast()
            .to("direct:a")
            .to("direct:b")
            .end_multicast()
            .build()
            .unwrap();

        match &definition.steps()[0] {
            BuilderStep::Multicast { steps, .. } => {
                assert_eq!(steps.len(), 2);
            }
            other => panic!("Expected Multicast, got {:?}", other),
        }
    }

    // ---- Concurrency settings ----

    #[test]
    fn test_builder_concurrent_sets_concurrency() {
        use camel_component::ConcurrencyModel;

        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
            .route_id("test-route")
            .concurrent(16)
            .to("log:info")
            .build()
            .unwrap();

        assert_eq!(
            definition.concurrency_override(),
            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
        );
    }

    #[test]
    fn test_builder_concurrent_zero_means_unbounded() {
        use camel_component::ConcurrencyModel;

        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
            .route_id("test-route")
            .concurrent(0)
            .to("log:info")
            .build()
            .unwrap();

        // `concurrent(0)` is recorded as `max: None`.
        assert_eq!(
            definition.concurrency_override(),
            Some(&ConcurrencyModel::Concurrent { max: None })
        );
    }

    #[test]
    fn test_builder_sequential_sets_concurrency() {
        use camel_component::ConcurrencyModel;

        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
            .route_id("test-route")
            .sequential()
            .to("log:info")
            .build()
            .unwrap();

        assert_eq!(
            definition.concurrency_override(),
            Some(&ConcurrencyModel::Sequential)
        );
    }

    #[test]
    fn test_builder_default_concurrency_is_none() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .to("log:info")
            .build()
            .unwrap();

        assert_eq!(definition.concurrency_override(), None);
    }

    // ---- Route id, auto-startup and startup order ----

    #[test]
    fn test_builder_route_id_sets_id() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("my-route")
            .build()
            .unwrap();

        assert_eq!(definition.route_id(), "my-route");
    }

    #[test]
    fn test_build_without_route_id_fails() {
        let result = RouteBuilder::from("timer:tick?period=1000")
            .to("log:info")
            .build();
        let err = match result {
            Err(e) => e.to_string(),
            Ok(_) => panic!("build() should fail without route_id"),
        };
        assert!(
            err.contains("route_id"),
            "error should mention route_id, got: {}",
            err
        );
    }

    #[test]
    fn test_builder_auto_startup_false() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .auto_startup(false)
            .build()
            .unwrap();

        assert!(!definition.auto_startup());
    }

    #[test]
    fn test_builder_startup_order_custom() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .startup_order(50)
            .build()
            .unwrap();

        assert_eq!(definition.startup_order(), 50);
    }

    #[test]
    fn test_builder_defaults() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .build()
            .unwrap();

        // With no overrides, the definition reports auto-startup enabled and
        // a startup order of 1000.
        assert_eq!(definition.route_id(), "test-route");
        assert!(definition.auto_startup());
        assert_eq!(definition.startup_order(), 1000);
    }

    // ---- Choice builder ----

    #[test]
    fn test_choice_builder_single_when() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .choice()
            .when(|ex: &Exchange| ex.input.header("type").is_some())
            .to("mock:typed")
            .end_when()
            .end_choice()
            .build()
            .unwrap();
        assert_eq!(definition.steps().len(), 1);
        assert!(
            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
                if whens.len() == 1 && otherwise.is_none())
        );
    }

    #[test]
    fn test_choice_builder_when_otherwise() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .choice()
            .when(|ex: &Exchange| ex.input.header("a").is_some())
            .to("mock:a")
            .end_when()
            .otherwise()
            .to("mock:fallback")
            .end_otherwise()
            .end_choice()
            .build()
            .unwrap();
        assert!(
            matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
                if whens.len() == 1 && otherwise.is_some())
        );
    }

    #[test]
    fn test_choice_builder_multiple_whens() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .choice()
            .when(|ex: &Exchange| ex.input.header("a").is_some())
            .to("mock:a")
            .end_when()
            .when(|ex: &Exchange| ex.input.header("b").is_some())
            .to("mock:b")
            .end_when()
            .end_choice()
            .build()
            .unwrap();
        assert!(
            matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
                if whens.len() == 2)
        );
    }

    #[test]
    fn test_choice_step_after_choice() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("test-route")
            .choice()
            .when(|_ex: &Exchange| true)
            .to("mock:inner")
            .end_when()
            .end_choice()
            // Added after `end_choice()`, so this lands on the route itself.
            .to("mock:outer")
            .build()
            .unwrap();
        assert_eq!(definition.steps().len(), 2);
        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
    }
}