1use camel_api::DelayConfig;
7use camel_api::aggregator::{
8 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24 ProcessorFn, Value,
25 runtime::{
26 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28 CanonicalWhenSpec,
29 },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35 StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38pub mod do_try;
40pub use do_try::{DoCatchBuilder, DoFinallyBuilder, DoTryBuilder};
41
42pub trait StepAccumulator: Sized {
48 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
49
50 fn to(mut self, endpoint: impl Into<String>) -> Self {
51 self.steps_mut().push(BuilderStep::To(endpoint.into()));
52 self
53 }
54
55 fn process<F, Fut>(mut self, f: F) -> Self
56 where
57 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
58 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
59 {
60 let svc = ProcessorFn::new(f);
61 self.steps_mut()
62 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
63 self
64 }
65
66 fn process_fn(mut self, processor: BoxProcessor) -> Self {
67 self.steps_mut().push(BuilderStep::Processor(processor));
68 self
69 }
70
71 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
72 let svc = SetHeader::new(IdentityProcessor, key, value);
73 self.steps_mut()
74 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
75 self
76 }
77
78 fn map_body<F>(mut self, mapper: F) -> Self
79 where
80 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
81 {
82 let svc = MapBody::new(IdentityProcessor, mapper);
83 self.steps_mut()
84 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
85 self
86 }
87
88 fn set_body<B>(mut self, body: B) -> Self
89 where
90 B: Into<Body> + Clone + Send + Sync + 'static,
91 {
92 let body: Body = body.into();
93 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
94 self.steps_mut()
95 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
96 self
97 }
98
99 fn transform<B>(self, body: B) -> Self
104 where
105 B: Into<Body> + Clone + Send + Sync + 'static,
106 {
107 self.set_body(body)
108 }
109
110 fn set_body_fn<F>(mut self, expr: F) -> Self
111 where
112 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
113 {
114 let svc = SetBody::new(IdentityProcessor, expr);
115 self.steps_mut()
116 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
117 self
118 }
119
120 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
121 where
122 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
123 {
124 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
125 self.steps_mut()
126 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
127 self
128 }
129
130 fn aggregate(mut self, config: AggregatorConfig) -> Self {
131 self.steps_mut().push(BuilderStep::Aggregate { config });
132 self
133 }
134
135 fn stop(mut self) -> Self {
141 self.steps_mut().push(BuilderStep::Stop);
142 self
143 }
144
145 fn delay(mut self, duration: std::time::Duration) -> Self {
146 self.steps_mut().push(BuilderStep::Delay {
147 config: DelayConfig::from_duration(duration),
148 });
149 self
150 }
151
152 fn delay_with_header(
153 mut self,
154 duration: std::time::Duration,
155 header: impl Into<String>,
156 ) -> Self {
157 self.steps_mut().push(BuilderStep::Delay {
158 config: DelayConfig::from_duration_with_header(duration, header),
159 });
160 self
161 }
162
163 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
167 self.steps_mut().push(BuilderStep::Log {
168 level,
169 message: message.into(),
170 });
171 self
172 }
173
174 fn convert_body_to(mut self, target: BodyType) -> Self {
186 let svc = ConvertBodyTo::new(IdentityProcessor, target);
187 self.steps_mut()
188 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
189 self
190 }
191
192 fn stream_cache(mut self, threshold: usize) -> Self {
193 let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
194 let svc = StreamCacheService::new(IdentityProcessor, config);
195 self.steps_mut()
196 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
197 self
198 }
199
200 fn stream_cache_default(self) -> Self {
204 self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
205 }
206
207 fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
218 let name = format.into();
219 let df = builtin_data_format(&name)
220 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
221 let svc = MarshalService::new(IdentityProcessor, df);
222 self.steps_mut()
223 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
224 Ok(self)
225 }
226
227 fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
238 let name = format.into();
239 let df = builtin_data_format(&name)
240 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
241 let svc = UnmarshalService::new(IdentityProcessor, df);
242 self.steps_mut()
243 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
244 Ok(self)
245 }
246
247 fn validate(mut self, schema_path: impl Into<String>) -> Self {
256 let path = schema_path.into();
257 let uri = if path.starts_with("validator:") {
258 path
259 } else {
260 format!("validator:{path}")
261 };
262 self.steps_mut().push(BuilderStep::To(uri));
263 self
264 }
265
266 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
277 self.steps_mut().push(BuilderStep::Script {
278 language: language.into(),
279 script: script.into(),
280 });
281 self
282 }
283
284 fn enrich(mut self, uri: impl Into<String>) -> Self {
290 self.steps_mut().push(BuilderStep::Enrich {
291 uri: uri.into(),
292 strategy: None,
293 timeout_ms: None,
294 });
295 self
296 }
297
298 fn poll_enrich(mut self, uri: impl Into<String>, timeout_ms: u64) -> Self {
304 self.steps_mut().push(BuilderStep::PollEnrich {
305 uri: uri.into(),
306 strategy: None,
307 timeout_ms: Some(timeout_ms),
308 });
309 self
310 }
311
312 fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
313 self.steps_mut().push(BuilderStep::Bean {
314 name: name.into(),
315 method: method.into(),
316 });
317 self
318 }
319}
320
/// Fluent builder for a single route.
///
/// Created with `RouteBuilder::from`, filled in through chained calls, and consumed by
/// `build` (producing a `RouteDefinition`) or `build_canonical` (producing a
/// `CanonicalRouteSpec`).
pub struct RouteBuilder {
    /// Source endpoint URI; checked by `validate_uri` when the route is built.
    from_uri: String,
    /// Steps accumulated so far, in the order they were added.
    steps: Vec<BuilderStep>,
    /// Explicit error-handler configuration set via `error_handler`.
    error_handler: Option<ErrorHandlerConfig>,
    /// Tracks which error-handler API style(s) have been used on this builder.
    error_handler_mode: ErrorHandlerMode,
    /// Optional circuit-breaker configuration.
    circuit_breaker_config: Option<CircuitBreakerConfig>,
    /// Optional security-policy configuration.
    security_policy_config: Option<camel_api::security_policy::SecurityPolicyConfig>,
    /// Optional token authenticator.
    security_authenticator: Option<std::sync::Arc<dyn camel_auth::TokenAuthenticator>>,
    /// Optional concurrency-model override.
    concurrency: Option<ConcurrencyModel>,
    /// Route identifier; must be present and non-blank at build time.
    route_id: Option<String>,
    /// Optional auto-startup flag.
    auto_startup: Option<bool>,
    /// Optional startup order.
    startup_order: Option<i32>,
}
348
/// Records which error-handler configuration style has been used on a `RouteBuilder`.
#[derive(Default)]
enum ErrorHandlerMode {
    /// No error-handler method has been called yet.
    #[default]
    None,
    /// Only `error_handler(config)` has been used.
    ExplicitConfig,
    /// Only the shorthand methods (`dead_letter_channel`, `on_exception`) have been used.
    Shorthand {
        /// Dead-letter-channel URI, if one was given.
        dlc_uri: Option<String>,
        /// Exception policies collected from finished `on_exception` chains.
        specs: Vec<OnExceptionSpec>,
    },
    /// Both styles were combined; `build` rejects this state with an error.
    Mixed,
}
360
/// One exception policy collected through the `on_exception` shorthand.
#[derive(Clone)]
struct OnExceptionSpec {
    /// Predicate deciding whether this policy applies to a given error.
    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
    /// Redelivery policy, present only when `retry` was called.
    retry: Option<RedeliveryPolicy>,
    /// URI passed to `handled_by`, if any.
    handled_by: Option<String>,
}
367
368impl RouteBuilder {
369 pub fn from(endpoint: &str) -> Self {
371 Self {
372 from_uri: endpoint.to_string(),
373 steps: Vec::new(),
374 error_handler: None,
375 error_handler_mode: ErrorHandlerMode::None,
376 circuit_breaker_config: None,
377 security_policy_config: None,
378 security_authenticator: None,
379 concurrency: None,
380 route_id: None,
381 auto_startup: None,
382 startup_order: None,
383 }
384 }
385
386 pub fn filter<F>(self, predicate: F) -> FilterBuilder
390 where
391 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
392 {
393 FilterBuilder {
394 parent: self,
395 predicate: std::sync::Arc::new(predicate),
396 steps: vec![],
397 }
398 }
399
400 pub fn choice(self) -> ChoiceBuilder {
406 ChoiceBuilder {
407 parent: self,
408 whens: vec![],
409 _otherwise: None,
410 }
411 }
412
413 pub fn wire_tap(mut self, endpoint: &str) -> Self {
417 self.steps.push(BuilderStep::WireTap {
418 uri: endpoint.to_string(),
419 });
420 self
421 }
422
423 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
425 self.error_handler_mode = match self.error_handler_mode {
426 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
427 ErrorHandlerMode::ExplicitConfig
428 }
429 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
430 };
431 self.error_handler = Some(config);
432 self
433 }
434
435 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
437 let uri = uri.into();
438 self.error_handler_mode = match self.error_handler_mode {
439 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
440 dlc_uri: Some(uri),
441 specs: Vec::new(),
442 },
443 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
444 dlc_uri: Some(uri),
445 specs,
446 },
447 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
448 };
449 self
450 }
451
452 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
454 where
455 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
456 {
457 self.error_handler_mode = match self.error_handler_mode {
458 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
459 dlc_uri: None,
460 specs: Vec::new(),
461 },
462 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
463 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
464 };
465
466 OnExceptionBuilder {
467 parent: self,
468 policy: OnExceptionSpec {
469 matches: std::sync::Arc::new(matches),
470 retry: None,
471 handled_by: None,
472 },
473 }
474 }
475
476 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
478 self.circuit_breaker_config = Some(config);
479 self
480 }
481
482 pub fn security_policy(
483 mut self,
484 config: camel_api::security_policy::SecurityPolicyConfig,
485 ) -> Self {
486 self.security_policy_config = Some(config);
487 self
488 }
489
490 pub fn security_authenticator(
491 mut self,
492 auth: std::sync::Arc<dyn camel_auth::TokenAuthenticator>,
493 ) -> Self {
494 self.security_authenticator = Some(auth);
495 self
496 }
497
498 pub fn concurrent(mut self, max: usize) -> Self {
512 let max = if max == 0 { None } else { Some(max) };
513 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
514 self
515 }
516
517 pub fn sequential(mut self) -> Self {
522 self.concurrency = Some(ConcurrencyModel::Sequential);
523 self
524 }
525
526 pub fn route_id(mut self, id: impl Into<String>) -> Self {
530 self.route_id = Some(id.into());
531 self
532 }
533
534 pub fn auto_startup(mut self, auto: bool) -> Self {
538 self.auto_startup = Some(auto);
539 self
540 }
541
542 pub fn startup_order(mut self, order: i32) -> Self {
546 self.startup_order = Some(order);
547 self
548 }
549
550 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
556 SplitBuilder {
557 parent: self,
558 config,
559 steps: Vec::new(),
560 }
561 }
562
563 pub fn multicast(self) -> MulticastBuilder {
569 MulticastBuilder {
570 parent: self,
571 steps: Vec::new(),
572 config: MulticastConfig::new(),
573 }
574 }
575
576 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
583 ThrottleBuilder {
584 parent: self,
585 config: ThrottlerConfig::new(max_requests, period),
586 steps: Vec::new(),
587 }
588 }
589
590 pub fn loop_count(self, count: usize) -> LoopBuilder {
592 LoopBuilder {
593 parent: self,
594 config: LoopConfig {
595 mode: LoopMode::Count(count),
596 },
597 steps: vec![],
598 }
599 }
600
601 pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
603 where
604 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
605 {
606 LoopBuilder {
607 parent: self,
608 config: LoopConfig {
609 mode: LoopMode::While(std::sync::Arc::new(predicate)),
610 },
611 steps: vec![],
612 }
613 }
614
615 pub fn load_balance(self) -> LoadBalancerBuilder {
621 LoadBalancerBuilder {
622 parent: self,
623 config: LoadBalancerConfig::round_robin(),
624 steps: Vec::new(),
625 }
626 }
627
628 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
644 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
645 }
646
647 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
651 self.steps.push(BuilderStep::DynamicRouter { config });
652 self
653 }
654
655 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
656 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
657 }
658
659 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
660 self.steps.push(BuilderStep::RoutingSlip { config });
661 self
662 }
663
664 pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
665 self.recipient_list_with_config(RecipientListConfig::new(expression))
666 }
667
668 pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
669 self.steps.push(BuilderStep::RecipientList { config });
670 self
671 }
672
673 pub fn build(self) -> Result<RouteDefinition, CamelError> {
679 validate_uri(&self.from_uri)?;
680 let route_id = self
681 .route_id
682 .filter(|s| !s.trim().is_empty())
683 .ok_or_else(|| {
684 CamelError::RouteError(
685 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
686 .to_string(),
687 )
688 })?;
689 let resolved_error_handler = match self.error_handler_mode {
690 ErrorHandlerMode::None => self.error_handler,
691 ErrorHandlerMode::ExplicitConfig => self.error_handler,
692 ErrorHandlerMode::Mixed => {
693 return Err(CamelError::RouteError(
694 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
695 ));
696 }
697 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
698 let mut config = if let Some(uri) = dlc_uri {
699 ErrorHandlerConfig::dead_letter_channel(uri)
700 } else {
701 ErrorHandlerConfig::log_only()
702 };
703
704 for spec in specs {
705 let matcher = spec.matches.clone();
706 let mut builder = config.on_exception(move |e| matcher(e));
707
708 if let Some(retry) = spec.retry {
709 builder = builder.retry(retry.max_attempts).with_backoff(
710 retry.initial_delay,
711 retry.multiplier,
712 retry.max_delay,
713 );
714 if retry.jitter_factor > 0.0 {
715 builder = builder.with_jitter(retry.jitter_factor);
716 }
717 }
718
719 if let Some(uri) = spec.handled_by {
720 builder = builder.handled_by(uri);
721 }
722
723 config = builder.build();
724 }
725
726 Some(config)
727 }
728 };
729
730 let definition = RouteDefinition::new(self.from_uri, self.steps);
731 let definition = if let Some(eh) = resolved_error_handler {
732 definition.with_error_handler(eh)
733 } else {
734 definition
735 };
736 let definition = if let Some(cb) = self.circuit_breaker_config {
737 definition.with_circuit_breaker(cb)
738 } else {
739 definition
740 };
741 let definition = if let Some(sp) = self.security_policy_config {
742 definition.with_security_policy(sp)
743 } else {
744 definition
745 };
746 let definition = if let Some(auth) = self.security_authenticator {
747 definition.with_security_authenticator(auth)
748 } else {
749 definition
750 };
751 let definition = if let Some(concurrency) = self.concurrency {
752 definition.with_concurrency(concurrency)
753 } else {
754 definition
755 };
756 let definition = definition.with_route_id(route_id);
757 let definition = if let Some(auto) = self.auto_startup {
758 definition.with_auto_startup(auto)
759 } else {
760 definition
761 };
762 let definition = if let Some(order) = self.startup_order {
763 definition.with_startup_order(order)
764 } else {
765 definition
766 };
767 Ok(definition)
768 }
769
770 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
772 validate_uri(&self.from_uri)?;
773 let route_id = self
774 .route_id
775 .filter(|s| !s.trim().is_empty())
776 .ok_or_else(|| {
777 CamelError::RouteError(
778 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
779 .to_string(),
780 )
781 })?;
782
783 let steps = canonicalize_steps(self.steps)?;
784 let circuit_breaker = self
785 .circuit_breaker_config
786 .map(canonicalize_circuit_breaker);
787
788 if self.security_policy_config.is_some() {
789 return Err(CamelError::RouteError(
790 "routes with security_policy cannot use the canonical/hot-reload path (not yet supported)"
791 .into(),
792 ));
793 }
794
795 let spec = CanonicalRouteSpec {
796 route_id,
797 from: self.from_uri,
798 steps,
799 circuit_breaker,
800 auto_startup: None,
801 startup_order: None,
802 concurrency: None,
803 version: camel_api::CANONICAL_CONTRACT_VERSION,
804 };
805 spec.validate_contract()?;
806 Ok(spec)
807 }
808}
809
/// Sub-builder returned by `RouteBuilder::on_exception` for configuring one exception policy.
pub struct OnExceptionBuilder {
    /// Route builder handed back by `end_on_exception`.
    parent: RouteBuilder,
    /// Policy being configured.
    policy: OnExceptionSpec,
}
814
815impl OnExceptionBuilder {
816 pub fn retry(mut self, max_attempts: u32) -> Self {
817 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
818 self
819 }
820
821 pub fn with_backoff(
822 mut self,
823 initial: std::time::Duration,
824 multiplier: f64,
825 max: std::time::Duration,
826 ) -> Self {
827 if let Some(ref mut retry) = self.policy.retry {
828 retry.initial_delay = initial;
829 retry.multiplier = multiplier;
830 retry.max_delay = max;
831 } else {
832 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
833 }
834 self
835 }
836
837 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
838 if let Some(ref mut retry) = self.policy.retry {
839 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
840 } else {
841 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
842 }
843 self
844 }
845
846 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
847 self.policy.handled_by = Some(uri.into());
848 self
849 }
850
851 pub fn end_on_exception(mut self) -> RouteBuilder {
852 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
853 specs.push(self.policy);
854 }
855 self.parent
856 }
857}
858
859fn validate_uri(uri: &str) -> Result<(), CamelError> {
861 let trimmed = uri.trim();
862 if trimmed.is_empty() {
863 return Err(CamelError::RouteError(
864 "route must have a 'from' URI".to_string(),
865 ));
866 }
867 if !trimmed.contains(':') {
868 return Err(CamelError::RouteError(
869 "URI must have a scheme (e.g. 'timer:tick')".to_string(),
870 ));
871 }
872 let scheme = trimmed.split(':').next().unwrap_or("");
873 if scheme.trim().is_empty() {
874 return Err(CamelError::RouteError(
875 "URI scheme must not be empty".to_string(),
876 ));
877 }
878 Ok(())
879}
880
881fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
882 let mut canonical = Vec::with_capacity(steps.len());
883 for step in steps {
884 canonical.push(canonicalize_step(step)?);
885 }
886 Ok(canonical)
887}
888
889fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
890 match step {
891 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
892 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
893 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
894 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
895 BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
896 delay_ms: config.delay_ms,
897 dynamic_header: config.dynamic_header,
898 }),
899 BuilderStep::DeclarativeScript { expression } => {
900 Ok(CanonicalStepSpec::Script { expression })
901 }
902 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
903 predicate,
904 steps: canonicalize_steps(steps)?,
905 }),
906 BuilderStep::DeclarativeChoice { whens, otherwise } => {
907 let mut canonical_whens = Vec::with_capacity(whens.len());
908 for DeclarativeWhenStep { predicate, steps } in whens {
909 canonical_whens.push(CanonicalWhenSpec {
910 predicate,
911 steps: canonicalize_steps(steps)?,
912 });
913 }
914 let otherwise = match otherwise {
915 Some(steps) => Some(canonicalize_steps(steps)?),
916 None => None,
917 };
918 Ok(CanonicalStepSpec::Choice {
919 whens: canonical_whens,
920 otherwise,
921 })
922 }
923 BuilderStep::DeclarativeSplit {
924 expression,
925 aggregation,
926 parallel,
927 parallel_limit,
928 stop_on_exception,
929 steps,
930 } => Ok(CanonicalStepSpec::Split {
931 expression: CanonicalSplitExpressionSpec::Language(expression),
932 aggregation: canonicalize_split_aggregation(aggregation)?,
933 parallel,
934 parallel_limit,
935 stop_on_exception,
936 steps: canonicalize_steps(steps)?,
937 }),
938 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
939 canonicalize_aggregate(config)?,
940 )),
941 other => {
942 let step_name = canonical_step_name(&other);
943 let detail = camel_api::canonical_contract_rejection_reason(step_name)
944 .unwrap_or("not included in canonical v1");
945 Err(CamelError::RouteError(format!(
946 "canonical v1 does not support step `{step_name}`: {detail}"
947 )))
948 }
949 }
950}
951
952fn canonicalize_split_aggregation(
953 strategy: camel_api::splitter::AggregationStrategy,
954) -> Result<CanonicalSplitAggregationSpec, CamelError> {
955 match strategy {
956 camel_api::splitter::AggregationStrategy::LastWins => {
957 Ok(CanonicalSplitAggregationSpec::LastWins)
958 }
959 camel_api::splitter::AggregationStrategy::CollectAll => {
960 Ok(CanonicalSplitAggregationSpec::CollectAll)
961 }
962 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
963 "canonical v1 does not support custom split aggregation".to_string(),
964 )),
965 camel_api::splitter::AggregationStrategy::Original => {
966 Ok(CanonicalSplitAggregationSpec::Original)
967 }
968 }
969}
970
971fn extract_completion_fields(
972 mode: &CompletionMode,
973) -> Result<(Option<usize>, Option<u64>), CamelError> {
974 match mode {
975 CompletionMode::Single(cond) => match cond {
976 CompletionCondition::Size(n) => Ok((Some(*n), None)),
977 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
978 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
979 "canonical v1 does not support aggregate predicate completion".to_string(),
980 )),
981 },
982 CompletionMode::Any(conds) => {
983 let mut size = None;
984 let mut timeout_ms = None;
985 for cond in conds {
986 match cond {
987 CompletionCondition::Size(n) => size = Some(*n),
988 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
989 CompletionCondition::Predicate(_) => {
990 return Err(CamelError::RouteError(
991 "canonical v1 does not support aggregate predicate completion"
992 .to_string(),
993 ));
994 }
995 }
996 }
997 Ok((size, timeout_ms))
998 }
999 }
1000}
1001
1002fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
1003 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
1004
1005 let header = match &config.correlation {
1006 CorrelationStrategy::HeaderName(h) => h.clone(),
1007 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
1008 CorrelationStrategy::Fn(_) => {
1009 return Err(CamelError::RouteError(
1010 "canonical v1 does not support Fn correlation strategy".to_string(),
1011 ));
1012 }
1013 };
1014
1015 let correlation_key = match &config.correlation {
1016 CorrelationStrategy::HeaderName(_) => None,
1017 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
1018 CorrelationStrategy::Fn(_) => unreachable!(),
1019 };
1020
1021 let strategy = match config.strategy {
1022 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
1023 AggregationStrategy::Custom(_) => {
1024 return Err(CamelError::RouteError(
1025 "canonical v1 does not support custom aggregate strategy".to_string(),
1026 ));
1027 }
1028 };
1029 let bucket_ttl_ms = config
1030 .bucket_ttl
1031 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
1032
1033 Ok(CanonicalAggregateSpec {
1034 header,
1035 completion_size,
1036 completion_timeout_ms,
1037 correlation_key,
1038 force_completion_on_stop: if config.force_completion_on_stop {
1039 Some(true)
1040 } else {
1041 None
1042 },
1043 discard_on_timeout: if config.discard_on_timeout {
1044 Some(true)
1045 } else {
1046 None
1047 },
1048 strategy,
1049 max_buckets: config.max_buckets,
1050 bucket_ttl_ms,
1051 })
1052}
1053
1054fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
1055 CanonicalCircuitBreakerSpec {
1056 failure_threshold: config.failure_threshold,
1057 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
1058 }
1059}
1060
1061fn canonical_step_name(step: &BuilderStep) -> &'static str {
1062 match step {
1063 BuilderStep::Processor(_) => "processor",
1064 BuilderStep::To(_) => "to",
1065 BuilderStep::Stop => "stop",
1066 BuilderStep::Log { .. } => "log",
1067 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
1068 BuilderStep::DeclarativeSetBody { .. } => "set_body",
1069 BuilderStep::DeclarativeFilter { .. } => "filter",
1070 BuilderStep::DeclarativeChoice { .. } => "choice",
1071 BuilderStep::DeclarativeScript { .. } => "script",
1072 BuilderStep::DeclarativeFunction { .. } => "function",
1073 BuilderStep::DeclarativeSplit { .. } => "split",
1074 BuilderStep::Split { .. } => "split",
1075 BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1076 BuilderStep::Aggregate { .. } => "aggregate",
1077 BuilderStep::Filter { .. } => "filter",
1078 BuilderStep::Choice { .. } => "choice",
1079 BuilderStep::WireTap { .. } => "wire_tap",
1080 BuilderStep::Delay { .. } => "delay",
1081 BuilderStep::Multicast { .. } => "multicast",
1082 BuilderStep::DeclarativeLog { .. } => "log",
1083 BuilderStep::Bean { .. } => "bean",
1084 BuilderStep::Script { .. } => "script",
1085 BuilderStep::Throttle { .. } => "throttle",
1086 BuilderStep::LoadBalance { .. } => "load_balancer",
1087 BuilderStep::DynamicRouter { .. } => "dynamic_router",
1088 BuilderStep::RoutingSlip { .. } => "routing_slip",
1089 BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1090 BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1091 BuilderStep::RecipientList { .. } => "recipient_list",
1092 BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1093 BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1094 BuilderStep::DeclarativeStreamSplit { .. } => "stream_split",
1095 BuilderStep::Enrich { .. } => "enrich",
1096 BuilderStep::PollEnrich { .. } => "poll_enrich",
1097 BuilderStep::DeclarativeDoTry { .. } => "do_try",
1098 }
1099}
1100
/// Lets `RouteBuilder` use the shared step-appending methods on its top-level step list.
impl StepAccumulator for RouteBuilder {
    /// Returns the route's own step list.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1106
/// Sub-builder for the steps nested inside a split; created by `RouteBuilder::split`.
pub struct SplitBuilder {
    /// Route builder handed back by `end_split`.
    parent: RouteBuilder,
    /// Splitter configuration stored on the resulting `Split` step.
    config: SplitterConfig,
    /// Steps collected inside the split scope.
    steps: Vec<BuilderStep>,
}
1119
1120impl SplitBuilder {
1121 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1123 where
1124 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1125 {
1126 FilterInSplitBuilder {
1127 parent: self,
1128 predicate: std::sync::Arc::new(predicate),
1129 steps: vec![],
1130 }
1131 }
1132
1133 pub fn end_split(mut self) -> RouteBuilder {
1136 let split_step = BuilderStep::Split {
1137 config: self.config,
1138 steps: self.steps,
1139 };
1140 self.parent.steps.push(split_step);
1141 self.parent
1142 }
1143}
1144
/// Lets `SplitBuilder` use the shared step-appending methods on the split's nested steps.
impl StepAccumulator for SplitBuilder {
    /// Returns the step list of the split scope.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1150
/// Sub-builder for the steps nested inside a filter; created by `RouteBuilder::filter`.
pub struct FilterBuilder {
    /// Route builder handed back by `end_filter`.
    parent: RouteBuilder,
    /// Predicate stored on the resulting `Filter` step.
    predicate: FilterPredicate,
    /// Steps collected inside the filter scope.
    steps: Vec<BuilderStep>,
}
1157
1158impl FilterBuilder {
1159 pub fn end_filter(mut self) -> RouteBuilder {
1162 let step = BuilderStep::Filter {
1163 predicate: self.predicate,
1164 steps: self.steps,
1165 };
1166 self.parent.steps.push(step);
1167 self.parent
1168 }
1169}
1170
/// Lets `FilterBuilder` use the shared step-appending methods on the filter's nested steps.
impl StepAccumulator for FilterBuilder {
    /// Returns the step list of the filter scope.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1176
/// Sub-builder for a filter nested inside a split; created by `SplitBuilder::filter`.
pub struct FilterInSplitBuilder {
    /// Split builder handed back by `end_filter`.
    parent: SplitBuilder,
    /// Predicate stored on the resulting `Filter` step.
    predicate: FilterPredicate,
    /// Steps collected inside the filter scope.
    steps: Vec<BuilderStep>,
}
1183
1184impl FilterInSplitBuilder {
1185 pub fn end_filter(mut self) -> SplitBuilder {
1187 let step = BuilderStep::Filter {
1188 predicate: self.predicate,
1189 steps: self.steps,
1190 };
1191 self.parent.steps.push(step);
1192 self.parent
1193 }
1194}
1195
/// Lets `FilterInSplitBuilder` use the shared step-appending methods on its nested steps.
impl StepAccumulator for FilterInSplitBuilder {
    /// Returns the step list of the nested filter scope.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1201
/// Sub-builder for a choice step; created by `RouteBuilder::choice`.
pub struct ChoiceBuilder {
    /// Route builder handed back by `end_choice`.
    parent: RouteBuilder,
    /// `when` branches added so far, in order.
    whens: Vec<WhenStep>,
    /// Steps of the `otherwise` branch, set by `OtherwiseBuilder::end_otherwise`.
    _otherwise: Option<Vec<BuilderStep>>,
}
1213
1214impl ChoiceBuilder {
1215 pub fn when<F>(self, predicate: F) -> WhenBuilder
1218 where
1219 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1220 {
1221 WhenBuilder {
1222 parent: self,
1223 predicate: std::sync::Arc::new(predicate),
1224 steps: vec![],
1225 }
1226 }
1227
1228 pub fn otherwise(self) -> OtherwiseBuilder {
1232 OtherwiseBuilder {
1233 parent: self,
1234 steps: vec![],
1235 }
1236 }
1237
1238 pub fn end_choice(mut self) -> RouteBuilder {
1242 let step = BuilderStep::Choice {
1243 whens: self.whens,
1244 otherwise: self._otherwise,
1245 };
1246 self.parent.steps.push(step);
1247 self.parent
1248 }
1249}
1250
/// Sub-builder for one `when` branch of a choice; created by `ChoiceBuilder::when`.
pub struct WhenBuilder {
    /// Choice builder handed back by `end_when`.
    parent: ChoiceBuilder,
    /// Predicate guarding this branch.
    predicate: camel_api::FilterPredicate,
    /// Steps collected inside the branch.
    steps: Vec<BuilderStep>,
}
1257
1258impl WhenBuilder {
1259 pub fn end_when(mut self) -> ChoiceBuilder {
1262 self.parent.whens.push(WhenStep {
1263 predicate: self.predicate,
1264 steps: self.steps,
1265 });
1266 self.parent
1267 }
1268}
1269
/// Lets `WhenBuilder` use the shared step-appending methods on the branch's steps.
impl StepAccumulator for WhenBuilder {
    /// Returns the step list of the `when` branch.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1275
/// Sub-builder for the `otherwise` branch of a choice; created by `ChoiceBuilder::otherwise`.
pub struct OtherwiseBuilder {
    /// Choice builder handed back by `end_otherwise`.
    parent: ChoiceBuilder,
    /// Steps collected inside the branch.
    steps: Vec<BuilderStep>,
}
1281
1282impl OtherwiseBuilder {
1283 pub fn end_otherwise(self) -> ChoiceBuilder {
1285 let OtherwiseBuilder { mut parent, steps } = self;
1286 parent._otherwise = Some(steps);
1287 parent
1288 }
1289}
1290
/// Lets `OtherwiseBuilder` use the shared step-appending methods on the branch's steps.
impl StepAccumulator for OtherwiseBuilder {
    /// Returns the step list of the `otherwise` branch.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1296
/// Sub-builder for a multicast step; created by `RouteBuilder::multicast`.
pub struct MulticastBuilder {
    /// Route builder handed back by `end_multicast`.
    parent: RouteBuilder,
    /// Steps collected inside the multicast scope.
    steps: Vec<BuilderStep>,
    /// Multicast configuration stored on the resulting `Multicast` step.
    config: MulticastConfig,
}
1309
1310impl MulticastBuilder {
1311 pub fn parallel(mut self, parallel: bool) -> Self {
1312 self.config = self.config.parallel(parallel);
1313 self
1314 }
1315
1316 pub fn parallel_limit(mut self, limit: usize) -> Self {
1317 self.config = self.config.parallel_limit(limit);
1318 self
1319 }
1320
1321 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1322 self.config = self.config.stop_on_exception(stop);
1323 self
1324 }
1325
1326 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1327 self.config = self.config.timeout(duration);
1328 self
1329 }
1330
1331 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1332 self.config = self.config.aggregation(strategy);
1333 self
1334 }
1335
1336 pub fn end_multicast(mut self) -> RouteBuilder {
1337 let step = BuilderStep::Multicast {
1338 steps: self.steps,
1339 config: self.config,
1340 };
1341 self.parent.steps.push(step);
1342 self.parent
1343 }
1344}
1345
1346impl StepAccumulator for MulticastBuilder {
1347 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1348 &mut self.steps
1349 }
1350}
1351
1352pub struct ThrottleBuilder {
1360 parent: RouteBuilder,
1361 config: ThrottlerConfig,
1362 steps: Vec<BuilderStep>,
1363}
1364
1365impl ThrottleBuilder {
1366 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1372 self.config = self.config.strategy(strategy);
1373 self
1374 }
1375
1376 pub fn end_throttle(mut self) -> RouteBuilder {
1379 let step = BuilderStep::Throttle {
1380 config: self.config,
1381 steps: self.steps,
1382 };
1383 self.parent.steps.push(step);
1384 self.parent
1385 }
1386}
1387
1388impl StepAccumulator for ThrottleBuilder {
1389 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1390 &mut self.steps
1391 }
1392}
1393
1394pub struct LoopBuilder {
1396 parent: RouteBuilder,
1397 config: LoopConfig,
1398 steps: Vec<BuilderStep>,
1399}
1400
1401impl LoopBuilder {
1402 pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1403 LoopInLoopBuilder {
1404 parent: self,
1405 config: LoopConfig {
1406 mode: LoopMode::Count(count),
1407 },
1408 steps: vec![],
1409 }
1410 }
1411
1412 pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1413 where
1414 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1415 {
1416 LoopInLoopBuilder {
1417 parent: self,
1418 config: LoopConfig {
1419 mode: LoopMode::While(std::sync::Arc::new(predicate)),
1420 },
1421 steps: vec![],
1422 }
1423 }
1424
1425 pub fn end_loop(mut self) -> RouteBuilder {
1426 let step = BuilderStep::Loop {
1427 config: self.config,
1428 steps: self.steps,
1429 };
1430 self.parent.steps.push(step);
1431 self.parent
1432 }
1433}
1434
1435impl StepAccumulator for LoopBuilder {
1436 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1437 &mut self.steps
1438 }
1439}
1440
1441pub struct LoopInLoopBuilder {
1442 parent: LoopBuilder,
1443 config: LoopConfig,
1444 steps: Vec<BuilderStep>,
1445}
1446
1447impl LoopInLoopBuilder {
1448 pub fn end_loop(mut self) -> LoopBuilder {
1449 let step = BuilderStep::Loop {
1450 config: self.config,
1451 steps: self.steps,
1452 };
1453 self.parent.steps.push(step);
1454 self.parent
1455 }
1456}
1457
1458impl StepAccumulator for LoopInLoopBuilder {
1459 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1460 &mut self.steps
1461 }
1462}
1463
1464pub struct LoadBalancerBuilder {
1472 parent: RouteBuilder,
1473 config: LoadBalancerConfig,
1474 steps: Vec<BuilderStep>,
1475}
1476
1477impl LoadBalancerBuilder {
1478 pub fn round_robin(mut self) -> Self {
1480 self.config = LoadBalancerConfig::round_robin();
1481 self
1482 }
1483
1484 pub fn random(mut self) -> Self {
1486 self.config = LoadBalancerConfig::random();
1487 self
1488 }
1489
1490 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1495 self.config = LoadBalancerConfig::weighted(weights);
1496 self
1497 }
1498
1499 pub fn failover(mut self) -> Self {
1504 self.config = LoadBalancerConfig::failover();
1505 self
1506 }
1507
1508 pub fn parallel(mut self, parallel: bool) -> Self {
1513 self.config = self.config.parallel(parallel);
1514 self
1515 }
1516
1517 pub fn end_load_balance(mut self) -> RouteBuilder {
1520 let step = BuilderStep::LoadBalance {
1521 config: self.config,
1522 steps: self.steps,
1523 };
1524 self.parent.steps.push(step);
1525 self.parent
1526 }
1527}
1528
1529impl StepAccumulator for LoadBalancerBuilder {
1530 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1531 &mut self.steps
1532 }
1533}
1534
1535#[cfg(test)]
1540mod tests {
1541 use super::*;
1542 use camel_api::error_handler::ErrorHandlerConfig;
1543 use camel_api::load_balancer::LoadBalanceStrategy;
1544 use camel_api::{Exchange, Message};
1545 use camel_core::route::BuilderStep;
1546 use std::sync::Arc;
1547 use std::time::Duration;
1548 use tower::{Service, ServiceExt};
1549
1550 #[test]
1551 fn test_builder_from_creates_definition() {
1552 let definition = RouteBuilder::from("timer:tick")
1553 .route_id("test-route")
1554 .build()
1555 .unwrap();
1556 assert_eq!(definition.from_uri(), "timer:tick");
1557 }
1558
1559 #[test]
1560 fn test_builder_empty_from_uri_errors() {
1561 let result = RouteBuilder::from("").route_id("test-route").build();
1562 assert!(result.is_err());
1563 }
1564
1565 #[test]
1566 fn test_build_rejects_schemeless_uri() {
1567 let result = RouteBuilder::from("no-scheme-here")
1568 .route_id("test-route")
1569 .build();
1570 match result {
1571 Err(err) => {
1572 let err_msg = format!("{err}");
1573 assert!(
1574 err_msg.contains("scheme"),
1575 "expected scheme-related error, got: {err_msg}"
1576 );
1577 }
1578 Ok(_) => panic!("schemeless URI should fail"),
1579 }
1580 }
1581
1582 #[test]
1583 fn test_build_rejects_empty_scheme_uri() {
1584 let result = RouteBuilder::from(":missing-scheme")
1585 .route_id("test-route")
1586 .build();
1587 match result {
1588 Err(err) => {
1589 let err_msg = format!("{err}");
1590 assert!(
1591 err_msg.contains("scheme"),
1592 "expected scheme-related error, got: {err_msg}"
1593 );
1594 }
1595 Ok(_) => panic!("empty-scheme URI should fail"),
1596 }
1597 }
1598
1599 #[test]
1600 fn test_build_accepts_valid_uri() {
1601 let result = RouteBuilder::from("timer:tick")
1602 .route_id("test-route")
1603 .build();
1604 assert!(result.is_ok());
1605 }
1606
1607 #[test]
1608 fn test_build_canonical_rejects_schemeless_uri() {
1609 let result = RouteBuilder::from("no-scheme-here")
1610 .route_id("test-route")
1611 .build_canonical();
1612 assert!(result.is_err());
1613 }
1614
1615 #[test]
1616 fn test_builder_to_adds_step() {
1617 let definition = RouteBuilder::from("timer:tick")
1618 .route_id("test-route")
1619 .to("log:info")
1620 .build()
1621 .unwrap();
1622
1623 assert_eq!(definition.from_uri(), "timer:tick");
1624 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1626 }
1627
1628 #[test]
1629 fn test_builder_filter_adds_filter_step() {
1630 let definition = RouteBuilder::from("timer:tick")
1631 .route_id("test-route")
1632 .filter(|_ex| true)
1633 .to("mock:result")
1634 .end_filter()
1635 .build()
1636 .unwrap();
1637
1638 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1639 }
1640
1641 #[test]
1642 fn test_builder_set_header_adds_processor_step() {
1643 let definition = RouteBuilder::from("timer:tick")
1644 .route_id("test-route")
1645 .set_header("key", Value::String("value".into()))
1646 .build()
1647 .unwrap();
1648
1649 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1650 }
1651
1652 #[test]
1653 fn test_builder_map_body_adds_processor_step() {
1654 let definition = RouteBuilder::from("timer:tick")
1655 .route_id("test-route")
1656 .map_body(|body| body)
1657 .build()
1658 .unwrap();
1659
1660 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1661 }
1662
1663 #[test]
1664 fn test_builder_process_adds_processor_step() {
1665 let definition = RouteBuilder::from("timer:tick")
1666 .route_id("test-route")
1667 .process(|ex| async move { Ok(ex) })
1668 .build()
1669 .unwrap();
1670
1671 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1672 }
1673
1674 #[test]
1675 fn test_builder_chain_multiple_steps() {
1676 let definition = RouteBuilder::from("timer:tick")
1677 .route_id("test-route")
1678 .set_header("source", Value::String("timer".into()))
1679 .filter(|ex| ex.input.header("source").is_some())
1680 .to("log:info")
1681 .end_filter()
1682 .to("mock:result")
1683 .build()
1684 .unwrap();
1685
1686 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1690 }
1691
1692 #[test]
1693 fn test_loop_count_builder() {
1694 use camel_api::loop_eip::LoopMode;
1695
1696 let def = RouteBuilder::from("direct:start")
1697 .route_id("loop-test")
1698 .loop_count(3)
1699 .to("mock:inside")
1700 .end_loop()
1701 .to("mock:after")
1702 .build()
1703 .unwrap();
1704
1705 assert_eq!(def.steps().len(), 2);
1706 match &def.steps()[0] {
1707 BuilderStep::Loop { config, steps } => {
1708 assert!(matches!(config.mode, LoopMode::Count(3)));
1709 assert_eq!(steps.len(), 1);
1710 }
1711 other => panic!("Expected Loop, got {:?}", other),
1712 }
1713 assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1714 }
1715
1716 #[test]
1717 fn test_loop_while_builder() {
1718 use camel_api::loop_eip::LoopMode;
1719
1720 let def = RouteBuilder::from("direct:start")
1721 .route_id("loop-while-test")
1722 .loop_while(|_ex| true)
1723 .to("mock:retry")
1724 .end_loop()
1725 .build()
1726 .unwrap();
1727
1728 assert_eq!(def.steps().len(), 1);
1729 match &def.steps()[0] {
1730 BuilderStep::Loop { config, steps } => {
1731 assert!(matches!(config.mode, LoopMode::While(_)));
1732 assert_eq!(steps.len(), 1);
1733 }
1734 other => panic!("Expected Loop, got {:?}", other),
1735 }
1736 }
1737
1738 #[test]
1739 fn test_nested_loop_builder() {
1740 use camel_api::loop_eip::LoopMode;
1741
1742 let def = RouteBuilder::from("direct:start")
1743 .route_id("nested-loop-test")
1744 .loop_count(2)
1745 .to("mock:outer")
1746 .loop_count(3)
1747 .to("mock:inner")
1748 .end_loop()
1749 .end_loop()
1750 .to("mock:after")
1751 .build()
1752 .unwrap();
1753
1754 assert_eq!(def.steps().len(), 2);
1755 match &def.steps()[0] {
1756 BuilderStep::Loop { steps, .. } => {
1757 assert_eq!(steps.len(), 2);
1758 match &steps[1] {
1759 BuilderStep::Loop {
1760 config,
1761 steps: inner_steps,
1762 } => {
1763 assert!(matches!(config.mode, LoopMode::Count(3)));
1764 assert_eq!(inner_steps.len(), 1);
1765 }
1766 other => panic!("Expected nested Loop, got {:?}", other),
1767 }
1768 }
1769 other => panic!("Expected outer Loop, got {:?}", other),
1770 }
1771 }
1772
1773 #[tokio::test]
1778 async fn test_set_header_processor_works() {
1779 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1780 let exchange = Exchange::new(Message::new("test"));
1781 let result = svc.call(exchange).await.unwrap();
1782 assert_eq!(
1783 result.input.header("greeting"),
1784 Some(&Value::String("hello".into()))
1785 );
1786 }
1787
1788 #[tokio::test]
1789 async fn test_filter_processor_passes() {
1790 use camel_api::BoxProcessorExt;
1791 use camel_processor::FilterService;
1792
1793 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1794 let mut svc =
1795 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1796 let exchange = Exchange::new(Message::new("pass"));
1797 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1798 assert_eq!(result.input.body.as_text(), Some("pass"));
1799 }
1800
1801 #[tokio::test]
1802 async fn test_filter_processor_blocks() {
1803 use camel_api::BoxProcessorExt;
1804 use camel_processor::FilterService;
1805
1806 let sub = BoxProcessor::from_fn(|_ex| {
1807 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1808 });
1809 let mut svc =
1810 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1811 let exchange = Exchange::new(Message::new("reject"));
1812 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1813 assert_eq!(result.input.body.as_text(), Some("reject"));
1814 }
1815
1816 #[tokio::test]
1817 async fn test_map_body_processor_works() {
1818 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1819 if let Some(text) = body.as_text() {
1820 Body::Text(text.to_uppercase())
1821 } else {
1822 body
1823 }
1824 });
1825 let exchange = Exchange::new(Message::new("hello"));
1826 let result = mapper.oneshot(exchange).await.unwrap();
1827 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1828 }
1829
1830 #[tokio::test]
1831 async fn test_process_custom_processor_works() {
1832 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1833 ex.set_property("custom", Value::Bool(true));
1834 Ok(ex)
1835 });
1836 let exchange = Exchange::new(Message::default());
1837 let result = processor.oneshot(exchange).await.unwrap();
1838 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1839 }
1840
1841 #[tokio::test]
1846 async fn test_compose_pipeline_runs_steps_in_order() {
1847 use camel_core::route::{CompiledStep, compose_pipeline};
1848
1849 let processors = vec![
1850 CompiledStep::Process {
1851 processor: BoxProcessor::new(SetHeader::new(
1852 IdentityProcessor,
1853 "step",
1854 Value::String("one".into()),
1855 )),
1856 body_contract: None,
1857 },
1858 CompiledStep::Process {
1859 processor: BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1860 if let Some(text) = body.as_text() {
1861 Body::Text(format!("{}-processed", text))
1862 } else {
1863 body
1864 }
1865 })),
1866 body_contract: None,
1867 },
1868 ];
1869
1870 let pipeline = compose_pipeline(processors);
1871 let exchange = Exchange::new(Message::new("hello"));
1872 let result = pipeline.oneshot(exchange).await.unwrap();
1873
1874 assert_eq!(
1875 result.input.header("step"),
1876 Some(&Value::String("one".into()))
1877 );
1878 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1879 }
1880
1881 #[tokio::test]
1882 async fn test_compose_pipeline_empty_is_identity() {
1883 use camel_core::route::compose_pipeline;
1884
1885 let pipeline = compose_pipeline(vec![]);
1886 let exchange = Exchange::new(Message::new("unchanged"));
1887 let result = pipeline.oneshot(exchange).await.unwrap();
1888 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1889 }
1890
1891 #[test]
1896 fn test_builder_circuit_breaker_sets_config() {
1897 use camel_api::circuit_breaker::CircuitBreakerConfig;
1898
1899 let config = CircuitBreakerConfig::new().failure_threshold(5);
1900 let definition = RouteBuilder::from("timer:tick")
1901 .route_id("test-route")
1902 .circuit_breaker(config)
1903 .build()
1904 .unwrap();
1905
1906 let cb = definition
1907 .circuit_breaker_config()
1908 .expect("circuit breaker should be set");
1909 assert_eq!(cb.failure_threshold, 5);
1910 }
1911
1912 #[test]
1913 fn test_builder_circuit_breaker_with_error_handler() {
1914 use camel_api::circuit_breaker::CircuitBreakerConfig;
1915 use camel_api::error_handler::ErrorHandlerConfig;
1916
1917 let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1918 let eh_config = ErrorHandlerConfig::log_only();
1919
1920 let definition = RouteBuilder::from("timer:tick")
1921 .route_id("test-route")
1922 .to("log:info")
1923 .circuit_breaker(cb_config)
1924 .error_handler(eh_config)
1925 .build()
1926 .unwrap();
1927
1928 assert!(
1929 definition.circuit_breaker_config().is_some(),
1930 "circuit breaker config should be set"
1931 );
1932 }
1934
1935 #[test]
1936 fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1937 let definition = RouteBuilder::from("direct:start")
1938 .route_id("test-route")
1939 .dead_letter_channel("log:dlc")
1940 .on_exception(|e| matches!(e, CamelError::Io(_)))
1941 .retry(3)
1942 .handled_by("log:io")
1943 .end_on_exception()
1944 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1945 .retry(1)
1946 .end_on_exception()
1947 .to("mock:out")
1948 .build()
1949 .expect("route should build");
1950
1951 let cfg = definition
1952 .error_handler_config()
1953 .expect("error handler should be set");
1954 assert_eq!(cfg.policies.len(), 2);
1955 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1956 assert_eq!(
1957 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1958 Some(3)
1959 );
1960 assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1961 assert_eq!(
1962 cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1963 Some(1)
1964 );
1965 }
1966
1967 #[test]
1968 fn test_builder_on_exception_mixed_mode_rejected() {
1969 let result = RouteBuilder::from("direct:start")
1970 .route_id("test-route")
1971 .error_handler(ErrorHandlerConfig::log_only())
1972 .on_exception(|_e| true)
1973 .end_on_exception()
1974 .to("mock:out")
1975 .build();
1976
1977 let err = result.err().expect("mixed mode should fail with an error");
1978
1979 assert!(
1980 format!("{err}").contains("mixed error handler modes"),
1981 "unexpected error: {err}"
1982 );
1983 }
1984
1985 #[test]
1986 fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1987 let definition = RouteBuilder::from("direct:start")
1988 .route_id("test-route")
1989 .on_exception(|_e| true)
1990 .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1991 .with_jitter(0.5)
1992 .end_on_exception()
1993 .to("mock:out")
1994 .build()
1995 .expect("route should build");
1996
1997 let cfg = definition
1998 .error_handler_config()
1999 .expect("error handler should be set");
2000 assert_eq!(cfg.policies.len(), 1);
2001 assert!(cfg.policies[0].retry.is_none());
2002 }
2003
2004 #[test]
2005 fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
2006 let definition = RouteBuilder::from("direct:start")
2007 .route_id("test-route")
2008 .dead_letter_channel("log:dlc")
2009 .to("mock:out")
2010 .build()
2011 .expect("route should build");
2012
2013 let cfg = definition
2014 .error_handler_config()
2015 .expect("error handler should be set");
2016 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
2017 assert!(cfg.policies.is_empty());
2018 }
2019
2020 #[test]
2021 fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
2022 let definition = RouteBuilder::from("direct:start")
2023 .route_id("test-route")
2024 .dead_letter_channel("log:first")
2025 .on_exception(|e| matches!(e, CamelError::Io(_)))
2026 .retry(2)
2027 .end_on_exception()
2028 .dead_letter_channel("log:second")
2029 .to("mock:out")
2030 .build()
2031 .expect("route should build");
2032
2033 let cfg = definition
2034 .error_handler_config()
2035 .expect("error handler should be set");
2036 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2037 assert_eq!(cfg.policies.len(), 1);
2038 assert_eq!(
2039 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
2040 Some(2)
2041 );
2042 }
2043
2044 #[test]
2045 fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
2046 let definition = RouteBuilder::from("direct:start")
2047 .route_id("test-route")
2048 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
2049 .retry(1)
2050 .end_on_exception()
2051 .to("mock:out")
2052 .build()
2053 .expect("route should build");
2054
2055 let cfg = definition
2056 .error_handler_config()
2057 .expect("error handler should be set");
2058 assert!(cfg.dlc_uri.is_none());
2059 assert_eq!(cfg.policies.len(), 1);
2060 }
2061
2062 #[test]
2063 fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
2064 let first = ErrorHandlerConfig::dead_letter_channel("log:first");
2065 let second = ErrorHandlerConfig::dead_letter_channel("log:second");
2066
2067 let definition = RouteBuilder::from("direct:start")
2068 .route_id("test-route")
2069 .error_handler(first)
2070 .error_handler(second)
2071 .to("mock:out")
2072 .build()
2073 .expect("route should build");
2074
2075 let cfg = definition
2076 .error_handler_config()
2077 .expect("error handler should be set");
2078 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2079 }
2080
2081 #[test]
2084 fn test_split_builder_typestate() {
2085 use camel_api::splitter::{SplitterConfig, split_body_lines};
2086
2087 let definition = RouteBuilder::from("timer:test?period=1000")
2089 .route_id("test-route")
2090 .split(SplitterConfig::new(split_body_lines()))
2091 .to("mock:per-fragment")
2092 .end_split()
2093 .to("mock:final")
2094 .build()
2095 .unwrap();
2096
2097 assert_eq!(definition.steps().len(), 2);
2099 }
2100
2101 #[test]
2102 fn test_split_builder_steps_collected() {
2103 use camel_api::splitter::{SplitterConfig, split_body_lines};
2104
2105 let definition = RouteBuilder::from("timer:test?period=1000")
2106 .route_id("test-route")
2107 .split(SplitterConfig::new(split_body_lines()))
2108 .set_header("fragment", Value::String("yes".into()))
2109 .to("mock:per-fragment")
2110 .end_split()
2111 .build()
2112 .unwrap();
2113
2114 assert_eq!(definition.steps().len(), 1);
2116 match &definition.steps()[0] {
2117 BuilderStep::Split { steps, .. } => {
2118 assert_eq!(steps.len(), 2); }
2120 other => panic!("Expected Split, got {:?}", other),
2121 }
2122 }
2123
2124 #[test]
2125 fn test_split_builder_config_propagated() {
2126 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2127
2128 let definition = RouteBuilder::from("timer:test?period=1000")
2129 .route_id("test-route")
2130 .split(
2131 SplitterConfig::new(split_body_lines())
2132 .parallel(true)
2133 .parallel_limit(4)
2134 .aggregation(AggregationStrategy::CollectAll),
2135 )
2136 .to("mock:per-fragment")
2137 .end_split()
2138 .build()
2139 .unwrap();
2140
2141 match &definition.steps()[0] {
2142 BuilderStep::Split { config, .. } => {
2143 assert!(config.parallel);
2144 assert_eq!(config.parallel_limit, Some(4));
2145 assert!(matches!(
2146 config.aggregation,
2147 AggregationStrategy::CollectAll
2148 ));
2149 }
2150 other => panic!("Expected Split, got {:?}", other),
2151 }
2152 }
2153
2154 #[test]
2155 fn test_aggregate_builder_adds_step() {
2156 use camel_api::aggregator::AggregatorConfig;
2157 use camel_core::route::BuilderStep;
2158
2159 let definition = RouteBuilder::from("timer:tick")
2160 .route_id("test-route")
2161 .aggregate(
2162 AggregatorConfig::correlate_by("key")
2163 .complete_when_size(2)
2164 .build()
2165 .unwrap(),
2166 )
2167 .build()
2168 .unwrap();
2169
2170 assert_eq!(definition.steps().len(), 1);
2171 assert!(matches!(
2172 definition.steps()[0],
2173 BuilderStep::Aggregate { .. }
2174 ));
2175 }
2176
2177 #[test]
2178 fn test_aggregate_in_split_builder() {
2179 use camel_api::aggregator::AggregatorConfig;
2180 use camel_api::splitter::{SplitterConfig, split_body_lines};
2181 use camel_core::route::BuilderStep;
2182
2183 let definition = RouteBuilder::from("timer:tick")
2184 .route_id("test-route")
2185 .split(SplitterConfig::new(split_body_lines()))
2186 .aggregate(
2187 AggregatorConfig::correlate_by("key")
2188 .complete_when_size(1)
2189 .build()
2190 .unwrap(),
2191 )
2192 .end_split()
2193 .build()
2194 .unwrap();
2195
2196 assert_eq!(definition.steps().len(), 1);
2197 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2198 assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2199 } else {
2200 panic!("expected Split step");
2201 }
2202 }
2203
2204 #[test]
2207 fn test_builder_set_body_static_adds_processor() {
2208 let definition = RouteBuilder::from("timer:tick")
2209 .route_id("test-route")
2210 .set_body("fixed")
2211 .build()
2212 .unwrap();
2213 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2214 }
2215
2216 #[test]
2217 fn test_builder_set_body_fn_adds_processor() {
2218 let definition = RouteBuilder::from("timer:tick")
2219 .route_id("test-route")
2220 .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2221 .build()
2222 .unwrap();
2223 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2224 }
2225
2226 #[test]
2227 fn transform_alias_produces_same_as_set_body() {
2228 let route_transform = RouteBuilder::from("timer:tick")
2229 .route_id("test-route")
2230 .transform("hello")
2231 .build()
2232 .unwrap();
2233
2234 let route_set_body = RouteBuilder::from("timer:tick")
2235 .route_id("test-route")
2236 .set_body("hello")
2237 .build()
2238 .unwrap();
2239
2240 assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2241 }
2242
2243 #[test]
2244 fn test_builder_set_header_fn_adds_processor() {
2245 let definition = RouteBuilder::from("timer:tick")
2246 .route_id("test-route")
2247 .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2248 .build()
2249 .unwrap();
2250 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2251 }
2252
2253 #[tokio::test]
2254 async fn test_set_body_static_processor_works() {
2255 use camel_core::route::{CompiledStep, compose_pipeline};
2256 let def = RouteBuilder::from("t:t")
2257 .route_id("test-route")
2258 .set_body("replaced")
2259 .build()
2260 .unwrap();
2261 let pipeline = compose_pipeline(
2262 def.steps()
2263 .iter()
2264 .filter_map(|s| {
2265 if let BuilderStep::Processor(p) = s {
2266 Some(p.clone())
2267 } else {
2268 None
2269 }
2270 })
2271 .map(|p| CompiledStep::Process {
2272 processor: p,
2273 body_contract: None,
2274 })
2275 .collect(),
2276 );
2277 let exchange = Exchange::new(Message::new("original"));
2278 let result = pipeline.oneshot(exchange).await.unwrap();
2279 assert_eq!(result.input.body.as_text(), Some("replaced"));
2280 }
2281
2282 #[tokio::test]
2283 async fn test_set_body_fn_processor_works() {
2284 use camel_core::route::{CompiledStep, compose_pipeline};
2285 let def = RouteBuilder::from("t:t")
2286 .route_id("test-route")
2287 .set_body_fn(|ex: &Exchange| {
2288 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2289 })
2290 .build()
2291 .unwrap();
2292 let pipeline = compose_pipeline(
2293 def.steps()
2294 .iter()
2295 .filter_map(|s| {
2296 if let BuilderStep::Processor(p) = s {
2297 Some(p.clone())
2298 } else {
2299 None
2300 }
2301 })
2302 .map(|p| CompiledStep::Process {
2303 processor: p,
2304 body_contract: None,
2305 })
2306 .collect(),
2307 );
2308 let exchange = Exchange::new(Message::new("hello"));
2309 let result = pipeline.oneshot(exchange).await.unwrap();
2310 assert_eq!(result.input.body.as_text(), Some("HELLO"));
2311 }
2312
2313 #[tokio::test]
2314 async fn test_set_header_fn_processor_works() {
2315 use camel_core::route::{CompiledStep, compose_pipeline};
2316 let def = RouteBuilder::from("t:t")
2317 .route_id("test-route")
2318 .set_header_fn("echo", |ex: &Exchange| {
2319 ex.input
2320 .body
2321 .as_text()
2322 .map(|t| Value::String(t.into()))
2323 .unwrap_or(Value::Null)
2324 })
2325 .build()
2326 .unwrap();
2327 let pipeline = compose_pipeline(
2328 def.steps()
2329 .iter()
2330 .filter_map(|s| {
2331 if let BuilderStep::Processor(p) = s {
2332 Some(p.clone())
2333 } else {
2334 None
2335 }
2336 })
2337 .map(|p| CompiledStep::Process {
2338 processor: p,
2339 body_contract: None,
2340 })
2341 .collect(),
2342 );
2343 let exchange = Exchange::new(Message::new("ping"));
2344 let result = pipeline.oneshot(exchange).await.unwrap();
2345 assert_eq!(
2346 result.input.header("echo"),
2347 Some(&Value::String("ping".into()))
2348 );
2349 }
2350
2351 #[test]
2354 fn test_filter_builder_typestate() {
2355 let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2356 .route_id("test-route")
2357 .filter(|_ex| true)
2358 .to("mock:inner")
2359 .end_filter()
2360 .to("mock:outer")
2361 .build();
2362 assert!(result.is_ok());
2363 }
2364
2365 #[test]
2366 fn test_filter_builder_steps_collected() {
2367 let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2368 .route_id("test-route")
2369 .filter(|_ex| true)
2370 .to("mock:inner")
2371 .end_filter()
2372 .build()
2373 .unwrap();
2374
2375 assert_eq!(definition.steps().len(), 1);
2376 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2377 }
2378
2379 #[test]
2380 fn test_wire_tap_builder_adds_step() {
2381 let definition = RouteBuilder::from("timer:tick")
2382 .route_id("test-route")
2383 .wire_tap("mock:tap")
2384 .to("mock:result")
2385 .build()
2386 .unwrap();
2387
2388 assert_eq!(definition.steps().len(), 2);
2389 assert!(
2390 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2391 );
2392 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2393 }
2394
2395 #[test]
2398 fn test_multicast_builder_typestate() {
2399 let definition = RouteBuilder::from("timer:tick")
2400 .route_id("test-route")
2401 .multicast()
2402 .to("direct:a")
2403 .to("direct:b")
2404 .end_multicast()
2405 .to("mock:result")
2406 .build()
2407 .unwrap();
2408
2409 assert_eq!(definition.steps().len(), 2); }
2411
2412 #[test]
2413 fn test_multicast_builder_steps_collected() {
2414 let definition = RouteBuilder::from("timer:tick")
2415 .route_id("test-route")
2416 .multicast()
2417 .to("direct:a")
2418 .to("direct:b")
2419 .end_multicast()
2420 .build()
2421 .unwrap();
2422
2423 match &definition.steps()[0] {
2424 BuilderStep::Multicast { steps, .. } => {
2425 assert_eq!(steps.len(), 2);
2426 }
2427 other => panic!("Expected Multicast, got {:?}", other),
2428 }
2429 }
2430
2431 #[test]
2434 fn test_builder_concurrent_sets_concurrency() {
2435 use camel_component_api::ConcurrencyModel;
2436
2437 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2438 .route_id("test-route")
2439 .concurrent(16)
2440 .to("log:info")
2441 .build()
2442 .unwrap();
2443
2444 assert_eq!(
2445 definition.concurrency_override(),
2446 Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2447 );
2448 }
2449
2450 #[test]
2451 fn test_builder_concurrent_zero_means_unbounded() {
2452 use camel_component_api::ConcurrencyModel;
2453
2454 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2455 .route_id("test-route")
2456 .concurrent(0)
2457 .to("log:info")
2458 .build()
2459 .unwrap();
2460
2461 assert_eq!(
2462 definition.concurrency_override(),
2463 Some(&ConcurrencyModel::Concurrent { max: None })
2464 );
2465 }
2466
2467 #[test]
2468 fn test_builder_sequential_sets_concurrency() {
2469 use camel_component_api::ConcurrencyModel;
2470
2471 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2472 .route_id("test-route")
2473 .sequential()
2474 .to("log:info")
2475 .build()
2476 .unwrap();
2477
2478 assert_eq!(
2479 definition.concurrency_override(),
2480 Some(&ConcurrencyModel::Sequential)
2481 );
2482 }
2483
2484 #[test]
2485 fn test_builder_default_concurrency_is_none() {
2486 let definition = RouteBuilder::from("timer:tick")
2487 .route_id("test-route")
2488 .to("log:info")
2489 .build()
2490 .unwrap();
2491
2492 assert_eq!(definition.concurrency_override(), None);
2493 }
2494
2495 #[test]
2498 fn test_builder_route_id_sets_id() {
2499 let definition = RouteBuilder::from("timer:tick")
2500 .route_id("my-route")
2501 .build()
2502 .unwrap();
2503
2504 assert_eq!(definition.route_id(), "my-route");
2505 }
2506
2507 #[test]
2508 fn test_build_without_route_id_fails() {
2509 let result = RouteBuilder::from("timer:tick?period=1000")
2510 .to("log:info")
2511 .build();
2512 let err = match result {
2513 Err(e) => e.to_string(),
2514 Ok(_) => panic!("build() should fail without route_id"),
2515 };
2516 assert!(
2517 err.contains("route_id"),
2518 "error should mention route_id, got: {}",
2519 err
2520 );
2521 }
2522
2523 #[test]
2524 fn test_builder_empty_route_id_rejected() {
2525 let result = RouteBuilder::from("timer:tick").route_id("").build();
2526 let err = result.err().expect("empty route_id should be rejected");
2527 assert!(matches!(err, CamelError::RouteError(_)));
2528 }
2529
2530 #[test]
2531 fn test_builder_whitespace_route_id_rejected() {
2532 let result = RouteBuilder::from("timer:tick").route_id(" ").build();
2533 assert!(result.is_err());
2534 }
2535
/// Checks that `.auto_startup(false)` is reflected by the built definition.
#[test]
fn test_builder_auto_startup_false() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .auto_startup(false);
    let route = builder.build().unwrap();

    let starts_automatically = route.auto_startup();
    assert!(!starts_automatically);
}
2546
/// Checks that a custom `.startup_order(50)` is carried into the built
/// definition.
#[test]
fn test_builder_startup_order_custom() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .startup_order(50);
    let route = builder.build().unwrap();

    assert_eq!(route.startup_order(), 50);
}
2557
/// Checks the values a minimal route reports: the given id, auto-startup
/// enabled, and a startup order of 1000.
#[test]
fn test_builder_defaults() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let route = builder.build().unwrap();

    assert_eq!(route.route_id(), "test-route");
    let starts_automatically = route.auto_startup();
    assert!(starts_automatically);
    assert_eq!(route.startup_order(), 1000);
}
2569
/// Checks that a choice with one `when` branch and no `otherwise` yields a
/// single `BuilderStep::Choice` with exactly one `when` entry.
#[test]
fn test_choice_builder_single_when() {
    let choice = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("type").is_some())
        .to("mock:typed")
        .end_when();
    let route = choice.end_choice().build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);

    let single_when_without_otherwise = matches!(
        &steps[0],
        BuilderStep::Choice { whens, otherwise }
            if whens.len() == 1 && otherwise.is_none()
    );
    assert!(single_when_without_otherwise);
}
2589
/// Checks that a choice with one `when` branch plus an `otherwise` branch
/// records both in the resulting `BuilderStep::Choice`.
#[test]
fn test_choice_builder_when_otherwise() {
    let choice = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .otherwise()
        .to("mock:fallback")
        .end_otherwise();
    let route = choice.end_choice().build().unwrap();

    let first = &route.steps()[0];
    let when_and_otherwise = matches!(
        first,
        BuilderStep::Choice { whens, otherwise }
            if whens.len() == 1 && otherwise.is_some()
    );
    assert!(when_and_otherwise);
}
2609
/// Checks that two consecutive `when` branches are both recorded in the
/// resulting `BuilderStep::Choice`.
#[test]
fn test_choice_builder_multiple_whens() {
    let choice = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("b").is_some())
        .to("mock:b")
        .end_when();
    let route = choice.end_choice().build().unwrap();

    let first = &route.steps()[0];
    let has_two_whens = matches!(
        first,
        BuilderStep::Choice { whens, .. } if whens.len() == 2
    );
    assert!(has_two_whens);
}
2629
/// Checks that a step added after `end_choice()` lands on the outer route
/// as the second step, not inside the choice.
#[test]
fn test_choice_step_after_choice() {
    let after_choice = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|_ex: &Exchange| true)
        .to("mock:inner")
        .end_when()
        .end_choice();
    let route = after_choice.to("mock:outer").build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);

    let outer_is_second = matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer");
    assert!(outer_is_second);
}
2646
/// Checks that a `throttle(..) … end_throttle()` section collapses into a
/// single `BuilderStep::Throttle` on the route.
#[test]
fn test_throttle_builder_typestate() {
    let window = std::time::Duration::from_secs(1);
    let throttled = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, window)
        .to("mock:result");
    let route = throttled.end_throttle().build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);

    let is_throttle = matches!(&steps[0], BuilderStep::Throttle { .. });
    assert!(is_throttle);
}
2665
/// Checks that `.strategy(ThrottleStrategy::Reject)` is stored in the
/// throttle step's config.
#[test]
fn test_throttle_builder_with_strategy() {
    let window = std::time::Duration::from_secs(1);
    let throttled = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, window)
        .strategy(ThrottleStrategy::Reject)
        .to("mock:result");
    let route = throttled.end_throttle().build().unwrap();

    match &route.steps()[0] {
        BuilderStep::Throttle { config, .. } => {
            assert_eq!(config.strategy, ThrottleStrategy::Reject);
        }
        _ => panic!("Expected Throttle step"),
    }
}
2683
/// Checks that steps declared inside the throttle section are collected
/// into the throttle step itself (two nested steps here).
#[test]
fn test_throttle_builder_steps_collected() {
    let window = std::time::Duration::from_secs(1);
    let throttled = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(5, window)
        .set_header("throttled", Value::Bool(true))
        .to("mock:throttled");
    let route = throttled.end_throttle().build().unwrap();

    let first = &route.steps()[0];
    if let BuilderStep::Throttle { steps, .. } = first {
        // One nested step per call made inside the section: set_header + to.
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected Throttle, got {:?}", first);
    }
}
2702
/// Checks that a step added after `end_throttle()` becomes the second step
/// of the outer route.
#[test]
fn test_throttle_step_after_throttle() {
    let window = std::time::Duration::from_secs(1);
    let after_throttle = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, window)
        .to("mock:inner")
        .end_throttle();
    let route = after_throttle.to("mock:outer").build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);

    let outer_is_second = matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer");
    assert!(outer_is_second);
}
2718
/// Checks that a `load_balance() … end_load_balance()` section collapses
/// into a single `BuilderStep::LoadBalance` on the route.
#[test]
fn test_load_balance_builder_typestate() {
    let balanced = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .round_robin()
        .to("mock:a")
        .to("mock:b");
    let route = balanced.end_load_balance().build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);

    let is_load_balance = matches!(&steps[0], BuilderStep::LoadBalance { .. });
    assert!(is_load_balance);
}
2739
/// Checks that `.random()` stores `LoadBalanceStrategy::Random` in the
/// load-balance step's config.
#[test]
fn test_load_balance_builder_with_strategy() {
    let balanced = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .random()
        .to("mock:result");
    let route = balanced.end_load_balance().build().unwrap();

    match &route.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
2757
/// Checks that steps declared inside the load-balance section are
/// collected into the load-balance step itself (two nested steps here).
#[test]
fn test_load_balance_builder_steps_collected() {
    let balanced = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .set_header("lb", Value::Bool(true))
        .to("mock:a");
    let route = balanced.end_load_balance().build().unwrap();

    let first = &route.steps()[0];
    if let BuilderStep::LoadBalance { steps, .. } = first {
        // One nested step per call made inside the section: set_header + to.
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected LoadBalance, got {:?}", first);
    }
}
2776
/// Checks that a step added after `end_load_balance()` becomes the second
/// step of the outer route.
#[test]
fn test_load_balance_step_after_load_balance() {
    let after_balance = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .to("mock:inner")
        .end_load_balance();
    let route = after_balance.to("mock:outer").build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);

    let outer_is_second = matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer");
    assert!(outer_is_second);
}
2792
/// Checks that `.dynamic_router(..)` adds exactly one
/// `BuilderStep::DynamicRouter` to the route.
#[test]
fn test_dynamic_router_builder() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some("mock:result".to_string())));
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);

    let is_dynamic_router = matches!(&steps[0], BuilderStep::DynamicRouter { .. });
    assert!(is_dynamic_router);
}
2809
/// Checks that a pre-built `DynamicRouterConfig` passed to
/// `.dynamic_router_with_config(..)` keeps its `max_iterations` and
/// `cache_size` values in the resulting step.
#[test]
fn test_dynamic_router_builder_with_config() {
    let router_config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
        .max_iterations(100)
        .cache_size(500);

    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router_with_config(router_config);
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);

    match &steps[0] {
        BuilderStep::DynamicRouter { config } => {
            assert_eq!(config.max_iterations, 100);
            assert_eq!(config.cache_size, 500);
        }
        _ => panic!("Expected DynamicRouter step"),
    }
}
2830
/// Checks that a step added after `.dynamic_router(..)` follows the router
/// step on the same route.
#[test]
fn test_dynamic_router_step_after_router() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
        .to("mock:outer");
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);

    let router_is_first = matches!(&steps[0], BuilderStep::DynamicRouter { .. });
    assert!(router_is_first);

    let outer_is_second = matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer");
    assert!(outer_is_second);
}
2848
/// Checks that `.routing_slip(..)` adds a `BuilderStep::RoutingSlip` as the
/// first step of the route.
#[test]
fn routing_slip_builder_creates_step() {
    use camel_api::RoutingSlipExpression;

    let slip: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));

    let builder = RouteBuilder::from("direct:start")
        .route_id("routing-slip-test")
        .routing_slip(slip);
    let built = builder.build().unwrap();

    let first = &built.steps()[0];
    assert!(
        matches!(first, BuilderStep::RoutingSlip { .. }),
        "Expected RoutingSlip step"
    );
}
2866
/// Checks that a `RoutingSlipConfig` passed to
/// `.routing_slip_with_config(..)` keeps its delimiter, cache size and
/// ignore-invalid-endpoints flag in the resulting step.
#[test]
fn routing_slip_with_config_builder_creates_step() {
    use camel_api::RoutingSlipConfig;

    let slip_config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
        .uri_delimiter("|")
        .cache_size(50)
        .ignore_invalid_endpoints(true);

    let builder = RouteBuilder::from("direct:start")
        .route_id("routing-slip-config-test")
        .routing_slip_with_config(slip_config);
    let built = builder.build().unwrap();

    match &built.steps()[0] {
        BuilderStep::RoutingSlip { config } => {
            assert_eq!(config.uri_delimiter, "|");
            assert_eq!(config.cache_size, 50);
            assert!(config.ignore_invalid_endpoints);
        }
        _ => panic!("Expected RoutingSlip step"),
    }
}
2890
/// Checks that `.marshal("json")` succeeds and contributes a
/// `BuilderStep::Processor` as the first step.
#[test]
fn test_builder_marshal_adds_processor_step() {
    let with_marshal = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .marshal("json")
        .unwrap();
    let route = with_marshal.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2901
/// Checks that `.unmarshal("json")` succeeds and contributes a
/// `BuilderStep::Processor` as the first step.
#[test]
fn test_builder_unmarshal_adds_processor_step() {
    let with_unmarshal = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .unmarshal("json")
        .unwrap();
    let route = with_unmarshal.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2912
/// Checks that `.stream_cache(1024)` contributes a
/// `BuilderStep::Processor` as the first step.
#[test]
fn test_builder_stream_cache_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .stream_cache(1024);
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2922
/// Checks that `.validate(path)` is recorded as a `To` step whose URI is
/// the path prefixed with `validator:`.
#[test]
fn validate_adds_to_step_with_validator_uri() {
    let builder = RouteBuilder::from("direct:in")
        .route_id("test")
        .validate("schemas/order.xsd");
    let def = builder.build().unwrap();

    let steps = def.steps();
    assert_eq!(steps.len(), 1);

    let targets_validator =
        matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd");
    assert!(targets_validator, "got: {:?}", steps[0]);
}
2938
/// Checks that `.marshal("csv")` returns an error whose text names both
/// the problem ("unknown data format") and the offending format.
#[test]
fn test_builder_marshal_returns_err_for_unknown_format() {
    let outcome = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .marshal("csv");

    let err = if let Err(e) = outcome {
        e
    } else {
        panic!("marshal with unknown format should return Err")
    };

    let msg = err.to_string();
    let mentions_problem = msg.contains("unknown data format");
    assert!(
        mentions_problem,
        "error should mention unknown format, got: {msg}"
    );
    let mentions_format = msg.contains("csv");
    assert!(
        mentions_format,
        "error should mention format name, got: {msg}"
    );
}
2958
/// Checks that `.unmarshal("csv")` returns an error whose text names both
/// the problem ("unknown data format") and the offending format.
#[test]
fn test_builder_unmarshal_returns_err_for_unknown_format() {
    let outcome = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .unmarshal("csv");

    let err = if let Err(e) = outcome {
        e
    } else {
        panic!("unmarshal with unknown format should return Err")
    };

    let msg = err.to_string();
    let mentions_problem = msg.contains("unknown data format");
    assert!(
        mentions_problem,
        "error should mention unknown format, got: {msg}"
    );
    let mentions_format = msg.contains("csv");
    assert!(
        mentions_format,
        "error should mention format name, got: {msg}"
    );
}
2978
/// Checks that `.recipient_list(..)` adds a `BuilderStep::RecipientList`
/// as the first step.
#[test]
fn test_builder_recipient_list_creates_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("recipient-list-test")
        .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()));
    let built = builder.build().unwrap();

    let first = &built.steps()[0];
    assert!(matches!(first, BuilderStep::RecipientList { .. }));
}
2992
/// Checks that `.recipient_list_with_config(..)` adds a
/// `BuilderStep::RecipientList` as the first step.
#[test]
fn test_builder_recipient_list_with_config_creates_step() {
    let list_config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));

    let builder = RouteBuilder::from("direct:start")
        .route_id("recipient-list-config-test")
        .recipient_list_with_config(list_config);
    let built = builder.build().unwrap();

    let first = &built.steps()[0];
    assert!(matches!(first, BuilderStep::RecipientList { .. }));
}
3008
/// Checks that `.script(language, source)` is recorded as a
/// `BuilderStep::Script` carrying both values unchanged.
#[test]
fn test_builder_script_adds_script_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("script-test")
        .script("rhai", "headers[\"x\"] = \"y\"");
    let built = builder.build().unwrap();

    let first = &built.steps()[0];
    let is_expected_script = matches!(
        first,
        BuilderStep::Script { language, script }
            if language == "rhai" && script == "headers[\"x\"] = \"y\""
    );
    assert!(is_expected_script);
}
3023
/// Checks that `.delay(..)` and `.delay_with_header(..)` each add one
/// `BuilderStep::Delay` to the route.
#[test]
fn test_builder_delay_and_delay_with_header_add_steps() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("delay-test")
        .delay(Duration::from_millis(250))
        .delay_with_header(Duration::from_millis(500), "x-delay");
    let built = builder.build().unwrap();

    let steps = built.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[0], BuilderStep::Delay { .. }));
    assert!(matches!(&steps[1], BuilderStep::Delay { .. }));
}
3037
/// Checks that `.log(..)`, `.stop()` and `.to(..)` are recorded as three
/// steps in the order they were declared.
#[test]
fn test_builder_log_and_stop_add_steps_in_order() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("log-stop-test")
        .log("hello", LogLevel::Info)
        .stop()
        .to("mock:after");
    let built = builder.build().unwrap();

    let steps = built.steps();
    assert_eq!(steps.len(), 3);

    let log_is_first = matches!(&steps[0], BuilderStep::Log { message, .. } if message == "hello");
    assert!(log_is_first);
    assert!(matches!(&steps[1], BuilderStep::Stop));
    let to_is_last = matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:after");
    assert!(to_is_last);
}
3056
/// Checks that `.stream_cache_default()` contributes a
/// `BuilderStep::Processor` as the first step.
#[test]
fn test_builder_stream_cache_default_adds_processor_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("stream-cache-default-test")
        .stream_cache_default();
    let built = builder.build().unwrap();

    let first = &built.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
3067
/// Checks that `.validate(..)` does not add a second `validator:` prefix
/// when the argument already carries one.
#[test]
fn test_validate_preserves_existing_validator_prefix() {
    let builder = RouteBuilder::from("direct:in")
        .route_id("validate-prefix-test")
        .validate("validator:schemas/order.xsd");
    let built = builder.build().unwrap();

    let first = &built.steps()[0];
    let prefix_kept_once =
        matches!(first, BuilderStep::To(uri) if uri == "validator:schemas/order.xsd");
    assert!(prefix_kept_once);
}
3081
/// Checks that when `.weighted(..)` is followed by `.failover()` the later
/// call determines the stored strategy, and that `.parallel(true)` is kept.
#[test]
fn test_load_balance_builder_weighted_failover_parallel_config() {
    let balanced = RouteBuilder::from("direct:start")
        .route_id("lb-weighted-failover-parallel")
        .load_balance()
        .weighted(vec![
            ("direct:a".to_string(), 3),
            ("direct:b".to_string(), 1),
        ])
        .failover()
        .parallel(true)
        .to("mock:result");
    let built = balanced.end_load_balance().build().unwrap();

    match &built.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
            assert!(config.parallel);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
3105
/// Checks that every multicast setter (parallel, parallel limit,
/// stop-on-exception, timeout, aggregation) is reflected in the step config.
#[test]
fn test_multicast_builder_all_config_setters() {
    let timeout = Duration::from_millis(300);
    let multicast = RouteBuilder::from("direct:start")
        .route_id("multicast-config-test")
        .multicast()
        .parallel(true)
        .parallel_limit(4)
        .stop_on_exception(true)
        .timeout(timeout)
        .aggregation(MulticastStrategy::Original)
        .to("mock:a");
    let built = multicast.end_multicast().build().unwrap();

    match &built.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(config.parallel);
            assert_eq!(config.parallel_limit, Some(4));
            assert!(config.stop_on_exception);
            assert_eq!(config.timeout, Some(timeout));
            assert!(matches!(config.aggregation, MulticastStrategy::Original));
        }
        _ => panic!("Expected Multicast step"),
    }
}
3131
/// Checks that `build_canonical()` fails for a route containing a
/// `set_header` step, with an error naming the `processor` step.
#[test]
fn test_build_canonical_rejects_unsupported_processor_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-reject")
        .set_header("k", Value::String("v".into()));
    let err = builder.build_canonical().unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("does not support step `processor`"));
}
3142
/// Checks that `.weighted(..)` stores a `LoadBalanceStrategy::Weighted`
/// strategy in the load-balance step's config.
#[test]
fn test_load_balance_builder_weighted_strategy() {
    let balanced = RouteBuilder::from("direct:start")
        .route_id("lb-weighted")
        .load_balance()
        .weighted(vec![
            ("direct:a".to_string(), 5),
            ("direct:b".to_string(), 2),
            ("direct:c".to_string(), 1),
        ])
        .to("mock:result");
    let built = balanced.end_load_balance().build().unwrap();

    match &built.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
3166
/// Checks that `.failover()` stores `LoadBalanceStrategy::Failover` and
/// that `parallel` stays false when it is never set.
#[test]
fn test_load_balance_builder_failover_strategy() {
    let balanced = RouteBuilder::from("direct:start")
        .route_id("lb-failover")
        .load_balance()
        .failover()
        .to("mock:primary");
    let built = balanced.end_load_balance().build().unwrap();

    match &built.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
            assert!(!config.parallel);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
3185
/// Checks that an explicit `.parallel(false)` leaves the load-balance
/// config with `parallel` disabled.
#[test]
fn test_load_balance_builder_parallel_false_explicit() {
    let balanced = RouteBuilder::from("direct:start")
        .route_id("lb-parallel-false")
        .load_balance()
        .round_robin()
        .parallel(false)
        .to("mock:result");
    let built = balanced.end_load_balance().build().unwrap();

    match &built.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => assert!(!config.parallel),
        _ => panic!("Expected LoadBalance step"),
    }
}
3204
/// Checks that a filter opened inside a split section is nested in the
/// split step rather than added to the outer route.
#[test]
fn test_filter_in_split_builder_typestate() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let split = RouteBuilder::from("timer:test")
        .route_id("filter-in-split")
        .split(SplitterConfig::new(split_body_lines()))
        .filter(|_ex| true)
        .to("mock:filtered")
        .end_filter();
    let route = split.end_split().build().unwrap();

    let outer = route.steps();
    assert_eq!(outer.len(), 1);

    match &outer[0] {
        BuilderStep::Split { steps, .. } => {
            assert_eq!(steps.len(), 1);
            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
        }
        _ => panic!("Expected Split step"),
    }
}
3229
/// Checks that steps declared before, inside and after a filter within a
/// split section are all nested in the split step, in declaration order.
///
/// Only the step count used to be asserted, so a builder that misplaced or
/// reordered the nested steps would still have passed. The kind and
/// position of each nested step is now checked as well.
#[test]
fn test_filter_in_split_builder_multiple_steps() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let definition = RouteBuilder::from("timer:test")
        .route_id("filter-in-split-multi")
        .split(SplitterConfig::new(split_body_lines()))
        .to("mock:before-filter")
        .filter(|_ex| true)
        .to("mock:inside-filter")
        .end_filter()
        .to("mock:after-filter")
        .end_split()
        .build()
        .unwrap();

    if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
        // to + filter + to: the step inside the filter must not leak out.
        assert_eq!(steps.len(), 3);
        assert!(matches!(&steps[0], BuilderStep::To(uri) if uri == "mock:before-filter"));
        assert!(matches!(&steps[1], BuilderStep::Filter { .. }));
        assert!(matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:after-filter"));
    } else {
        panic!("Expected Split step");
    }
}
3253
/// Checks that a circuit breaker configured on the builder appears in the
/// canonical spec with its failure threshold intact.
#[test]
fn test_build_canonical_with_circuit_breaker() {
    use camel_api::circuit_breaker::CircuitBreakerConfig;

    let breaker_config = CircuitBreakerConfig::new().failure_threshold(10);
    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-cb")
        .circuit_breaker(breaker_config)
        .to("mock:result");
    let spec = builder.build_canonical().unwrap();

    let breaker = spec.circuit_breaker.expect("circuit breaker should be set");
    assert_eq!(breaker.failure_threshold, 10);
}
3270
/// Checks that `build_canonical()` rejects a split configured with a
/// custom aggregation closure, reporting the `split` step as unsupported.
#[test]
fn test_build_canonical_rejects_custom_split_aggregation() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let custom = camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex));
    let split_config = SplitterConfig::new(split_body_lines()).aggregation(custom);

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-custom-split")
        .split(split_config)
        .to("mock:frag")
        .end_split();
    let err = builder.build_canonical().unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("canonical v1 does not support step `split`"));
}
3288
/// Checks that `build_canonical()` rejects an aggregate step that uses a
/// custom aggregation strategy closure.
#[test]
fn test_build_canonical_rejects_custom_aggregate_strategy() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(2)
        .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
        .build()
        .unwrap();

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-custom-agg")
        .aggregate(aggregator);
    let err = builder.build_canonical().unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("custom aggregate strategy"));
}
3305
/// Checks that `build_canonical()` rejects an aggregate step whose
/// correlation is a closure (`CorrelationStrategy::Fn`).
#[test]
fn test_build_canonical_rejects_fn_correlation_strategy() {
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Single(CompletionCondition::Size(1)),
        correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-fn-corr")
        .aggregate(aggregator);
    let err = builder.build_canonical().unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("Fn correlation strategy"));
}
3325
/// Checks that `build_canonical()` rejects an aggregate step whose
/// completion condition is a predicate closure.
#[test]
fn test_build_canonical_rejects_predicate_completion() {
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
            |_| false,
        ))),
        correlation: CorrelationStrategy::HeaderName("key".to_string()),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-pred-completion")
        .aggregate(aggregator);
    let err = builder.build_canonical().unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("predicate completion"));
}
3347
/// Checks that an expression-based correlation is accepted by
/// `build_canonical()` and surfaces as the aggregate step's correlation key.
#[test]
fn test_build_canonical_with_expression_correlation() {
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Single(CompletionCondition::Size(1)),
        correlation: CorrelationStrategy::Expression {
            expr: "header.key".to_string(),
            language: "simple".to_string(),
        },
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-expr-corr")
        .aggregate(aggregator);
    let spec = builder.build_canonical().unwrap();

    let has_expression_key = spec.steps.iter().any(|step| {
        matches!(
            step,
            CanonicalStepSpec::Aggregate(agg)
                if agg.correlation_key.as_deref() == Some("header.key")
        )
    });
    assert!(has_expression_key);
}
3370
/// Checks that `build_canonical()` rejects a split built from
/// `split_body_lines()` even with the `LastWins` aggregation, reporting the
/// `split` step as unsupported.
#[test]
fn test_build_canonical_split_rejected_with_closure_expression() {
    use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};

    let split_config =
        SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins);

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-split-last")
        .split(split_config)
        .to("mock:frag")
        .end_split();
    let err = builder.build_canonical().unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("canonical v1 does not support step `split`"));
}
3388
/// Checks that a full `on_exception` chain (retry, backoff, jitter,
/// handler URI) is stored as a single policy with every value intact.
#[test]
fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
    let initial_delay = Duration::from_millis(10);
    let max_delay = Duration::from_millis(500);

    let builder = RouteBuilder::from("direct:start")
        .route_id("on-exception-full")
        .dead_letter_channel("log:dlc")
        .on_exception(|e| matches!(e, CamelError::Io(_)))
        .retry(5)
        .with_backoff(initial_delay, 2.0, max_delay)
        .with_jitter(0.3)
        .handled_by("log:io-handler")
        .end_on_exception()
        .to("mock:out");
    let route = builder.build().unwrap();

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.policies.len(), 1);

    let policy = &handler.policies[0];
    let retry = policy.retry.as_ref().expect("retry should be set");
    assert_eq!(retry.max_attempts, 5);
    assert_eq!(retry.initial_delay, initial_delay);
    assert_eq!(retry.multiplier, 2.0);
    assert_eq!(retry.max_delay, max_delay);

    let jitter_error = (retry.jitter_factor - 0.3).abs();
    assert!(jitter_error < f64::EPSILON);
    assert_eq!(policy.handled_by.as_deref(), Some("log:io-handler"));
}
3419
/// Checks that an out-of-range jitter factor (5.0) ends up stored as 1.0.
#[test]
fn test_on_exception_jitter_clamped_to_valid_range() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("jitter-clamp")
        .on_exception(|_e| true)
        .retry(1)
        .with_jitter(5.0)
        .end_on_exception()
        .to("mock:out");
    let route = builder.build().unwrap();

    let handler = route.error_handler_config().unwrap();
    let retry = handler.policies[0].retry.as_ref().unwrap();

    let distance_from_one = (retry.jitter_factor - 1.0).abs();
    assert!(distance_from_one < f64::EPSILON);
}
3436
/// Checks that `.process_fn(..)` with a boxed processor contributes a
/// `BuilderStep::Processor` as the first step.
#[test]
fn test_builder_process_fn_adds_processor_step() {
    use camel_api::BoxProcessorExt;

    let passthrough = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
    let builder = RouteBuilder::from("timer:tick")
        .route_id("process-fn-test")
        .process_fn(passthrough);
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
3451
/// Checks that `.convert_body_to(BodyType::Json)` contributes a
/// `BuilderStep::Processor` as the first step.
#[test]
fn test_builder_convert_body_to_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("convert-body-test")
        .convert_body_to(BodyType::Json);
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
3462
/// Checks that `.bean(name, method)` is recorded as a `BuilderStep::Bean`
/// carrying both values unchanged.
#[test]
fn test_builder_bean_adds_bean_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("bean-test")
        .bean("myBean", "process");
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    let is_expected_bean = matches!(
        first,
        BuilderStep::Bean { name, method } if name == "myBean" && method == "process"
    );
    assert!(is_expected_bean);
}
3476
/// Checks that `.strategy(ThrottleStrategy::Delay)` is stored in the
/// throttle step's config.
#[test]
fn test_throttle_builder_delay_strategy() {
    let window = Duration::from_secs(1);
    let throttled = RouteBuilder::from("timer:tick")
        .route_id("throttle-delay")
        .throttle(10, window)
        .strategy(ThrottleStrategy::Delay)
        .to("mock:result");
    let route = throttled.end_throttle().build().unwrap();

    match &route.steps()[0] {
        BuilderStep::Throttle { config, .. } => {
            assert_eq!(config.strategy, ThrottleStrategy::Delay);
        }
        _ => panic!("Expected Throttle step"),
    }
}
3496
/// Checks that `.strategy(ThrottleStrategy::Drop)` is stored in the
/// throttle step's config.
#[test]
fn test_throttle_builder_drop_strategy() {
    let window = Duration::from_secs(1);
    let throttled = RouteBuilder::from("timer:tick")
        .route_id("throttle-drop")
        .throttle(10, window)
        .strategy(ThrottleStrategy::Drop)
        .to("mock:result");
    let route = throttled.end_throttle().build().unwrap();

    match &route.steps()[0] {
        BuilderStep::Throttle { config, .. } => {
            assert_eq!(config.strategy, ThrottleStrategy::Drop);
        }
        _ => panic!("Expected Throttle step"),
    }
}
3514
/// Checks that a `loop_while` opened inside a `loop_count` section is
/// nested as the second step of the outer loop and keeps its `While` mode.
#[test]
fn test_nested_loop_while_builder() {
    use camel_api::loop_eip::LoopMode;

    let inner_closed = RouteBuilder::from("direct:start")
        .route_id("nested-loop-while")
        .loop_count(2)
        .to("mock:outer")
        .loop_while(|_ex| true)
        .to("mock:inner")
        .end_loop();
    let def = inner_closed.end_loop().build().unwrap();

    let outer_steps = def.steps();
    assert_eq!(outer_steps.len(), 1);

    match &outer_steps[0] {
        BuilderStep::Loop { steps, .. } => {
            assert_eq!(steps.len(), 2);
            match &steps[1] {
                BuilderStep::Loop { config, .. } => {
                    assert!(matches!(config.mode, LoopMode::While(_)));
                }
                _ => panic!("Expected inner Loop step"),
            }
        }
        _ => panic!("Expected outer Loop step"),
    }
}
3544
/// Checks that three `when` branches plus an `otherwise` branch are all
/// recorded, with the `otherwise` branch holding its single step.
#[test]
fn test_choice_builder_multiple_whens_with_otherwise() {
    let choice = RouteBuilder::from("timer:tick")
        .route_id("choice-multi-otherwise")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("b").is_some())
        .to("mock:b")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("c").is_some())
        .to("mock:c")
        .end_when()
        .otherwise()
        .to("mock:fallback")
        .end_otherwise();
    let route = choice.end_choice().build().unwrap();

    match &route.steps()[0] {
        BuilderStep::Choice { whens, otherwise } => {
            assert_eq!(whens.len(), 3);
            assert!(otherwise.is_some());
            assert_eq!(otherwise.as_ref().unwrap().len(), 1);
        }
        _ => panic!("Expected Choice step"),
    }
}
3576
/// Checks that setting only `.parallel(true)` enables parallel mode while
/// leaving `parallel_limit` unset.
#[test]
fn test_multicast_builder_parallel_only() {
    let multicast = RouteBuilder::from("direct:start")
        .route_id("multicast-parallel")
        .multicast()
        .parallel(true)
        .to("mock:a");
    let built = multicast.end_multicast().build().unwrap();

    match &built.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(config.parallel);
            assert_eq!(config.parallel_limit, None);
        }
        _ => panic!("Expected Multicast step"),
    }
}
3597
/// Checks that `.timeout(..)` on a multicast section is stored in the
/// step's config.
#[test]
fn test_multicast_builder_timeout_only() {
    let timeout = Duration::from_secs(5);
    let multicast = RouteBuilder::from("direct:start")
        .route_id("multicast-timeout")
        .multicast()
        .timeout(timeout)
        .to("mock:a");
    let built = multicast.end_multicast().build().unwrap();

    match &built.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert_eq!(config.timeout, Some(timeout));
        }
        _ => panic!("Expected Multicast step"),
    }
}
3615
/// Checks that `.aggregation(MulticastStrategy::CollectAll)` is stored in
/// the multicast step's config.
#[test]
fn test_multicast_builder_aggregation_collect_all() {
    let multicast = RouteBuilder::from("direct:start")
        .route_id("multicast-collect")
        .multicast()
        .aggregation(MulticastStrategy::CollectAll)
        .to("mock:a");
    let built = multicast.end_multicast().build().unwrap();

    match &built.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
        }
        _ => panic!("Expected Multicast step"),
    }
}
3633
/// Checks that a size-or-timeout completion is exported to the canonical
/// aggregate spec as both a completion size and a timeout in milliseconds.
#[test]
fn test_build_canonical_aggregate_any_completion_mode() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_on_size_or_timeout(10, Duration::from_secs(30))
        .build()
        .unwrap();

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-any-completion")
        .aggregate(aggregator);
    let spec = builder.build_canonical().unwrap();

    match &spec.steps[0] {
        CanonicalStepSpec::Aggregate(agg) => {
            assert_eq!(agg.completion_size, Some(10));
            assert_eq!(agg.completion_timeout_ms, Some(30_000));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3656
/// Checks that a timeout-only completion is exported to the canonical
/// aggregate spec with a timeout in milliseconds and no completion size.
#[test]
fn test_build_canonical_aggregate_timeout_completion() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_on_timeout(Duration::from_millis(500))
        .build()
        .unwrap();

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-timeout-completion")
        .aggregate(aggregator);
    let spec = builder.build_canonical().unwrap();

    match &spec.steps[0] {
        CanonicalStepSpec::Aggregate(agg) => {
            assert_eq!(agg.completion_size, None);
            assert_eq!(agg.completion_timeout_ms, Some(500));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3677
3678 #[test]
3681 fn test_build_canonical_aggregate_discard_on_timeout() {
3682 use camel_api::aggregator::AggregatorConfig;
3683
3684 let spec = RouteBuilder::from("direct:start")
3685 .route_id("canonical-discard-timeout")
3686 .aggregate(
3687 AggregatorConfig::correlate_by("key")
3688 .complete_when_size(1)
3689 .discard_on_timeout(true)
3690 .build()
3691 .unwrap(),
3692 )
3693 .build_canonical()
3694 .unwrap();
3695
3696 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3697 assert_eq!(agg.discard_on_timeout, Some(true));
3698 } else {
3699 panic!("Expected Aggregate step");
3700 }
3701 }
3702
3703 #[test]
3704 fn test_build_canonical_aggregate_force_completion_on_stop() {
3705 use camel_api::aggregator::AggregatorConfig;
3706
3707 let spec = RouteBuilder::from("direct:start")
3708 .route_id("canonical-force-stop")
3709 .aggregate(
3710 AggregatorConfig::correlate_by("key")
3711 .complete_when_size(1)
3712 .force_completion_on_stop(true)
3713 .build()
3714 .unwrap(),
3715 )
3716 .build_canonical()
3717 .unwrap();
3718
3719 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3720 assert_eq!(agg.force_completion_on_stop, Some(true));
3721 } else {
3722 panic!("Expected Aggregate step");
3723 }
3724 }
3725
3726 #[test]
3729 fn test_build_canonical_aggregate_max_buckets_and_ttl() {
3730 use camel_api::aggregator::AggregatorConfig;
3731
3732 let spec = RouteBuilder::from("direct:start")
3733 .route_id("canonical-buckets-ttl")
3734 .aggregate(
3735 AggregatorConfig::correlate_by("key")
3736 .complete_when_size(1)
3737 .max_buckets(100)
3738 .bucket_ttl(Duration::from_secs(60))
3739 .build()
3740 .unwrap(),
3741 )
3742 .build_canonical()
3743 .unwrap();
3744
3745 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3746 assert_eq!(agg.max_buckets, Some(100));
3747 assert_eq!(agg.bucket_ttl_ms, Some(60_000));
3748 } else {
3749 panic!("Expected Aggregate step");
3750 }
3751 }
3752
3753 #[test]
3756 fn test_split_builder_with_filter_inside() {
3757 use camel_api::splitter::{SplitterConfig, split_body_lines};
3758
3759 let definition = RouteBuilder::from("timer:test")
3760 .route_id("split-with-filter")
3761 .split(SplitterConfig::new(split_body_lines()))
3762 .filter(|_ex| true)
3763 .to("mock:filtered-frag")
3764 .end_filter()
3765 .end_split()
3766 .build()
3767 .unwrap();
3768
3769 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3770 assert_eq!(steps.len(), 1);
3771 assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3772 } else {
3773 panic!("Expected Split step");
3774 }
3775 }
3776
3777 #[test]
3780 fn test_wire_tap_multiple_taps() {
3781 let definition = RouteBuilder::from("timer:tick")
3782 .route_id("multi-wire-tap")
3783 .wire_tap("mock:tap1")
3784 .wire_tap("mock:tap2")
3785 .to("mock:result")
3786 .build()
3787 .unwrap();
3788
3789 assert_eq!(definition.steps().len(), 3);
3790 assert!(
3791 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap1")
3792 );
3793 assert!(
3794 matches!(&definition.steps()[1], BuilderStep::WireTap { uri } if uri == "mock:tap2")
3795 );
3796 }
3797
3798 #[test]
3801 fn test_builder_shorthand_then_explicit_mixed_mode() {
3802 let result = RouteBuilder::from("direct:start")
3803 .route_id("mixed-mode-2")
3804 .dead_letter_channel("log:dlc")
3805 .error_handler(ErrorHandlerConfig::log_only())
3806 .to("mock:out")
3807 .build();
3808
3809 let err = result.err().expect("mixed mode should fail");
3810 assert!(format!("{err}").contains("mixed error handler modes"));
3811 }
3812
3813 #[test]
3816 fn test_build_canonical_empty_from_uri_errors() {
3817 let result = RouteBuilder::from("").route_id("test").build_canonical();
3818 assert!(result.is_err());
3819 }
3820
3821 #[test]
3822 fn test_build_canonical_missing_route_id_errors() {
3823 let result = RouteBuilder::from("direct:start").build_canonical();
3824 assert!(result.is_err());
3825 let err = result.unwrap_err().to_string();
3826 assert!(err.contains("route_id"));
3827 }
3828
3829 #[test]
3832 fn test_split_builder_with_aggregate_inside() {
3833 use camel_api::aggregator::AggregatorConfig;
3834 use camel_api::splitter::{SplitterConfig, split_body_lines};
3835
3836 let definition = RouteBuilder::from("timer:test")
3837 .route_id("split-agg")
3838 .split(SplitterConfig::new(split_body_lines()))
3839 .aggregate(
3840 AggregatorConfig::correlate_by("frag-key")
3841 .complete_when_size(3)
3842 .build()
3843 .unwrap(),
3844 )
3845 .end_split()
3846 .build()
3847 .unwrap();
3848
3849 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3850 assert_eq!(steps.len(), 1);
3851 assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
3852 } else {
3853 panic!("Expected Split step");
3854 }
3855 }
3856
3857 #[test]
3860 fn test_throttle_builder_with_steps_inside() {
3861 let definition = RouteBuilder::from("timer:tick")
3862 .route_id("throttle-steps")
3863 .throttle(10, Duration::from_secs(1))
3864 .set_header("throttled", Value::Bool(true))
3865 .to("mock:throttled")
3866 .end_throttle()
3867 .build()
3868 .unwrap();
3869
3870 if let BuilderStep::Throttle { steps, .. } = &definition.steps()[0] {
3871 assert_eq!(steps.len(), 2);
3872 } else {
3873 panic!("Expected Throttle step");
3874 }
3875 }
3876
3877 #[test]
3880 fn test_load_balance_builder_with_steps_inside() {
3881 let definition = RouteBuilder::from("timer:tick")
3882 .route_id("lb-steps")
3883 .load_balance()
3884 .round_robin()
3885 .set_header("lb", Value::Bool(true))
3886 .to("mock:lb")
3887 .end_load_balance()
3888 .build()
3889 .unwrap();
3890
3891 if let BuilderStep::LoadBalance { steps, .. } = &definition.steps()[0] {
3892 assert_eq!(steps.len(), 2);
3893 } else {
3894 panic!("Expected LoadBalance step");
3895 }
3896 }
3897
3898 #[test]
3901 fn test_multicast_builder_with_steps_inside() {
3902 let definition = RouteBuilder::from("timer:tick")
3903 .route_id("multicast-steps")
3904 .multicast()
3905 .set_header("mc", Value::Bool(true))
3906 .to("mock:multicast")
3907 .end_multicast()
3908 .build()
3909 .unwrap();
3910
3911 if let BuilderStep::Multicast { steps, .. } = &definition.steps()[0] {
3912 assert_eq!(steps.len(), 2);
3913 } else {
3914 panic!("Expected Multicast step");
3915 }
3916 }
3917
3918 #[test]
3921 fn test_loop_builder_with_steps_inside() {
3922 let definition = RouteBuilder::from("timer:tick")
3923 .route_id("loop-steps")
3924 .loop_count(3)
3925 .set_header("loop", Value::Bool(true))
3926 .to("mock:loop")
3927 .end_loop()
3928 .build()
3929 .unwrap();
3930
3931 if let BuilderStep::Loop { steps, .. } = &definition.steps()[0] {
3932 assert_eq!(steps.len(), 2);
3933 } else {
3934 panic!("Expected Loop step");
3935 }
3936 }
3937
3938 #[test]
3941 fn test_build_canonical_rejects_loop_step() {
3942 let err = RouteBuilder::from("direct:start")
3943 .route_id("canonical-loop")
3944 .loop_count(3)
3945 .to("mock:loop")
3946 .end_loop()
3947 .build_canonical()
3948 .unwrap_err();
3949
3950 assert!(format!("{err}").contains("does not support step `loop`"));
3951 }
3952
3953 #[test]
3954 fn test_build_canonical_rejects_multicast_step() {
3955 let err = RouteBuilder::from("direct:start")
3956 .route_id("canonical-multicast")
3957 .multicast()
3958 .to("mock:a")
3959 .end_multicast()
3960 .build_canonical()
3961 .unwrap_err();
3962
3963 assert!(format!("{err}").contains("does not support step `multicast`"));
3964 }
3965
3966 #[test]
3967 fn test_build_canonical_rejects_throttle_step() {
3968 let err = RouteBuilder::from("direct:start")
3969 .route_id("canonical-throttle")
3970 .throttle(10, Duration::from_secs(1))
3971 .to("mock:result")
3972 .end_throttle()
3973 .build_canonical()
3974 .unwrap_err();
3975
3976 assert!(format!("{err}").contains("does not support step `throttle`"));
3977 }
3978
3979 #[test]
3980 fn test_build_canonical_rejects_load_balancer_step() {
3981 let err = RouteBuilder::from("direct:start")
3982 .route_id("canonical-lb")
3983 .load_balance()
3984 .round_robin()
3985 .to("mock:result")
3986 .end_load_balance()
3987 .build_canonical()
3988 .unwrap_err();
3989
3990 assert!(format!("{err}").contains("does not support step `load_balancer`"));
3991 }
3992
3993 #[test]
3994 fn test_build_canonical_rejects_bean_step() {
3995 let err = RouteBuilder::from("direct:start")
3996 .route_id("canonical-bean")
3997 .bean("myBean", "process")
3998 .build_canonical()
3999 .unwrap_err();
4000
4001 assert!(format!("{err}").contains("does not support step `bean`"));
4002 }
4003
4004 #[test]
4005 fn test_build_canonical_rejects_script_step() {
4006 let err = RouteBuilder::from("direct:start")
4007 .route_id("canonical-script")
4008 .script("rhai", "x = 1")
4009 .build_canonical()
4010 .unwrap_err();
4011
4012 assert!(format!("{err}").contains("does not support step `script`"));
4013 }
4014
4015 #[test]
4016 fn test_build_canonical_accepts_delay_step() {
4017 let spec = RouteBuilder::from("direct:start")
4018 .route_id("canonical-delay")
4019 .delay(Duration::from_millis(100))
4020 .build_canonical()
4021 .unwrap();
4022
4023 assert!(
4024 spec.steps.iter().any(
4025 |s| matches!(s, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100)
4026 )
4027 );
4028 }
4029
4030 #[test]
4031 fn test_build_canonical_accepts_wire_tap_step() {
4032 let spec = RouteBuilder::from("direct:start")
4033 .route_id("canonical-wiretap")
4034 .wire_tap("mock:tap")
4035 .build_canonical()
4036 .unwrap();
4037
4038 assert!(
4039 spec.steps
4040 .iter()
4041 .any(|s| matches!(s, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"))
4042 );
4043 }
4044
4045 #[test]
4046 fn test_build_canonical_rejects_dynamic_router_step() {
4047 let err = RouteBuilder::from("direct:start")
4048 .route_id("canonical-dyn-router")
4049 .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
4050 .build_canonical()
4051 .unwrap_err();
4052
4053 assert!(format!("{err}").contains("does not support step `dynamic_router`"));
4054 }
4055
4056 #[test]
4057 fn test_build_canonical_rejects_routing_slip_step() {
4058 let err = RouteBuilder::from("direct:start")
4059 .route_id("canonical-routing-slip")
4060 .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
4061 .build_canonical()
4062 .unwrap_err();
4063
4064 assert!(format!("{err}").contains("does not support step `routing_slip`"));
4065 }
4066
4067 #[test]
4068 fn test_build_canonical_rejects_recipient_list_step() {
4069 let err = RouteBuilder::from("direct:start")
4070 .route_id("canonical-recipient")
4071 .recipient_list(Arc::new(|_| "mock:a".to_string()))
4072 .build_canonical()
4073 .unwrap_err();
4074
4075 assert!(format!("{err}").contains("does not support step `recipient_list`"));
4076 }
4077
4078 #[test]
4081 fn test_build_canonical_rejects_any_mode_with_predicate() {
4082 let err = RouteBuilder::from("direct:start")
4083 .route_id("canonical-any-pred")
4084 .aggregate(AggregatorConfig {
4085 header_name: "key".to_string(),
4086 completion: CompletionMode::Any(vec![
4087 CompletionCondition::Size(5),
4088 CompletionCondition::Predicate(Arc::new(|_| false)),
4089 ]),
4090 correlation: CorrelationStrategy::HeaderName("key".to_string()),
4091 strategy: AggregationStrategy::CollectAll,
4092 max_buckets: None,
4093 bucket_ttl: None,
4094 force_completion_on_stop: false,
4095 discard_on_timeout: false,
4096 })
4097 .build_canonical()
4098 .unwrap_err();
4099
4100 assert!(format!("{err}").contains("predicate completion"));
4101 }
4102
4103 #[test]
4106 fn test_builder_validation_missing_from_uri() {
4107 let result = RouteBuilder::from("")
4108 .route_id("missing-uri-route")
4109 .to("log:info")
4110 .build();
4111 assert!(result.is_err(), "empty from URI should fail validation");
4112 let err = result.err().unwrap().to_string();
4113 assert!(
4114 err.contains("'from'") || err.contains("URI"),
4115 "error should mention from/URI, got: {err}"
4116 );
4117 }
4118
4119 #[test]
4120 fn test_builder_validation_invalid_step_uri_scheme() {
4121 let result = RouteBuilder::from("timer:tick")
4122 .route_id("bad-step-route")
4123 .to("not-a-valid-uri") .build();
4125 assert!(
4128 result.is_ok(),
4129 "builder should accept opaque step URIs; resolution happens later"
4130 );
4131 }
4132
4133 #[test]
4136 fn test_builder_duplicate_route_ids_produce_identical_definitions() {
4137 let route1 = RouteBuilder::from("direct:a")
4140 .route_id("dup-route")
4141 .to("mock:out")
4142 .build();
4143 let route2 = RouteBuilder::from("direct:b")
4144 .route_id("dup-route")
4145 .to("mock:out")
4146 .build();
4147
4148 assert!(route1.is_ok());
4149 assert!(route2.is_ok());
4150 assert_eq!(route1.unwrap().route_id(), route2.unwrap().route_id());
4151 }
4152}