1use camel_api::aggregator::AggregatorConfig;
2use camel_api::body::Body;
3use camel_api::circuit_breaker::CircuitBreakerConfig;
4use camel_api::error_handler::ErrorHandlerConfig;
5use camel_api::multicast::{MulticastConfig, MulticastStrategy};
6use camel_api::splitter::SplitterConfig;
7use camel_api::{
8 BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProcessorFn, Value,
9};
10use camel_component::ConcurrencyModel;
11use camel_core::route::{BuilderStep, RouteDefinition};
12use camel_processor::{DynamicSetHeader, LogLevel, MapBody, SetBody, SetHeader, StopService};
13
14pub trait StepAccumulator: Sized {
20 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
21
22 fn to(mut self, endpoint: impl Into<String>) -> Self {
23 self.steps_mut().push(BuilderStep::To(endpoint.into()));
24 self
25 }
26
27 fn process<F, Fut>(mut self, f: F) -> Self
28 where
29 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
30 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
31 {
32 let svc = ProcessorFn::new(f);
33 self.steps_mut()
34 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
35 self
36 }
37
38 fn process_fn(mut self, processor: BoxProcessor) -> Self {
39 self.steps_mut().push(BuilderStep::Processor(processor));
40 self
41 }
42
43 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
44 let svc = SetHeader::new(IdentityProcessor, key, value);
45 self.steps_mut()
46 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
47 self
48 }
49
50 fn map_body<F>(mut self, mapper: F) -> Self
51 where
52 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
53 {
54 let svc = MapBody::new(IdentityProcessor, mapper);
55 self.steps_mut()
56 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
57 self
58 }
59
60 fn set_body<B>(mut self, body: B) -> Self
61 where
62 B: Into<Body> + Clone + Send + Sync + 'static,
63 {
64 let body: Body = body.into();
65 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
66 self.steps_mut()
67 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
68 self
69 }
70
71 fn set_body_fn<F>(mut self, expr: F) -> Self
72 where
73 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
74 {
75 let svc = SetBody::new(IdentityProcessor, expr);
76 self.steps_mut()
77 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
78 self
79 }
80
81 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
82 where
83 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
84 {
85 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
86 self.steps_mut()
87 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
88 self
89 }
90
91 fn aggregate(mut self, config: AggregatorConfig) -> Self {
92 self.steps_mut().push(BuilderStep::Aggregate { config });
93 self
94 }
95
96 fn stop(mut self) -> Self {
102 self.steps_mut()
103 .push(BuilderStep::Processor(BoxProcessor::new(StopService)));
104 self
105 }
106
107 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
111 use camel_processor::LogProcessor;
112 let svc = LogProcessor::new(level, message.into());
113 self.steps_mut()
114 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
115 self
116 }
117}
118
119pub struct RouteBuilder {
131 from_uri: String,
132 steps: Vec<BuilderStep>,
133 error_handler: Option<ErrorHandlerConfig>,
134 circuit_breaker_config: Option<CircuitBreakerConfig>,
135 concurrency: Option<ConcurrencyModel>,
136 route_id: Option<String>,
137 auto_startup: Option<bool>,
138 startup_order: Option<i32>,
139}
140
141impl RouteBuilder {
142 pub fn from(endpoint: &str) -> Self {
144 Self {
145 from_uri: endpoint.to_string(),
146 steps: Vec::new(),
147 error_handler: None,
148 circuit_breaker_config: None,
149 concurrency: None,
150 route_id: None,
151 auto_startup: None,
152 startup_order: None,
153 }
154 }
155
156 pub fn filter<F>(self, predicate: F) -> FilterBuilder
160 where
161 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
162 {
163 FilterBuilder {
164 parent: self,
165 predicate: std::sync::Arc::new(predicate),
166 steps: vec![],
167 }
168 }
169
170 pub fn wire_tap(mut self, endpoint: &str) -> Self {
174 self.steps.push(BuilderStep::WireTap {
175 uri: endpoint.to_string(),
176 });
177 self
178 }
179
180 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
182 self.error_handler = Some(config);
183 self
184 }
185
186 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
188 self.circuit_breaker_config = Some(config);
189 self
190 }
191
192 pub fn concurrent(mut self, max: usize) -> Self {
206 let max = if max == 0 { None } else { Some(max) };
207 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
208 self
209 }
210
211 pub fn sequential(mut self) -> Self {
216 self.concurrency = Some(ConcurrencyModel::Sequential);
217 self
218 }
219
220 pub fn route_id(mut self, id: impl Into<String>) -> Self {
224 self.route_id = Some(id.into());
225 self
226 }
227
228 pub fn auto_startup(mut self, auto: bool) -> Self {
232 self.auto_startup = Some(auto);
233 self
234 }
235
236 pub fn startup_order(mut self, order: i32) -> Self {
240 self.startup_order = Some(order);
241 self
242 }
243
244 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
250 SplitBuilder {
251 parent: self,
252 config,
253 steps: Vec::new(),
254 }
255 }
256
257 pub fn multicast(self) -> MulticastBuilder {
263 MulticastBuilder {
264 parent: self,
265 steps: Vec::new(),
266 config: MulticastConfig::new(),
267 }
268 }
269
270 pub fn build(self) -> Result<RouteDefinition, CamelError> {
272 if self.from_uri.is_empty() {
273 return Err(CamelError::RouteError(
274 "route must have a 'from' URI".to_string(),
275 ));
276 }
277 let definition = RouteDefinition::new(self.from_uri, self.steps);
278 let definition = if let Some(eh) = self.error_handler {
279 definition.with_error_handler(eh)
280 } else {
281 definition
282 };
283 let definition = if let Some(cb) = self.circuit_breaker_config {
284 definition.with_circuit_breaker(cb)
285 } else {
286 definition
287 };
288 let definition = if let Some(concurrency) = self.concurrency {
289 definition.with_concurrency(concurrency)
290 } else {
291 definition
292 };
293 let definition = if let Some(id) = self.route_id {
294 definition.with_route_id(id)
295 } else {
296 definition
297 };
298 let definition = if let Some(auto) = self.auto_startup {
299 definition.with_auto_startup(auto)
300 } else {
301 definition
302 };
303 let definition = if let Some(order) = self.startup_order {
304 definition.with_startup_order(order)
305 } else {
306 definition
307 };
308 Ok(definition)
309 }
310}
311
312impl StepAccumulator for RouteBuilder {
313 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
314 &mut self.steps
315 }
316}
317
318pub struct SplitBuilder {
326 parent: RouteBuilder,
327 config: SplitterConfig,
328 steps: Vec<BuilderStep>,
329}
330
331impl SplitBuilder {
332 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
334 where
335 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
336 {
337 FilterInSplitBuilder {
338 parent: self,
339 predicate: std::sync::Arc::new(predicate),
340 steps: vec![],
341 }
342 }
343
344 pub fn end_split(mut self) -> RouteBuilder {
347 let split_step = BuilderStep::Split {
348 config: self.config,
349 steps: self.steps,
350 };
351 self.parent.steps.push(split_step);
352 self.parent
353 }
354}
355
356impl StepAccumulator for SplitBuilder {
357 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
358 &mut self.steps
359 }
360}
361
362pub struct FilterBuilder {
364 parent: RouteBuilder,
365 predicate: FilterPredicate,
366 steps: Vec<BuilderStep>,
367}
368
369impl FilterBuilder {
370 pub fn end_filter(mut self) -> RouteBuilder {
373 let step = BuilderStep::Filter {
374 predicate: self.predicate,
375 steps: self.steps,
376 };
377 self.parent.steps.push(step);
378 self.parent
379 }
380}
381
382impl StepAccumulator for FilterBuilder {
383 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
384 &mut self.steps
385 }
386}
387
388pub struct FilterInSplitBuilder {
390 parent: SplitBuilder,
391 predicate: FilterPredicate,
392 steps: Vec<BuilderStep>,
393}
394
395impl FilterInSplitBuilder {
396 pub fn end_filter(mut self) -> SplitBuilder {
398 let step = BuilderStep::Filter {
399 predicate: self.predicate,
400 steps: self.steps,
401 };
402 self.parent.steps.push(step);
403 self.parent
404 }
405}
406
407impl StepAccumulator for FilterInSplitBuilder {
408 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
409 &mut self.steps
410 }
411}
412
413pub struct MulticastBuilder {
421 parent: RouteBuilder,
422 steps: Vec<BuilderStep>,
423 config: MulticastConfig,
424}
425
426impl MulticastBuilder {
427 pub fn parallel(mut self, parallel: bool) -> Self {
428 self.config = self.config.parallel(parallel);
429 self
430 }
431
432 pub fn parallel_limit(mut self, limit: usize) -> Self {
433 self.config = self.config.parallel_limit(limit);
434 self
435 }
436
437 pub fn stop_on_exception(mut self, stop: bool) -> Self {
438 self.config = self.config.stop_on_exception(stop);
439 self
440 }
441
442 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
443 self.config = self.config.timeout(duration);
444 self
445 }
446
447 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
448 self.config = self.config.aggregation(strategy);
449 self
450 }
451
452 pub fn end_multicast(mut self) -> RouteBuilder {
453 let step = BuilderStep::Multicast {
454 steps: self.steps,
455 config: self.config,
456 };
457 self.parent.steps.push(step);
458 self.parent
459 }
460}
461
462impl StepAccumulator for MulticastBuilder {
463 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
464 &mut self.steps
465 }
466}
467
/// Unit tests for the route builder DSL.
///
/// Names not imported explicitly below (`SetHeader`, `Value`, `CircuitBreakerConfig`, ...)
/// come in through `use super::*`, which re-exposes the parent module's own imports.
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::splitter::{AggregationStrategy, split_body_lines};
    use camel_api::{BoxProcessorExt, Exchange, Message};
    use camel_core::route::{BuilderStep, compose_pipeline};
    use camel_processor::FilterService;
    use tower::{Service, ServiceExt};

    /// Clones every top-level `BuilderStep::Processor` out of `definition`, in order,
    /// skipping all other step kinds, so the result can be fed to `compose_pipeline`.
    fn processor_steps(definition: &RouteDefinition) -> Vec<BoxProcessor> {
        definition
            .steps()
            .iter()
            .filter_map(|step| match step {
                BuilderStep::Processor(processor) => Some(processor.clone()),
                _ => None,
            })
            .collect()
    }

    // --- Basic builder behaviour -------------------------------------------------------

    #[test]
    fn test_builder_from_creates_definition() {
        let definition = RouteBuilder::from("timer:tick").build().unwrap();
        assert_eq!(definition.from_uri(), "timer:tick");
    }

    #[test]
    fn test_builder_empty_from_uri_errors() {
        let result = RouteBuilder::from("").build();
        assert!(result.is_err());
    }

    #[test]
    fn test_builder_to_adds_step() {
        let definition = RouteBuilder::from("timer:tick")
            .to("log:info")
            .build()
            .unwrap();

        assert_eq!(definition.from_uri(), "timer:tick");
        assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
    }

    #[test]
    fn test_builder_filter_adds_filter_step() {
        let definition = RouteBuilder::from("timer:tick")
            .filter(|_ex| true)
            .to("mock:result")
            .end_filter()
            .build()
            .unwrap();

        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
    }

    #[test]
    fn test_builder_set_header_adds_processor_step() {
        let definition = RouteBuilder::from("timer:tick")
            .set_header("key", Value::String("value".into()))
            .build()
            .unwrap();

        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
    }

    #[test]
    fn test_builder_map_body_adds_processor_step() {
        let definition = RouteBuilder::from("timer:tick")
            .map_body(|body| body)
            .build()
            .unwrap();

        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
    }

    #[test]
    fn test_builder_process_adds_processor_step() {
        let definition = RouteBuilder::from("timer:tick")
            .process(|ex| async move { Ok(ex) })
            .build()
            .unwrap();

        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
    }

    #[test]
    fn test_builder_chain_multiple_steps() {
        let definition = RouteBuilder::from("timer:tick")
            .set_header("source", Value::String("timer".into()))
            .filter(|ex| ex.input.header("source").is_some())
            .to("log:info")
            .end_filter()
            .to("mock:result")
            .build()
            .unwrap();

        assert_eq!(definition.steps().len(), 3);
        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
        assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. }));
        assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
    }

    // --- Individual processors ---------------------------------------------------------

    #[tokio::test]
    async fn test_set_header_processor_works() {
        let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
        let exchange = Exchange::new(Message::new("test"));
        // Drive readiness before `call`, as the other service tests do.
        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
        assert_eq!(
            result.input.header("greeting"),
            Some(&Value::String("hello".into()))
        );
    }

    #[tokio::test]
    async fn test_filter_processor_passes() {
        let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
        let mut svc =
            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
        let exchange = Exchange::new(Message::new("pass"));
        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("pass"));
    }

    #[tokio::test]
    async fn test_filter_processor_blocks() {
        // The sub-processor fails if it is ever invoked, so a successful result proves the
        // filter did not run it for a non-matching exchange.
        let sub = BoxProcessor::from_fn(|_ex| {
            Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
        });
        let mut svc =
            FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
        let exchange = Exchange::new(Message::new("reject"));
        let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("reject"));
    }

    #[tokio::test]
    async fn test_map_body_processor_works() {
        let mapper = MapBody::new(IdentityProcessor, |body: Body| {
            if let Some(text) = body.as_text() {
                Body::Text(text.to_uppercase())
            } else {
                body
            }
        });
        let exchange = Exchange::new(Message::new("hello"));
        let result = mapper.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("HELLO"));
    }

    #[tokio::test]
    async fn test_process_custom_processor_works() {
        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
            ex.set_property("custom", Value::Bool(true));
            Ok(ex)
        });
        let exchange = Exchange::new(Message::default());
        let result = processor.oneshot(exchange).await.unwrap();
        assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
    }

    // --- Pipeline composition ----------------------------------------------------------

    #[tokio::test]
    async fn test_compose_pipeline_runs_steps_in_order() {
        let processors = vec![
            BoxProcessor::new(SetHeader::new(
                IdentityProcessor,
                "step",
                Value::String("one".into()),
            )),
            BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
                if let Some(text) = body.as_text() {
                    Body::Text(format!("{}-processed", text))
                } else {
                    body
                }
            })),
        ];

        let pipeline = compose_pipeline(processors);
        let exchange = Exchange::new(Message::new("hello"));
        let result = pipeline.oneshot(exchange).await.unwrap();

        assert_eq!(
            result.input.header("step"),
            Some(&Value::String("one".into()))
        );
        assert_eq!(result.input.body.as_text(), Some("hello-processed"));
    }

    #[tokio::test]
    async fn test_compose_pipeline_empty_is_identity() {
        let pipeline = compose_pipeline(vec![]);
        let exchange = Exchange::new(Message::new("unchanged"));
        let result = pipeline.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("unchanged"));
    }

    // --- Circuit breaker ---------------------------------------------------------------

    #[test]
    fn test_builder_circuit_breaker_sets_config() {
        let config = CircuitBreakerConfig::new().failure_threshold(5);
        let definition = RouteBuilder::from("timer:tick")
            .circuit_breaker(config)
            .build()
            .unwrap();

        let cb = definition
            .circuit_breaker_config()
            .expect("circuit breaker should be set");
        assert_eq!(cb.failure_threshold, 5);
    }

    #[test]
    fn test_builder_circuit_breaker_with_error_handler() {
        let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
        let eh_config = ErrorHandlerConfig::log_only();

        let definition = RouteBuilder::from("timer:tick")
            .to("log:info")
            .circuit_breaker(cb_config)
            .error_handler(eh_config)
            .build()
            .unwrap();

        assert!(
            definition.circuit_breaker_config().is_some(),
            "circuit breaker config should be set"
        );
    }

    // --- Split scope -------------------------------------------------------------------

    #[test]
    fn test_split_builder_typestate() {
        let definition = RouteBuilder::from("timer:test?period=1000")
            .split(SplitterConfig::new(split_body_lines()))
            .to("mock:per-fragment")
            .end_split()
            .to("mock:final")
            .build()
            .unwrap();

        // One Split step followed by the trailing To step.
        assert_eq!(definition.steps().len(), 2);
        assert!(matches!(&definition.steps()[0], BuilderStep::Split { .. }));
        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:final"));
    }

    #[test]
    fn test_split_builder_steps_collected() {
        let definition = RouteBuilder::from("timer:test?period=1000")
            .split(SplitterConfig::new(split_body_lines()))
            .set_header("fragment", Value::String("yes".into()))
            .to("mock:per-fragment")
            .end_split()
            .build()
            .unwrap();

        assert_eq!(definition.steps().len(), 1);
        match &definition.steps()[0] {
            BuilderStep::Split { steps, .. } => {
                // set_header + to
                assert_eq!(steps.len(), 2);
            }
            other => panic!("Expected Split, got {:?}", other),
        }
    }

    #[test]
    fn test_split_builder_config_propagated() {
        let definition = RouteBuilder::from("timer:test?period=1000")
            .split(
                SplitterConfig::new(split_body_lines())
                    .parallel(true)
                    .parallel_limit(4)
                    .aggregation(AggregationStrategy::CollectAll),
            )
            .to("mock:per-fragment")
            .end_split()
            .build()
            .unwrap();

        match &definition.steps()[0] {
            BuilderStep::Split { config, .. } => {
                assert!(config.parallel);
                assert_eq!(config.parallel_limit, Some(4));
                assert!(matches!(
                    config.aggregation,
                    AggregationStrategy::CollectAll
                ));
            }
            other => panic!("Expected Split, got {:?}", other),
        }
    }

    // --- Aggregate ---------------------------------------------------------------------

    #[test]
    fn test_aggregate_builder_adds_step() {
        let definition = RouteBuilder::from("timer:tick")
            .aggregate(
                AggregatorConfig::correlate_by("key")
                    .complete_when_size(2)
                    .build(),
            )
            .build()
            .unwrap();

        assert_eq!(definition.steps().len(), 1);
        assert!(matches!(
            definition.steps()[0],
            BuilderStep::Aggregate { .. }
        ));
    }

    #[test]
    fn test_aggregate_in_split_builder() {
        let definition = RouteBuilder::from("timer:tick")
            .split(SplitterConfig::new(split_body_lines()))
            .aggregate(
                AggregatorConfig::correlate_by("key")
                    .complete_when_size(1)
                    .build(),
            )
            .end_split()
            .build()
            .unwrap();

        assert_eq!(definition.steps().len(), 1);
        if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
        } else {
            panic!("expected Split step");
        }
    }

    // --- set_body / set_body_fn / set_header_fn ----------------------------------------

    #[test]
    fn test_builder_set_body_static_adds_processor() {
        let definition = RouteBuilder::from("timer:tick")
            .set_body("fixed")
            .build()
            .unwrap();
        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
    }

    #[test]
    fn test_builder_set_body_fn_adds_processor() {
        let definition = RouteBuilder::from("timer:tick")
            .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
            .build()
            .unwrap();
        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
    }

    #[test]
    fn test_builder_set_header_fn_adds_processor() {
        let definition = RouteBuilder::from("timer:tick")
            .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
            .build()
            .unwrap();
        assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
    }

    #[tokio::test]
    async fn test_set_body_static_processor_works() {
        let def = RouteBuilder::from("t:t")
            .set_body("replaced")
            .build()
            .unwrap();
        let pipeline = compose_pipeline(processor_steps(&def));
        let exchange = Exchange::new(Message::new("original"));
        let result = pipeline.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("replaced"));
    }

    #[tokio::test]
    async fn test_set_body_fn_processor_works() {
        let def = RouteBuilder::from("t:t")
            .set_body_fn(|ex: &Exchange| {
                Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
            })
            .build()
            .unwrap();
        let pipeline = compose_pipeline(processor_steps(&def));
        let exchange = Exchange::new(Message::new("hello"));
        let result = pipeline.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("HELLO"));
    }

    #[tokio::test]
    async fn test_set_header_fn_processor_works() {
        let def = RouteBuilder::from("t:t")
            .set_header_fn("echo", |ex: &Exchange| {
                ex.input
                    .body
                    .as_text()
                    .map(|t| Value::String(t.into()))
                    .unwrap_or(Value::Null)
            })
            .build()
            .unwrap();
        let pipeline = compose_pipeline(processor_steps(&def));
        let exchange = Exchange::new(Message::new("ping"));
        let result = pipeline.oneshot(exchange).await.unwrap();
        assert_eq!(
            result.input.header("echo"),
            Some(&Value::String("ping".into()))
        );
    }

    // --- Filter scope ------------------------------------------------------------------

    #[test]
    fn test_filter_builder_typestate() {
        let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
            .filter(|_ex| true)
            .to("mock:inner")
            .end_filter()
            .to("mock:outer")
            .build();
        assert!(result.is_ok());
    }

    #[test]
    fn test_filter_builder_steps_collected() {
        let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
            .filter(|_ex| true)
            .to("mock:inner")
            .end_filter()
            .build()
            .unwrap();

        assert_eq!(definition.steps().len(), 1);
        assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
    }

    // --- Wire tap ----------------------------------------------------------------------

    #[test]
    fn test_wire_tap_builder_adds_step() {
        let definition = RouteBuilder::from("timer:tick")
            .wire_tap("mock:tap")
            .to("mock:result")
            .build()
            .unwrap();

        assert_eq!(definition.steps().len(), 2);
        assert!(
            matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
        );
        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
    }

    // --- Multicast scope ---------------------------------------------------------------

    #[test]
    fn test_multicast_builder_typestate() {
        let definition = RouteBuilder::from("timer:tick")
            .multicast()
            .to("direct:a")
            .to("direct:b")
            .end_multicast()
            .to("mock:result")
            .build()
            .unwrap();

        // One Multicast step followed by the trailing To step.
        assert_eq!(definition.steps().len(), 2);
        assert!(matches!(
            &definition.steps()[0],
            BuilderStep::Multicast { .. }
        ));
        assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
    }

    #[test]
    fn test_multicast_builder_steps_collected() {
        let definition = RouteBuilder::from("timer:tick")
            .multicast()
            .to("direct:a")
            .to("direct:b")
            .end_multicast()
            .build()
            .unwrap();

        match &definition.steps()[0] {
            BuilderStep::Multicast { steps, .. } => {
                assert_eq!(steps.len(), 2);
            }
            other => panic!("Expected Multicast, got {:?}", other),
        }
    }

    // --- Concurrency -------------------------------------------------------------------

    #[test]
    fn test_builder_concurrent_sets_concurrency() {
        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
            .concurrent(16)
            .to("log:info")
            .build()
            .unwrap();

        assert_eq!(
            definition.concurrency_override(),
            Some(&ConcurrencyModel::Concurrent { max: Some(16) })
        );
    }

    #[test]
    fn test_builder_concurrent_zero_means_unbounded() {
        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
            .concurrent(0)
            .to("log:info")
            .build()
            .unwrap();

        assert_eq!(
            definition.concurrency_override(),
            Some(&ConcurrencyModel::Concurrent { max: None })
        );
    }

    #[test]
    fn test_builder_sequential_sets_concurrency() {
        let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
            .sequential()
            .to("log:info")
            .build()
            .unwrap();

        assert_eq!(
            definition.concurrency_override(),
            Some(&ConcurrencyModel::Sequential)
        );
    }

    #[test]
    fn test_builder_default_concurrency_is_none() {
        let definition = RouteBuilder::from("timer:tick")
            .to("log:info")
            .build()
            .unwrap();

        assert_eq!(definition.concurrency_override(), None);
    }

    // --- Route metadata ----------------------------------------------------------------

    #[test]
    fn test_builder_route_id_sets_id() {
        let definition = RouteBuilder::from("timer:tick")
            .route_id("my-route")
            .build()
            .unwrap();

        assert_eq!(definition.route_id(), Some("my-route"));
    }

    #[test]
    fn test_builder_auto_startup_false() {
        let definition = RouteBuilder::from("timer:tick")
            .auto_startup(false)
            .build()
            .unwrap();

        assert!(!definition.auto_startup());
    }

    #[test]
    fn test_builder_startup_order_custom() {
        let definition = RouteBuilder::from("timer:tick")
            .startup_order(50)
            .build()
            .unwrap();

        assert_eq!(definition.startup_order(), 50);
    }

    #[test]
    fn test_builder_defaults() {
        let definition = RouteBuilder::from("timer:tick").build().unwrap();

        assert_eq!(definition.route_id(), None);
        assert!(definition.auto_startup());
        assert_eq!(definition.startup_order(), 1000);
    }
}