1use camel_api::aggregator::{AggregationStrategy, AggregatorConfig, CompletionCondition};
2use camel_api::body::Body;
3use camel_api::body_converter::BodyType;
4use camel_api::circuit_breaker::CircuitBreakerConfig;
5use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
6use camel_api::error_handler::ErrorHandlerConfig;
7use camel_api::load_balancer::LoadBalancerConfig;
8use camel_api::multicast::{MulticastConfig, MulticastStrategy};
9use camel_api::splitter::SplitterConfig;
10use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
11use camel_api::{
12 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
13 ProcessorFn, Value,
14 runtime::{
15 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
16 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
17 CanonicalWhenSpec,
18 },
19};
20use camel_component::ConcurrencyModel;
21use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
22use camel_processor::{ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, SetBody, SetHeader};
23
24pub trait StepAccumulator: Sized {
30 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
31
32 fn to(mut self, endpoint: impl Into<String>) -> Self {
33 self.steps_mut().push(BuilderStep::To(endpoint.into()));
34 self
35 }
36
37 fn process<F, Fut>(mut self, f: F) -> Self
38 where
39 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
40 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
41 {
42 let svc = ProcessorFn::new(f);
43 self.steps_mut()
44 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
45 self
46 }
47
48 fn process_fn(mut self, processor: BoxProcessor) -> Self {
49 self.steps_mut().push(BuilderStep::Processor(processor));
50 self
51 }
52
53 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
54 let svc = SetHeader::new(IdentityProcessor, key, value);
55 self.steps_mut()
56 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
57 self
58 }
59
60 fn map_body<F>(mut self, mapper: F) -> Self
61 where
62 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
63 {
64 let svc = MapBody::new(IdentityProcessor, mapper);
65 self.steps_mut()
66 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
67 self
68 }
69
70 fn set_body<B>(mut self, body: B) -> Self
71 where
72 B: Into<Body> + Clone + Send + Sync + 'static,
73 {
74 let body: Body = body.into();
75 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
76 self.steps_mut()
77 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
78 self
79 }
80
81 fn set_body_fn<F>(mut self, expr: F) -> Self
82 where
83 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
84 {
85 let svc = SetBody::new(IdentityProcessor, expr);
86 self.steps_mut()
87 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
88 self
89 }
90
91 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
92 where
93 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
94 {
95 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
96 self.steps_mut()
97 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
98 self
99 }
100
101 fn aggregate(mut self, config: AggregatorConfig) -> Self {
102 self.steps_mut().push(BuilderStep::Aggregate { config });
103 self
104 }
105
106 fn stop(mut self) -> Self {
112 self.steps_mut().push(BuilderStep::Stop);
113 self
114 }
115
116 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
120 self.steps_mut().push(BuilderStep::Log {
121 level,
122 message: message.into(),
123 });
124 self
125 }
126
127 fn convert_body_to(mut self, target: BodyType) -> Self {
139 let svc = ConvertBodyTo::new(IdentityProcessor, target);
140 self.steps_mut()
141 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
142 self
143 }
144
145 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
156 self.steps_mut().push(BuilderStep::Script {
157 language: language.into(),
158 script: script.into(),
159 });
160 self
161 }
162}
163
164pub struct RouteBuilder {
176 from_uri: String,
177 steps: Vec<BuilderStep>,
178 error_handler: Option<ErrorHandlerConfig>,
179 circuit_breaker_config: Option<CircuitBreakerConfig>,
180 concurrency: Option<ConcurrencyModel>,
181 route_id: Option<String>,
182 auto_startup: Option<bool>,
183 startup_order: Option<i32>,
184}
185
186impl RouteBuilder {
187 pub fn from(endpoint: &str) -> Self {
189 Self {
190 from_uri: endpoint.to_string(),
191 steps: Vec::new(),
192 error_handler: None,
193 circuit_breaker_config: None,
194 concurrency: None,
195 route_id: None,
196 auto_startup: None,
197 startup_order: None,
198 }
199 }
200
201 pub fn filter<F>(self, predicate: F) -> FilterBuilder
205 where
206 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
207 {
208 FilterBuilder {
209 parent: self,
210 predicate: std::sync::Arc::new(predicate),
211 steps: vec![],
212 }
213 }
214
215 pub fn choice(self) -> ChoiceBuilder {
221 ChoiceBuilder {
222 parent: self,
223 whens: vec![],
224 _otherwise: None,
225 }
226 }
227
228 pub fn wire_tap(mut self, endpoint: &str) -> Self {
232 self.steps.push(BuilderStep::WireTap {
233 uri: endpoint.to_string(),
234 });
235 self
236 }
237
238 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
240 self.error_handler = Some(config);
241 self
242 }
243
244 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
246 self.circuit_breaker_config = Some(config);
247 self
248 }
249
250 pub fn concurrent(mut self, max: usize) -> Self {
264 let max = if max == 0 { None } else { Some(max) };
265 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
266 self
267 }
268
269 pub fn sequential(mut self) -> Self {
274 self.concurrency = Some(ConcurrencyModel::Sequential);
275 self
276 }
277
278 pub fn route_id(mut self, id: impl Into<String>) -> Self {
282 self.route_id = Some(id.into());
283 self
284 }
285
286 pub fn auto_startup(mut self, auto: bool) -> Self {
290 self.auto_startup = Some(auto);
291 self
292 }
293
294 pub fn startup_order(mut self, order: i32) -> Self {
298 self.startup_order = Some(order);
299 self
300 }
301
302 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
308 SplitBuilder {
309 parent: self,
310 config,
311 steps: Vec::new(),
312 }
313 }
314
315 pub fn multicast(self) -> MulticastBuilder {
321 MulticastBuilder {
322 parent: self,
323 steps: Vec::new(),
324 config: MulticastConfig::new(),
325 }
326 }
327
328 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
335 ThrottleBuilder {
336 parent: self,
337 config: ThrottlerConfig::new(max_requests, period),
338 steps: Vec::new(),
339 }
340 }
341
342 pub fn load_balance(self) -> LoadBalancerBuilder {
348 LoadBalancerBuilder {
349 parent: self,
350 config: LoadBalancerConfig::round_robin(),
351 steps: Vec::new(),
352 }
353 }
354
355 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
371 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
372 }
373
374 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
378 self.steps.push(BuilderStep::DynamicRouter { config });
379 self
380 }
381
382 pub fn build(self) -> Result<RouteDefinition, CamelError> {
384 if self.from_uri.is_empty() {
385 return Err(CamelError::RouteError(
386 "route must have a 'from' URI".to_string(),
387 ));
388 }
389 let route_id = self.route_id.ok_or_else(|| {
390 CamelError::RouteError(
391 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
392 .to_string(),
393 )
394 })?;
395 let definition = RouteDefinition::new(self.from_uri, self.steps);
396 let definition = if let Some(eh) = self.error_handler {
397 definition.with_error_handler(eh)
398 } else {
399 definition
400 };
401 let definition = if let Some(cb) = self.circuit_breaker_config {
402 definition.with_circuit_breaker(cb)
403 } else {
404 definition
405 };
406 let definition = if let Some(concurrency) = self.concurrency {
407 definition.with_concurrency(concurrency)
408 } else {
409 definition
410 };
411 let definition = definition.with_route_id(route_id);
412 let definition = if let Some(auto) = self.auto_startup {
413 definition.with_auto_startup(auto)
414 } else {
415 definition
416 };
417 let definition = if let Some(order) = self.startup_order {
418 definition.with_startup_order(order)
419 } else {
420 definition
421 };
422 Ok(definition)
423 }
424
425 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
427 if self.from_uri.is_empty() {
428 return Err(CamelError::RouteError(
429 "route must have a 'from' URI".to_string(),
430 ));
431 }
432 let route_id = self.route_id.ok_or_else(|| {
433 CamelError::RouteError(
434 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
435 .to_string(),
436 )
437 })?;
438
439 let steps = canonicalize_steps(self.steps)?;
440 let circuit_breaker = self
441 .circuit_breaker_config
442 .map(canonicalize_circuit_breaker);
443
444 let spec = CanonicalRouteSpec {
445 route_id,
446 from: self.from_uri,
447 steps,
448 circuit_breaker,
449 version: camel_api::CANONICAL_CONTRACT_VERSION,
450 };
451 spec.validate_contract()?;
452 Ok(spec)
453 }
454}
455
456fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
457 let mut canonical = Vec::with_capacity(steps.len());
458 for step in steps {
459 canonical.push(canonicalize_step(step)?);
460 }
461 Ok(canonical)
462}
463
464fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
465 match step {
466 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
467 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
468 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
469 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
470 BuilderStep::DeclarativeScript { expression } => {
471 Ok(CanonicalStepSpec::Script { expression })
472 }
473 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
474 predicate,
475 steps: canonicalize_steps(steps)?,
476 }),
477 BuilderStep::DeclarativeChoice { whens, otherwise } => {
478 let mut canonical_whens = Vec::with_capacity(whens.len());
479 for DeclarativeWhenStep { predicate, steps } in whens {
480 canonical_whens.push(CanonicalWhenSpec {
481 predicate,
482 steps: canonicalize_steps(steps)?,
483 });
484 }
485 let otherwise = match otherwise {
486 Some(steps) => Some(canonicalize_steps(steps)?),
487 None => None,
488 };
489 Ok(CanonicalStepSpec::Choice {
490 whens: canonical_whens,
491 otherwise,
492 })
493 }
494 BuilderStep::DeclarativeSplit {
495 expression,
496 aggregation,
497 parallel,
498 parallel_limit,
499 stop_on_exception,
500 steps,
501 } => Ok(CanonicalStepSpec::Split {
502 expression: CanonicalSplitExpressionSpec::Language(expression),
503 aggregation: canonicalize_split_aggregation(aggregation)?,
504 parallel,
505 parallel_limit,
506 stop_on_exception,
507 steps: canonicalize_steps(steps)?,
508 }),
509 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
510 config: canonicalize_aggregate(config)?,
511 }),
512 other => {
513 let step_name = canonical_step_name(&other);
514 let detail = camel_api::canonical_contract_rejection_reason(step_name)
515 .unwrap_or("not included in canonical v1");
516 Err(CamelError::RouteError(format!(
517 "canonical v1 does not support step `{step_name}`: {detail}"
518 )))
519 }
520 }
521}
522
523fn canonicalize_split_aggregation(
524 strategy: camel_api::splitter::AggregationStrategy,
525) -> Result<CanonicalSplitAggregationSpec, CamelError> {
526 match strategy {
527 camel_api::splitter::AggregationStrategy::LastWins => {
528 Ok(CanonicalSplitAggregationSpec::LastWins)
529 }
530 camel_api::splitter::AggregationStrategy::CollectAll => {
531 Ok(CanonicalSplitAggregationSpec::CollectAll)
532 }
533 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
534 "canonical v1 does not support custom split aggregation".to_string(),
535 )),
536 camel_api::splitter::AggregationStrategy::Original => {
537 Ok(CanonicalSplitAggregationSpec::Original)
538 }
539 }
540}
541
542fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
543 let completion_size = match config.completion {
544 CompletionCondition::Size(size) => Some(size),
545 CompletionCondition::Predicate(_) => {
546 return Err(CamelError::RouteError(
547 "canonical v1 does not support aggregate predicate completion".to_string(),
548 ));
549 }
550 };
551 let strategy = match config.strategy {
552 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
553 AggregationStrategy::Custom(_) => {
554 return Err(CamelError::RouteError(
555 "canonical v1 does not support custom aggregate strategy".to_string(),
556 ));
557 }
558 };
559 let bucket_ttl_ms = config
560 .bucket_ttl
561 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
562
563 Ok(CanonicalAggregateSpec {
564 header: config.header_name,
565 completion_size,
566 strategy,
567 max_buckets: config.max_buckets,
568 bucket_ttl_ms,
569 })
570}
571
572fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
573 CanonicalCircuitBreakerSpec {
574 failure_threshold: config.failure_threshold,
575 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
576 }
577}
578
579fn canonical_step_name(step: &BuilderStep) -> &'static str {
580 match step {
581 BuilderStep::Processor(_) => "processor",
582 BuilderStep::To(_) => "to",
583 BuilderStep::Stop => "stop",
584 BuilderStep::Log { .. } => "log",
585 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
586 BuilderStep::DeclarativeSetBody { .. } => "set_body",
587 BuilderStep::DeclarativeFilter { .. } => "filter",
588 BuilderStep::DeclarativeChoice { .. } => "choice",
589 BuilderStep::DeclarativeScript { .. } => "script",
590 BuilderStep::DeclarativeSplit { .. } => "split",
591 BuilderStep::Split { .. } => "split",
592 BuilderStep::Aggregate { .. } => "aggregate",
593 BuilderStep::Filter { .. } => "filter",
594 BuilderStep::Choice { .. } => "choice",
595 BuilderStep::WireTap { .. } => "wire_tap",
596 BuilderStep::Multicast { .. } => "multicast",
597 BuilderStep::DeclarativeLog { .. } => "log",
598 BuilderStep::Bean { .. } => "bean",
599 BuilderStep::Script { .. } => "script",
600 BuilderStep::Throttle { .. } => "throttle",
601 BuilderStep::LoadBalance { .. } => "load_balancer",
602 BuilderStep::DynamicRouter { .. } => "dynamic_router",
603 }
604}
605
606impl StepAccumulator for RouteBuilder {
607 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
608 &mut self.steps
609 }
610}
611
612pub struct SplitBuilder {
620 parent: RouteBuilder,
621 config: SplitterConfig,
622 steps: Vec<BuilderStep>,
623}
624
625impl SplitBuilder {
626 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
628 where
629 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
630 {
631 FilterInSplitBuilder {
632 parent: self,
633 predicate: std::sync::Arc::new(predicate),
634 steps: vec![],
635 }
636 }
637
638 pub fn end_split(mut self) -> RouteBuilder {
641 let split_step = BuilderStep::Split {
642 config: self.config,
643 steps: self.steps,
644 };
645 self.parent.steps.push(split_step);
646 self.parent
647 }
648}
649
650impl StepAccumulator for SplitBuilder {
651 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
652 &mut self.steps
653 }
654}
655
656pub struct FilterBuilder {
658 parent: RouteBuilder,
659 predicate: FilterPredicate,
660 steps: Vec<BuilderStep>,
661}
662
663impl FilterBuilder {
664 pub fn end_filter(mut self) -> RouteBuilder {
667 let step = BuilderStep::Filter {
668 predicate: self.predicate,
669 steps: self.steps,
670 };
671 self.parent.steps.push(step);
672 self.parent
673 }
674}
675
676impl StepAccumulator for FilterBuilder {
677 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
678 &mut self.steps
679 }
680}
681
682pub struct FilterInSplitBuilder {
684 parent: SplitBuilder,
685 predicate: FilterPredicate,
686 steps: Vec<BuilderStep>,
687}
688
689impl FilterInSplitBuilder {
690 pub fn end_filter(mut self) -> SplitBuilder {
692 let step = BuilderStep::Filter {
693 predicate: self.predicate,
694 steps: self.steps,
695 };
696 self.parent.steps.push(step);
697 self.parent
698 }
699}
700
701impl StepAccumulator for FilterInSplitBuilder {
702 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
703 &mut self.steps
704 }
705}
706
707pub struct ChoiceBuilder {
714 parent: RouteBuilder,
715 whens: Vec<WhenStep>,
716 _otherwise: Option<Vec<BuilderStep>>,
717}
718
719impl ChoiceBuilder {
720 pub fn when<F>(self, predicate: F) -> WhenBuilder
723 where
724 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
725 {
726 WhenBuilder {
727 parent: self,
728 predicate: std::sync::Arc::new(predicate),
729 steps: vec![],
730 }
731 }
732
733 pub fn otherwise(self) -> OtherwiseBuilder {
737 OtherwiseBuilder {
738 parent: self,
739 steps: vec![],
740 }
741 }
742
743 pub fn end_choice(mut self) -> RouteBuilder {
747 let step = BuilderStep::Choice {
748 whens: self.whens,
749 otherwise: self._otherwise,
750 };
751 self.parent.steps.push(step);
752 self.parent
753 }
754}
755
756pub struct WhenBuilder {
758 parent: ChoiceBuilder,
759 predicate: camel_api::FilterPredicate,
760 steps: Vec<BuilderStep>,
761}
762
763impl WhenBuilder {
764 pub fn end_when(mut self) -> ChoiceBuilder {
767 self.parent.whens.push(WhenStep {
768 predicate: self.predicate,
769 steps: self.steps,
770 });
771 self.parent
772 }
773}
774
775impl StepAccumulator for WhenBuilder {
776 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
777 &mut self.steps
778 }
779}
780
781pub struct OtherwiseBuilder {
783 parent: ChoiceBuilder,
784 steps: Vec<BuilderStep>,
785}
786
787impl OtherwiseBuilder {
788 pub fn end_otherwise(self) -> ChoiceBuilder {
790 let OtherwiseBuilder { mut parent, steps } = self;
791 parent._otherwise = Some(steps);
792 parent
793 }
794}
795
796impl StepAccumulator for OtherwiseBuilder {
797 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
798 &mut self.steps
799 }
800}
801
802pub struct MulticastBuilder {
810 parent: RouteBuilder,
811 steps: Vec<BuilderStep>,
812 config: MulticastConfig,
813}
814
815impl MulticastBuilder {
816 pub fn parallel(mut self, parallel: bool) -> Self {
817 self.config = self.config.parallel(parallel);
818 self
819 }
820
821 pub fn parallel_limit(mut self, limit: usize) -> Self {
822 self.config = self.config.parallel_limit(limit);
823 self
824 }
825
826 pub fn stop_on_exception(mut self, stop: bool) -> Self {
827 self.config = self.config.stop_on_exception(stop);
828 self
829 }
830
831 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
832 self.config = self.config.timeout(duration);
833 self
834 }
835
836 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
837 self.config = self.config.aggregation(strategy);
838 self
839 }
840
841 pub fn end_multicast(mut self) -> RouteBuilder {
842 let step = BuilderStep::Multicast {
843 steps: self.steps,
844 config: self.config,
845 };
846 self.parent.steps.push(step);
847 self.parent
848 }
849}
850
851impl StepAccumulator for MulticastBuilder {
852 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
853 &mut self.steps
854 }
855}
856
857pub struct ThrottleBuilder {
865 parent: RouteBuilder,
866 config: ThrottlerConfig,
867 steps: Vec<BuilderStep>,
868}
869
870impl ThrottleBuilder {
871 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
877 self.config = self.config.strategy(strategy);
878 self
879 }
880
881 pub fn end_throttle(mut self) -> RouteBuilder {
884 let step = BuilderStep::Throttle {
885 config: self.config,
886 steps: self.steps,
887 };
888 self.parent.steps.push(step);
889 self.parent
890 }
891}
892
893impl StepAccumulator for ThrottleBuilder {
894 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
895 &mut self.steps
896 }
897}
898
899pub struct LoadBalancerBuilder {
907 parent: RouteBuilder,
908 config: LoadBalancerConfig,
909 steps: Vec<BuilderStep>,
910}
911
912impl LoadBalancerBuilder {
913 pub fn round_robin(mut self) -> Self {
915 self.config = LoadBalancerConfig::round_robin();
916 self
917 }
918
919 pub fn random(mut self) -> Self {
921 self.config = LoadBalancerConfig::random();
922 self
923 }
924
925 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
930 self.config = LoadBalancerConfig::weighted(weights);
931 self
932 }
933
934 pub fn failover(mut self) -> Self {
939 self.config = LoadBalancerConfig::failover();
940 self
941 }
942
943 pub fn parallel(mut self, parallel: bool) -> Self {
948 self.config = self.config.parallel(parallel);
949 self
950 }
951
952 pub fn end_load_balance(mut self) -> RouteBuilder {
955 let step = BuilderStep::LoadBalance {
956 config: self.config,
957 steps: self.steps,
958 };
959 self.parent.steps.push(step);
960 self.parent
961 }
962}
963
964impl StepAccumulator for LoadBalancerBuilder {
965 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
966 &mut self.steps
967 }
968}
969
970#[cfg(test)]
975mod tests {
976 use super::*;
977 use camel_api::load_balancer::LoadBalanceStrategy;
978 use camel_api::{Exchange, Message};
979 use camel_core::route::BuilderStep;
980 use std::sync::Arc;
981 use tower::{Service, ServiceExt};
982
983 #[test]
984 fn test_builder_from_creates_definition() {
985 let definition = RouteBuilder::from("timer:tick")
986 .route_id("test-route")
987 .build()
988 .unwrap();
989 assert_eq!(definition.from_uri(), "timer:tick");
990 }
991
992 #[test]
993 fn test_builder_empty_from_uri_errors() {
994 let result = RouteBuilder::from("").route_id("test-route").build();
995 assert!(result.is_err());
996 }
997
998 #[test]
999 fn test_builder_to_adds_step() {
1000 let definition = RouteBuilder::from("timer:tick")
1001 .route_id("test-route")
1002 .to("log:info")
1003 .build()
1004 .unwrap();
1005
1006 assert_eq!(definition.from_uri(), "timer:tick");
1007 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1009 }
1010
1011 #[test]
1012 fn test_builder_filter_adds_filter_step() {
1013 let definition = RouteBuilder::from("timer:tick")
1014 .route_id("test-route")
1015 .filter(|_ex| true)
1016 .to("mock:result")
1017 .end_filter()
1018 .build()
1019 .unwrap();
1020
1021 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1022 }
1023
1024 #[test]
1025 fn test_builder_set_header_adds_processor_step() {
1026 let definition = RouteBuilder::from("timer:tick")
1027 .route_id("test-route")
1028 .set_header("key", Value::String("value".into()))
1029 .build()
1030 .unwrap();
1031
1032 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1033 }
1034
1035 #[test]
1036 fn test_builder_map_body_adds_processor_step() {
1037 let definition = RouteBuilder::from("timer:tick")
1038 .route_id("test-route")
1039 .map_body(|body| body)
1040 .build()
1041 .unwrap();
1042
1043 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1044 }
1045
1046 #[test]
1047 fn test_builder_process_adds_processor_step() {
1048 let definition = RouteBuilder::from("timer:tick")
1049 .route_id("test-route")
1050 .process(|ex| async move { Ok(ex) })
1051 .build()
1052 .unwrap();
1053
1054 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1055 }
1056
1057 #[test]
1058 fn test_builder_chain_multiple_steps() {
1059 let definition = RouteBuilder::from("timer:tick")
1060 .route_id("test-route")
1061 .set_header("source", Value::String("timer".into()))
1062 .filter(|ex| ex.input.header("source").is_some())
1063 .to("log:info")
1064 .end_filter()
1065 .to("mock:result")
1066 .build()
1067 .unwrap();
1068
1069 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1073 }
1074
1075 #[tokio::test]
1080 async fn test_set_header_processor_works() {
1081 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1082 let exchange = Exchange::new(Message::new("test"));
1083 let result = svc.call(exchange).await.unwrap();
1084 assert_eq!(
1085 result.input.header("greeting"),
1086 Some(&Value::String("hello".into()))
1087 );
1088 }
1089
1090 #[tokio::test]
1091 async fn test_filter_processor_passes() {
1092 use camel_api::BoxProcessorExt;
1093 use camel_processor::FilterService;
1094
1095 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1096 let mut svc =
1097 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1098 let exchange = Exchange::new(Message::new("pass"));
1099 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1100 assert_eq!(result.input.body.as_text(), Some("pass"));
1101 }
1102
1103 #[tokio::test]
1104 async fn test_filter_processor_blocks() {
1105 use camel_api::BoxProcessorExt;
1106 use camel_processor::FilterService;
1107
1108 let sub = BoxProcessor::from_fn(|_ex| {
1109 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1110 });
1111 let mut svc =
1112 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1113 let exchange = Exchange::new(Message::new("reject"));
1114 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1115 assert_eq!(result.input.body.as_text(), Some("reject"));
1116 }
1117
1118 #[tokio::test]
1119 async fn test_map_body_processor_works() {
1120 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1121 if let Some(text) = body.as_text() {
1122 Body::Text(text.to_uppercase())
1123 } else {
1124 body
1125 }
1126 });
1127 let exchange = Exchange::new(Message::new("hello"));
1128 let result = mapper.oneshot(exchange).await.unwrap();
1129 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1130 }
1131
1132 #[tokio::test]
1133 async fn test_process_custom_processor_works() {
1134 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1135 ex.set_property("custom", Value::Bool(true));
1136 Ok(ex)
1137 });
1138 let exchange = Exchange::new(Message::default());
1139 let result = processor.oneshot(exchange).await.unwrap();
1140 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1141 }
1142
1143 #[tokio::test]
1148 async fn test_compose_pipeline_runs_steps_in_order() {
1149 use camel_core::route::compose_pipeline;
1150
1151 let processors = vec![
1152 BoxProcessor::new(SetHeader::new(
1153 IdentityProcessor,
1154 "step",
1155 Value::String("one".into()),
1156 )),
1157 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1158 if let Some(text) = body.as_text() {
1159 Body::Text(format!("{}-processed", text))
1160 } else {
1161 body
1162 }
1163 })),
1164 ];
1165
1166 let pipeline = compose_pipeline(processors);
1167 let exchange = Exchange::new(Message::new("hello"));
1168 let result = pipeline.oneshot(exchange).await.unwrap();
1169
1170 assert_eq!(
1171 result.input.header("step"),
1172 Some(&Value::String("one".into()))
1173 );
1174 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1175 }
1176
1177 #[tokio::test]
1178 async fn test_compose_pipeline_empty_is_identity() {
1179 use camel_core::route::compose_pipeline;
1180
1181 let pipeline = compose_pipeline(vec![]);
1182 let exchange = Exchange::new(Message::new("unchanged"));
1183 let result = pipeline.oneshot(exchange).await.unwrap();
1184 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1185 }
1186
1187 #[test]
1192 fn test_builder_circuit_breaker_sets_config() {
1193 use camel_api::circuit_breaker::CircuitBreakerConfig;
1194
1195 let config = CircuitBreakerConfig::new().failure_threshold(5);
1196 let definition = RouteBuilder::from("timer:tick")
1197 .route_id("test-route")
1198 .circuit_breaker(config)
1199 .build()
1200 .unwrap();
1201
1202 let cb = definition
1203 .circuit_breaker_config()
1204 .expect("circuit breaker should be set");
1205 assert_eq!(cb.failure_threshold, 5);
1206 }
1207
1208 #[test]
1209 fn test_builder_circuit_breaker_with_error_handler() {
1210 use camel_api::circuit_breaker::CircuitBreakerConfig;
1211 use camel_api::error_handler::ErrorHandlerConfig;
1212
1213 let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1214 let eh_config = ErrorHandlerConfig::log_only();
1215
1216 let definition = RouteBuilder::from("timer:tick")
1217 .route_id("test-route")
1218 .to("log:info")
1219 .circuit_breaker(cb_config)
1220 .error_handler(eh_config)
1221 .build()
1222 .unwrap();
1223
1224 assert!(
1225 definition.circuit_breaker_config().is_some(),
1226 "circuit breaker config should be set"
1227 );
1228 }
1230
1231 #[test]
1234 fn test_split_builder_typestate() {
1235 use camel_api::splitter::{SplitterConfig, split_body_lines};
1236
1237 let definition = RouteBuilder::from("timer:test?period=1000")
1239 .route_id("test-route")
1240 .split(SplitterConfig::new(split_body_lines()))
1241 .to("mock:per-fragment")
1242 .end_split()
1243 .to("mock:final")
1244 .build()
1245 .unwrap();
1246
1247 assert_eq!(definition.steps().len(), 2);
1249 }
1250
1251 #[test]
1252 fn test_split_builder_steps_collected() {
1253 use camel_api::splitter::{SplitterConfig, split_body_lines};
1254
1255 let definition = RouteBuilder::from("timer:test?period=1000")
1256 .route_id("test-route")
1257 .split(SplitterConfig::new(split_body_lines()))
1258 .set_header("fragment", Value::String("yes".into()))
1259 .to("mock:per-fragment")
1260 .end_split()
1261 .build()
1262 .unwrap();
1263
1264 assert_eq!(definition.steps().len(), 1);
1266 match &definition.steps()[0] {
1267 BuilderStep::Split { steps, .. } => {
1268 assert_eq!(steps.len(), 2); }
1270 other => panic!("Expected Split, got {:?}", other),
1271 }
1272 }
1273
1274 #[test]
1275 fn test_split_builder_config_propagated() {
1276 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1277
1278 let definition = RouteBuilder::from("timer:test?period=1000")
1279 .route_id("test-route")
1280 .split(
1281 SplitterConfig::new(split_body_lines())
1282 .parallel(true)
1283 .parallel_limit(4)
1284 .aggregation(AggregationStrategy::CollectAll),
1285 )
1286 .to("mock:per-fragment")
1287 .end_split()
1288 .build()
1289 .unwrap();
1290
1291 match &definition.steps()[0] {
1292 BuilderStep::Split { config, .. } => {
1293 assert!(config.parallel);
1294 assert_eq!(config.parallel_limit, Some(4));
1295 assert!(matches!(
1296 config.aggregation,
1297 AggregationStrategy::CollectAll
1298 ));
1299 }
1300 other => panic!("Expected Split, got {:?}", other),
1301 }
1302 }
1303
1304 #[test]
1305 fn test_aggregate_builder_adds_step() {
1306 use camel_api::aggregator::AggregatorConfig;
1307 use camel_core::route::BuilderStep;
1308
1309 let definition = RouteBuilder::from("timer:tick")
1310 .route_id("test-route")
1311 .aggregate(
1312 AggregatorConfig::correlate_by("key")
1313 .complete_when_size(2)
1314 .build(),
1315 )
1316 .build()
1317 .unwrap();
1318
1319 assert_eq!(definition.steps().len(), 1);
1320 assert!(matches!(
1321 definition.steps()[0],
1322 BuilderStep::Aggregate { .. }
1323 ));
1324 }
1325
1326 #[test]
1327 fn test_aggregate_in_split_builder() {
1328 use camel_api::aggregator::AggregatorConfig;
1329 use camel_api::splitter::{SplitterConfig, split_body_lines};
1330 use camel_core::route::BuilderStep;
1331
1332 let definition = RouteBuilder::from("timer:tick")
1333 .route_id("test-route")
1334 .split(SplitterConfig::new(split_body_lines()))
1335 .aggregate(
1336 AggregatorConfig::correlate_by("key")
1337 .complete_when_size(1)
1338 .build(),
1339 )
1340 .end_split()
1341 .build()
1342 .unwrap();
1343
1344 assert_eq!(definition.steps().len(), 1);
1345 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
1346 assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
1347 } else {
1348 panic!("expected Split step");
1349 }
1350 }
1351
1352 #[test]
1355 fn test_builder_set_body_static_adds_processor() {
1356 let definition = RouteBuilder::from("timer:tick")
1357 .route_id("test-route")
1358 .set_body("fixed")
1359 .build()
1360 .unwrap();
1361 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1362 }
1363
1364 #[test]
1365 fn test_builder_set_body_fn_adds_processor() {
1366 let definition = RouteBuilder::from("timer:tick")
1367 .route_id("test-route")
1368 .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
1369 .build()
1370 .unwrap();
1371 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1372 }
1373
1374 #[test]
1375 fn test_builder_set_header_fn_adds_processor() {
1376 let definition = RouteBuilder::from("timer:tick")
1377 .route_id("test-route")
1378 .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
1379 .build()
1380 .unwrap();
1381 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1382 }
1383
1384 #[tokio::test]
1385 async fn test_set_body_static_processor_works() {
1386 use camel_core::route::compose_pipeline;
1387 let def = RouteBuilder::from("t:t")
1388 .route_id("test-route")
1389 .set_body("replaced")
1390 .build()
1391 .unwrap();
1392 let pipeline = compose_pipeline(
1393 def.steps()
1394 .iter()
1395 .filter_map(|s| {
1396 if let BuilderStep::Processor(p) = s {
1397 Some(p.clone())
1398 } else {
1399 None
1400 }
1401 })
1402 .collect(),
1403 );
1404 let exchange = Exchange::new(Message::new("original"));
1405 let result = pipeline.oneshot(exchange).await.unwrap();
1406 assert_eq!(result.input.body.as_text(), Some("replaced"));
1407 }
1408
1409 #[tokio::test]
1410 async fn test_set_body_fn_processor_works() {
1411 use camel_core::route::compose_pipeline;
1412 let def = RouteBuilder::from("t:t")
1413 .route_id("test-route")
1414 .set_body_fn(|ex: &Exchange| {
1415 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
1416 })
1417 .build()
1418 .unwrap();
1419 let pipeline = compose_pipeline(
1420 def.steps()
1421 .iter()
1422 .filter_map(|s| {
1423 if let BuilderStep::Processor(p) = s {
1424 Some(p.clone())
1425 } else {
1426 None
1427 }
1428 })
1429 .collect(),
1430 );
1431 let exchange = Exchange::new(Message::new("hello"));
1432 let result = pipeline.oneshot(exchange).await.unwrap();
1433 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1434 }
1435
1436 #[tokio::test]
1437 async fn test_set_header_fn_processor_works() {
1438 use camel_core::route::compose_pipeline;
1439 let def = RouteBuilder::from("t:t")
1440 .route_id("test-route")
1441 .set_header_fn("echo", |ex: &Exchange| {
1442 ex.input
1443 .body
1444 .as_text()
1445 .map(|t| Value::String(t.into()))
1446 .unwrap_or(Value::Null)
1447 })
1448 .build()
1449 .unwrap();
1450 let pipeline = compose_pipeline(
1451 def.steps()
1452 .iter()
1453 .filter_map(|s| {
1454 if let BuilderStep::Processor(p) = s {
1455 Some(p.clone())
1456 } else {
1457 None
1458 }
1459 })
1460 .collect(),
1461 );
1462 let exchange = Exchange::new(Message::new("ping"));
1463 let result = pipeline.oneshot(exchange).await.unwrap();
1464 assert_eq!(
1465 result.input.header("echo"),
1466 Some(&Value::String("ping".into()))
1467 );
1468 }
1469
1470 #[test]
1473 fn test_filter_builder_typestate() {
1474 let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1475 .route_id("test-route")
1476 .filter(|_ex| true)
1477 .to("mock:inner")
1478 .end_filter()
1479 .to("mock:outer")
1480 .build();
1481 assert!(result.is_ok());
1482 }
1483
1484 #[test]
1485 fn test_filter_builder_steps_collected() {
1486 let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1487 .route_id("test-route")
1488 .filter(|_ex| true)
1489 .to("mock:inner")
1490 .end_filter()
1491 .build()
1492 .unwrap();
1493
1494 assert_eq!(definition.steps().len(), 1);
1495 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1496 }
1497
1498 #[test]
1499 fn test_wire_tap_builder_adds_step() {
1500 let definition = RouteBuilder::from("timer:tick")
1501 .route_id("test-route")
1502 .wire_tap("mock:tap")
1503 .to("mock:result")
1504 .build()
1505 .unwrap();
1506
1507 assert_eq!(definition.steps().len(), 2);
1508 assert!(
1509 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
1510 );
1511 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
1512 }
1513
1514 #[test]
1517 fn test_multicast_builder_typestate() {
1518 let definition = RouteBuilder::from("timer:tick")
1519 .route_id("test-route")
1520 .multicast()
1521 .to("direct:a")
1522 .to("direct:b")
1523 .end_multicast()
1524 .to("mock:result")
1525 .build()
1526 .unwrap();
1527
1528 assert_eq!(definition.steps().len(), 2); }
1530
1531 #[test]
1532 fn test_multicast_builder_steps_collected() {
1533 let definition = RouteBuilder::from("timer:tick")
1534 .route_id("test-route")
1535 .multicast()
1536 .to("direct:a")
1537 .to("direct:b")
1538 .end_multicast()
1539 .build()
1540 .unwrap();
1541
1542 match &definition.steps()[0] {
1543 BuilderStep::Multicast { steps, .. } => {
1544 assert_eq!(steps.len(), 2);
1545 }
1546 other => panic!("Expected Multicast, got {:?}", other),
1547 }
1548 }
1549
1550 #[test]
1553 fn test_builder_concurrent_sets_concurrency() {
1554 use camel_component::ConcurrencyModel;
1555
1556 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1557 .route_id("test-route")
1558 .concurrent(16)
1559 .to("log:info")
1560 .build()
1561 .unwrap();
1562
1563 assert_eq!(
1564 definition.concurrency_override(),
1565 Some(&ConcurrencyModel::Concurrent { max: Some(16) })
1566 );
1567 }
1568
1569 #[test]
1570 fn test_builder_concurrent_zero_means_unbounded() {
1571 use camel_component::ConcurrencyModel;
1572
1573 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1574 .route_id("test-route")
1575 .concurrent(0)
1576 .to("log:info")
1577 .build()
1578 .unwrap();
1579
1580 assert_eq!(
1581 definition.concurrency_override(),
1582 Some(&ConcurrencyModel::Concurrent { max: None })
1583 );
1584 }
1585
1586 #[test]
1587 fn test_builder_sequential_sets_concurrency() {
1588 use camel_component::ConcurrencyModel;
1589
1590 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1591 .route_id("test-route")
1592 .sequential()
1593 .to("log:info")
1594 .build()
1595 .unwrap();
1596
1597 assert_eq!(
1598 definition.concurrency_override(),
1599 Some(&ConcurrencyModel::Sequential)
1600 );
1601 }
1602
1603 #[test]
1604 fn test_builder_default_concurrency_is_none() {
1605 let definition = RouteBuilder::from("timer:tick")
1606 .route_id("test-route")
1607 .to("log:info")
1608 .build()
1609 .unwrap();
1610
1611 assert_eq!(definition.concurrency_override(), None);
1612 }
1613
1614 #[test]
1617 fn test_builder_route_id_sets_id() {
1618 let definition = RouteBuilder::from("timer:tick")
1619 .route_id("my-route")
1620 .build()
1621 .unwrap();
1622
1623 assert_eq!(definition.route_id(), "my-route");
1624 }
1625
1626 #[test]
1627 fn test_build_without_route_id_fails() {
1628 let result = RouteBuilder::from("timer:tick?period=1000")
1629 .to("log:info")
1630 .build();
1631 let err = match result {
1632 Err(e) => e.to_string(),
1633 Ok(_) => panic!("build() should fail without route_id"),
1634 };
1635 assert!(
1636 err.contains("route_id"),
1637 "error should mention route_id, got: {}",
1638 err
1639 );
1640 }
1641
1642 #[test]
1643 fn test_builder_auto_startup_false() {
1644 let definition = RouteBuilder::from("timer:tick")
1645 .route_id("test-route")
1646 .auto_startup(false)
1647 .build()
1648 .unwrap();
1649
1650 assert!(!definition.auto_startup());
1651 }
1652
1653 #[test]
1654 fn test_builder_startup_order_custom() {
1655 let definition = RouteBuilder::from("timer:tick")
1656 .route_id("test-route")
1657 .startup_order(50)
1658 .build()
1659 .unwrap();
1660
1661 assert_eq!(definition.startup_order(), 50);
1662 }
1663
1664 #[test]
1665 fn test_builder_defaults() {
1666 let definition = RouteBuilder::from("timer:tick")
1667 .route_id("test-route")
1668 .build()
1669 .unwrap();
1670
1671 assert_eq!(definition.route_id(), "test-route");
1672 assert!(definition.auto_startup());
1673 assert_eq!(definition.startup_order(), 1000);
1674 }
1675
1676 #[test]
1679 fn test_choice_builder_single_when() {
1680 let definition = RouteBuilder::from("timer:tick")
1681 .route_id("test-route")
1682 .choice()
1683 .when(|ex: &Exchange| ex.input.header("type").is_some())
1684 .to("mock:typed")
1685 .end_when()
1686 .end_choice()
1687 .build()
1688 .unwrap();
1689 assert_eq!(definition.steps().len(), 1);
1690 assert!(
1691 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
1692 if whens.len() == 1 && otherwise.is_none())
1693 );
1694 }
1695
1696 #[test]
1697 fn test_choice_builder_when_otherwise() {
1698 let definition = RouteBuilder::from("timer:tick")
1699 .route_id("test-route")
1700 .choice()
1701 .when(|ex: &Exchange| ex.input.header("a").is_some())
1702 .to("mock:a")
1703 .end_when()
1704 .otherwise()
1705 .to("mock:fallback")
1706 .end_otherwise()
1707 .end_choice()
1708 .build()
1709 .unwrap();
1710 assert!(
1711 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
1712 if whens.len() == 1 && otherwise.is_some())
1713 );
1714 }
1715
1716 #[test]
1717 fn test_choice_builder_multiple_whens() {
1718 let definition = RouteBuilder::from("timer:tick")
1719 .route_id("test-route")
1720 .choice()
1721 .when(|ex: &Exchange| ex.input.header("a").is_some())
1722 .to("mock:a")
1723 .end_when()
1724 .when(|ex: &Exchange| ex.input.header("b").is_some())
1725 .to("mock:b")
1726 .end_when()
1727 .end_choice()
1728 .build()
1729 .unwrap();
1730 assert!(
1731 matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
1732 if whens.len() == 2)
1733 );
1734 }
1735
1736 #[test]
1737 fn test_choice_step_after_choice() {
1738 let definition = RouteBuilder::from("timer:tick")
1740 .route_id("test-route")
1741 .choice()
1742 .when(|_ex: &Exchange| true)
1743 .to("mock:inner")
1744 .end_when()
1745 .end_choice()
1746 .to("mock:outer") .build()
1748 .unwrap();
1749 assert_eq!(definition.steps().len(), 2);
1750 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
1751 }
1752
1753 #[test]
1756 fn test_throttle_builder_typestate() {
1757 let definition = RouteBuilder::from("timer:tick")
1758 .route_id("test-route")
1759 .throttle(10, std::time::Duration::from_secs(1))
1760 .to("mock:result")
1761 .end_throttle()
1762 .build()
1763 .unwrap();
1764
1765 assert_eq!(definition.steps().len(), 1);
1766 assert!(matches!(
1767 &definition.steps()[0],
1768 BuilderStep::Throttle { .. }
1769 ));
1770 }
1771
1772 #[test]
1773 fn test_throttle_builder_with_strategy() {
1774 let definition = RouteBuilder::from("timer:tick")
1775 .route_id("test-route")
1776 .throttle(10, std::time::Duration::from_secs(1))
1777 .strategy(ThrottleStrategy::Reject)
1778 .to("mock:result")
1779 .end_throttle()
1780 .build()
1781 .unwrap();
1782
1783 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
1784 assert_eq!(config.strategy, ThrottleStrategy::Reject);
1785 } else {
1786 panic!("Expected Throttle step");
1787 }
1788 }
1789
1790 #[test]
1791 fn test_throttle_builder_steps_collected() {
1792 let definition = RouteBuilder::from("timer:tick")
1793 .route_id("test-route")
1794 .throttle(5, std::time::Duration::from_secs(1))
1795 .set_header("throttled", Value::Bool(true))
1796 .to("mock:throttled")
1797 .end_throttle()
1798 .build()
1799 .unwrap();
1800
1801 match &definition.steps()[0] {
1802 BuilderStep::Throttle { steps, .. } => {
1803 assert_eq!(steps.len(), 2); }
1805 other => panic!("Expected Throttle, got {:?}", other),
1806 }
1807 }
1808
1809 #[test]
1810 fn test_throttle_step_after_throttle() {
1811 let definition = RouteBuilder::from("timer:tick")
1813 .route_id("test-route")
1814 .throttle(10, std::time::Duration::from_secs(1))
1815 .to("mock:inner")
1816 .end_throttle()
1817 .to("mock:outer")
1818 .build()
1819 .unwrap();
1820
1821 assert_eq!(definition.steps().len(), 2);
1822 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
1823 }
1824
1825 #[test]
1828 fn test_load_balance_builder_typestate() {
1829 let definition = RouteBuilder::from("timer:tick")
1830 .route_id("test-route")
1831 .load_balance()
1832 .round_robin()
1833 .to("mock:a")
1834 .to("mock:b")
1835 .end_load_balance()
1836 .build()
1837 .unwrap();
1838
1839 assert_eq!(definition.steps().len(), 1);
1840 assert!(matches!(
1841 &definition.steps()[0],
1842 BuilderStep::LoadBalance { .. }
1843 ));
1844 }
1845
1846 #[test]
1847 fn test_load_balance_builder_with_strategy() {
1848 let definition = RouteBuilder::from("timer:tick")
1849 .route_id("test-route")
1850 .load_balance()
1851 .random()
1852 .to("mock:result")
1853 .end_load_balance()
1854 .build()
1855 .unwrap();
1856
1857 if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
1858 assert_eq!(config.strategy, LoadBalanceStrategy::Random);
1859 } else {
1860 panic!("Expected LoadBalance step");
1861 }
1862 }
1863
1864 #[test]
1865 fn test_load_balance_builder_steps_collected() {
1866 let definition = RouteBuilder::from("timer:tick")
1867 .route_id("test-route")
1868 .load_balance()
1869 .set_header("lb", Value::Bool(true))
1870 .to("mock:a")
1871 .end_load_balance()
1872 .build()
1873 .unwrap();
1874
1875 match &definition.steps()[0] {
1876 BuilderStep::LoadBalance { steps, .. } => {
1877 assert_eq!(steps.len(), 2); }
1879 other => panic!("Expected LoadBalance, got {:?}", other),
1880 }
1881 }
1882
1883 #[test]
1884 fn test_load_balance_step_after_load_balance() {
1885 let definition = RouteBuilder::from("timer:tick")
1887 .route_id("test-route")
1888 .load_balance()
1889 .to("mock:inner")
1890 .end_load_balance()
1891 .to("mock:outer")
1892 .build()
1893 .unwrap();
1894
1895 assert_eq!(definition.steps().len(), 2);
1896 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
1897 }
1898
1899 #[test]
1902 fn test_dynamic_router_builder() {
1903 let definition = RouteBuilder::from("timer:tick")
1904 .route_id("test-route")
1905 .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
1906 .build()
1907 .unwrap();
1908
1909 assert_eq!(definition.steps().len(), 1);
1910 assert!(matches!(
1911 &definition.steps()[0],
1912 BuilderStep::DynamicRouter { .. }
1913 ));
1914 }
1915
1916 #[test]
1917 fn test_dynamic_router_builder_with_config() {
1918 let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
1919 .max_iterations(100)
1920 .cache_size(500);
1921
1922 let definition = RouteBuilder::from("timer:tick")
1923 .route_id("test-route")
1924 .dynamic_router_with_config(config)
1925 .build()
1926 .unwrap();
1927
1928 assert_eq!(definition.steps().len(), 1);
1929 if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
1930 assert_eq!(config.max_iterations, 100);
1931 assert_eq!(config.cache_size, 500);
1932 } else {
1933 panic!("Expected DynamicRouter step");
1934 }
1935 }
1936
1937 #[test]
1938 fn test_dynamic_router_step_after_router() {
1939 let definition = RouteBuilder::from("timer:tick")
1941 .route_id("test-route")
1942 .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
1943 .to("mock:outer")
1944 .build()
1945 .unwrap();
1946
1947 assert_eq!(definition.steps().len(), 2);
1948 assert!(matches!(
1949 &definition.steps()[0],
1950 BuilderStep::DynamicRouter { .. }
1951 ));
1952 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
1953 }
1954}