1use camel_api::DelayConfig;
7use camel_api::aggregator::{
8 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24 ProcessorFn, Value,
25 runtime::{
26 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28 CanonicalWhenSpec,
29 },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35 StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38pub mod do_try;
40pub use do_try::{DoCatchBuilder, DoFinallyBuilder, DoTryBuilder};
41
42pub trait StepAccumulator: Sized {
48 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
49
50 fn to(mut self, endpoint: impl Into<String>) -> Self {
51 self.steps_mut().push(BuilderStep::To(endpoint.into()));
52 self
53 }
54
55 fn process<F, Fut>(mut self, f: F) -> Self
56 where
57 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
58 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
59 {
60 let svc = ProcessorFn::new(f);
61 self.steps_mut()
62 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
63 self
64 }
65
66 fn process_fn(mut self, processor: BoxProcessor) -> Self {
67 self.steps_mut().push(BuilderStep::Processor(processor));
68 self
69 }
70
71 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
72 let svc = SetHeader::new(IdentityProcessor, key, value);
73 self.steps_mut()
74 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
75 self
76 }
77
78 fn map_body<F>(mut self, mapper: F) -> Self
79 where
80 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
81 {
82 let svc = MapBody::new(IdentityProcessor, mapper);
83 self.steps_mut()
84 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
85 self
86 }
87
88 fn set_body<B>(mut self, body: B) -> Self
89 where
90 B: Into<Body> + Clone + Send + Sync + 'static,
91 {
92 let body: Body = body.into();
93 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
94 self.steps_mut()
95 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
96 self
97 }
98
99 fn transform<B>(self, body: B) -> Self
104 where
105 B: Into<Body> + Clone + Send + Sync + 'static,
106 {
107 self.set_body(body)
108 }
109
110 fn set_body_fn<F>(mut self, expr: F) -> Self
111 where
112 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
113 {
114 let svc = SetBody::new(IdentityProcessor, expr);
115 self.steps_mut()
116 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
117 self
118 }
119
120 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
121 where
122 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
123 {
124 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
125 self.steps_mut()
126 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
127 self
128 }
129
130 fn aggregate(mut self, config: AggregatorConfig) -> Self {
131 self.steps_mut().push(BuilderStep::Aggregate { config });
132 self
133 }
134
135 fn stop(mut self) -> Self {
141 self.steps_mut().push(BuilderStep::Stop);
142 self
143 }
144
145 fn delay(mut self, duration: std::time::Duration) -> Self {
146 self.steps_mut().push(BuilderStep::Delay {
147 config: DelayConfig::from_duration(duration),
148 });
149 self
150 }
151
152 fn delay_with_header(
153 mut self,
154 duration: std::time::Duration,
155 header: impl Into<String>,
156 ) -> Self {
157 self.steps_mut().push(BuilderStep::Delay {
158 config: DelayConfig::from_duration_with_header(duration, header),
159 });
160 self
161 }
162
163 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
167 self.steps_mut().push(BuilderStep::Log {
168 level,
169 message: message.into(),
170 });
171 self
172 }
173
174 fn convert_body_to(mut self, target: BodyType) -> Self {
186 let svc = ConvertBodyTo::new(IdentityProcessor, target);
187 self.steps_mut()
188 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
189 self
190 }
191
192 fn stream_cache(mut self, threshold: usize) -> Self {
193 let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
194 let svc = StreamCacheService::new(IdentityProcessor, config);
195 self.steps_mut()
196 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
197 self
198 }
199
200 fn stream_cache_default(self) -> Self {
204 self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
205 }
206
207 fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
218 let name = format.into();
219 let df = builtin_data_format(&name)
220 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
221 let svc = MarshalService::new(IdentityProcessor, df);
222 self.steps_mut()
223 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
224 Ok(self)
225 }
226
227 fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
238 let name = format.into();
239 let df = builtin_data_format(&name)
240 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
241 let svc = UnmarshalService::new(IdentityProcessor, df);
242 self.steps_mut()
243 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
244 Ok(self)
245 }
246
247 fn validate(mut self, schema_path: impl Into<String>) -> Self {
256 let path = schema_path.into();
257 let uri = if path.starts_with("validator:") {
258 path
259 } else {
260 format!("validator:{path}")
261 };
262 self.steps_mut().push(BuilderStep::To(uri));
263 self
264 }
265
266 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
277 self.steps_mut().push(BuilderStep::Script {
278 language: language.into(),
279 script: script.into(),
280 });
281 self
282 }
283
284 fn enrich(mut self, uri: impl Into<String>) -> Self {
290 self.steps_mut().push(BuilderStep::Enrich {
291 uri: uri.into(),
292 strategy: None,
293 timeout_ms: None,
294 });
295 self
296 }
297
298 fn poll_enrich(mut self, uri: impl Into<String>, timeout_ms: u64) -> Self {
304 self.steps_mut().push(BuilderStep::PollEnrich {
305 uri: uri.into(),
306 strategy: None,
307 timeout_ms: Some(timeout_ms),
308 });
309 self
310 }
311
312 fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
313 self.steps_mut().push(BuilderStep::Bean {
314 name: name.into(),
315 method: method.into(),
316 });
317 self
318 }
319}
320
321pub struct RouteBuilder {
336 from_uri: String,
337 steps: Vec<BuilderStep>,
338 error_handler: Option<ErrorHandlerConfig>,
339 error_handler_mode: ErrorHandlerMode,
340 circuit_breaker_config: Option<CircuitBreakerConfig>,
341 security_policy_config: Option<camel_api::security_policy::SecurityPolicyConfig>,
342 security_authenticator: Option<std::sync::Arc<dyn camel_auth::TokenAuthenticator>>,
343 concurrency: Option<ConcurrencyModel>,
344 route_id: Option<String>,
345 auto_startup: Option<bool>,
346 startup_order: Option<i32>,
347}
348
349#[derive(Default)]
350enum ErrorHandlerMode {
351 #[default]
352 None,
353 ExplicitConfig,
354 Shorthand {
355 dlc_uri: Option<String>,
356 specs: Vec<OnExceptionSpec>,
357 },
358 Mixed,
359}
360
361#[derive(Clone)]
362struct OnExceptionSpec {
363 matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
364 retry: Option<RedeliveryPolicy>,
365 handled_by: Option<String>,
366}
367
368impl RouteBuilder {
369 pub fn from(endpoint: &str) -> Self {
371 Self {
372 from_uri: endpoint.to_string(),
373 steps: Vec::new(),
374 error_handler: None,
375 error_handler_mode: ErrorHandlerMode::None,
376 circuit_breaker_config: None,
377 security_policy_config: None,
378 security_authenticator: None,
379 concurrency: None,
380 route_id: None,
381 auto_startup: None,
382 startup_order: None,
383 }
384 }
385
386 pub fn filter<F>(self, predicate: F) -> FilterBuilder
390 where
391 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
392 {
393 FilterBuilder {
394 parent: self,
395 predicate: std::sync::Arc::new(predicate),
396 steps: vec![],
397 }
398 }
399
400 pub fn choice(self) -> ChoiceBuilder {
406 ChoiceBuilder {
407 parent: self,
408 whens: vec![],
409 _otherwise: None,
410 }
411 }
412
413 pub fn wire_tap(mut self, endpoint: &str) -> Self {
417 self.steps.push(BuilderStep::WireTap {
418 uri: endpoint.to_string(),
419 });
420 self
421 }
422
423 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
425 self.error_handler_mode = match self.error_handler_mode {
426 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
427 ErrorHandlerMode::ExplicitConfig
428 }
429 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
430 };
431 self.error_handler = Some(config);
432 self
433 }
434
435 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
437 let uri = uri.into();
438 self.error_handler_mode = match self.error_handler_mode {
439 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
440 dlc_uri: Some(uri),
441 specs: Vec::new(),
442 },
443 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
444 dlc_uri: Some(uri),
445 specs,
446 },
447 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
448 };
449 self
450 }
451
452 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
454 where
455 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
456 {
457 self.error_handler_mode = match self.error_handler_mode {
458 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
459 dlc_uri: None,
460 specs: Vec::new(),
461 },
462 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
463 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
464 };
465
466 OnExceptionBuilder {
467 parent: self,
468 policy: OnExceptionSpec {
469 matches: std::sync::Arc::new(matches),
470 retry: None,
471 handled_by: None,
472 },
473 }
474 }
475
476 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
478 self.circuit_breaker_config = Some(config);
479 self
480 }
481
482 pub fn security_policy(
483 mut self,
484 config: camel_api::security_policy::SecurityPolicyConfig,
485 ) -> Self {
486 self.security_policy_config = Some(config);
487 self
488 }
489
490 pub fn security_authenticator(
491 mut self,
492 auth: std::sync::Arc<dyn camel_auth::TokenAuthenticator>,
493 ) -> Self {
494 self.security_authenticator = Some(auth);
495 self
496 }
497
498 pub fn concurrent(mut self, max: usize) -> Self {
512 let max = if max == 0 { None } else { Some(max) };
513 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
514 self
515 }
516
517 pub fn sequential(mut self) -> Self {
522 self.concurrency = Some(ConcurrencyModel::Sequential);
523 self
524 }
525
526 pub fn route_id(mut self, id: impl Into<String>) -> Self {
530 self.route_id = Some(id.into());
531 self
532 }
533
534 pub fn auto_startup(mut self, auto: bool) -> Self {
538 self.auto_startup = Some(auto);
539 self
540 }
541
542 pub fn startup_order(mut self, order: i32) -> Self {
546 self.startup_order = Some(order);
547 self
548 }
549
550 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
556 SplitBuilder {
557 parent: self,
558 config,
559 steps: Vec::new(),
560 }
561 }
562
563 pub fn multicast(self) -> MulticastBuilder {
569 MulticastBuilder {
570 parent: self,
571 steps: Vec::new(),
572 config: MulticastConfig::new(),
573 }
574 }
575
576 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
583 ThrottleBuilder {
584 parent: self,
585 config: ThrottlerConfig::new(max_requests, period),
586 steps: Vec::new(),
587 }
588 }
589
590 pub fn loop_count(self, count: usize) -> LoopBuilder {
592 LoopBuilder {
593 parent: self,
594 config: LoopConfig {
595 mode: LoopMode::Count(count),
596 },
597 steps: vec![],
598 }
599 }
600
601 pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
603 where
604 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
605 {
606 LoopBuilder {
607 parent: self,
608 config: LoopConfig {
609 mode: LoopMode::While(std::sync::Arc::new(predicate)),
610 },
611 steps: vec![],
612 }
613 }
614
615 pub fn load_balance(self) -> LoadBalancerBuilder {
621 LoadBalancerBuilder {
622 parent: self,
623 config: LoadBalancerConfig::round_robin(),
624 steps: Vec::new(),
625 }
626 }
627
628 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
644 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
645 }
646
647 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
651 self.steps.push(BuilderStep::DynamicRouter { config });
652 self
653 }
654
655 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
656 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
657 }
658
659 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
660 self.steps.push(BuilderStep::RoutingSlip { config });
661 self
662 }
663
664 pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
665 self.recipient_list_with_config(RecipientListConfig::new(expression))
666 }
667
668 pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
669 self.steps.push(BuilderStep::RecipientList { config });
670 self
671 }
672
673 pub fn build(self) -> Result<RouteDefinition, CamelError> {
679 validate_uri(&self.from_uri)?;
680 let route_id = self
681 .route_id
682 .filter(|s| !s.trim().is_empty())
683 .ok_or_else(|| {
684 CamelError::RouteError(
685 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
686 .to_string(),
687 )
688 })?;
689 let resolved_error_handler = match self.error_handler_mode {
690 ErrorHandlerMode::None => self.error_handler,
691 ErrorHandlerMode::ExplicitConfig => self.error_handler,
692 ErrorHandlerMode::Mixed => {
693 return Err(CamelError::RouteError(
694 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
695 ));
696 }
697 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
698 let mut config = if let Some(uri) = dlc_uri {
699 ErrorHandlerConfig::dead_letter_channel(uri)
700 } else {
701 ErrorHandlerConfig::log_only()
702 };
703
704 for spec in specs {
705 let matcher = spec.matches.clone();
706 let mut builder = config.on_exception(move |e| matcher(e));
707
708 if let Some(retry) = spec.retry {
709 builder = builder.retry(retry.max_attempts).with_backoff(
710 retry.initial_delay,
711 retry.multiplier,
712 retry.max_delay,
713 );
714 if retry.jitter_factor > 0.0 {
715 builder = builder.with_jitter(retry.jitter_factor);
716 }
717 }
718
719 if let Some(uri) = spec.handled_by {
720 builder = builder.handled_by(uri);
721 }
722
723 config = builder.build();
724 }
725
726 Some(config)
727 }
728 };
729
730 let definition = RouteDefinition::new(self.from_uri, self.steps);
731 let definition = if let Some(eh) = resolved_error_handler {
732 definition.with_error_handler(eh)
733 } else {
734 definition
735 };
736 let definition = if let Some(cb) = self.circuit_breaker_config {
737 definition.with_circuit_breaker(cb)
738 } else {
739 definition
740 };
741 let definition = if let Some(sp) = self.security_policy_config {
742 definition.with_security_policy(sp)
743 } else {
744 definition
745 };
746 let definition = if let Some(auth) = self.security_authenticator {
747 definition.with_security_authenticator(auth)
748 } else {
749 definition
750 };
751 let definition = if let Some(concurrency) = self.concurrency {
752 definition.with_concurrency(concurrency)
753 } else {
754 definition
755 };
756 let definition = definition.with_route_id(route_id);
757 let definition = if let Some(auto) = self.auto_startup {
758 definition.with_auto_startup(auto)
759 } else {
760 definition
761 };
762 let definition = if let Some(order) = self.startup_order {
763 definition.with_startup_order(order)
764 } else {
765 definition
766 };
767 Ok(definition)
768 }
769
770 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
772 validate_uri(&self.from_uri)?;
773 let route_id = self
774 .route_id
775 .filter(|s| !s.trim().is_empty())
776 .ok_or_else(|| {
777 CamelError::RouteError(
778 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
779 .to_string(),
780 )
781 })?;
782
783 let steps = canonicalize_steps(self.steps)?;
784 let circuit_breaker = self
785 .circuit_breaker_config
786 .map(canonicalize_circuit_breaker);
787
788 if self.security_policy_config.is_some() {
789 return Err(CamelError::RouteError(
790 "routes with security_policy cannot use the canonical/hot-reload path (not yet supported)"
791 .into(),
792 ));
793 }
794
795 let spec = CanonicalRouteSpec {
796 route_id,
797 from: self.from_uri,
798 steps,
799 circuit_breaker,
800 auto_startup: None,
801 startup_order: None,
802 concurrency: None,
803 version: camel_api::CANONICAL_CONTRACT_VERSION,
804 };
805 spec.validate_contract()?;
806 Ok(spec)
807 }
808}
809
810pub struct OnExceptionBuilder {
811 parent: RouteBuilder,
812 policy: OnExceptionSpec,
813}
814
815impl OnExceptionBuilder {
816 pub fn retry(mut self, max_attempts: u32) -> Self {
817 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
818 self
819 }
820
821 pub fn with_backoff(
822 mut self,
823 initial: std::time::Duration,
824 multiplier: f64,
825 max: std::time::Duration,
826 ) -> Self {
827 if let Some(ref mut retry) = self.policy.retry {
828 retry.initial_delay = initial;
829 retry.multiplier = multiplier;
830 retry.max_delay = max;
831 } else {
832 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
833 }
834 self
835 }
836
837 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
838 if let Some(ref mut retry) = self.policy.retry {
839 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
840 } else {
841 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
842 }
843 self
844 }
845
846 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
847 self.policy.handled_by = Some(uri.into());
848 self
849 }
850
851 pub fn end_on_exception(mut self) -> RouteBuilder {
852 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
853 specs.push(self.policy);
854 }
855 self.parent
856 }
857}
858
859fn validate_uri(uri: &str) -> Result<(), CamelError> {
861 let trimmed = uri.trim();
862 if trimmed.is_empty() {
863 return Err(CamelError::RouteError(
864 "route must have a 'from' URI".to_string(),
865 ));
866 }
867 if !trimmed.contains(':') {
868 return Err(CamelError::RouteError(
869 "URI must have a scheme (e.g. 'timer:tick')".to_string(),
870 ));
871 }
872 let scheme = trimmed.split(':').next().unwrap_or("");
873 if scheme.trim().is_empty() {
874 return Err(CamelError::RouteError(
875 "URI scheme must not be empty".to_string(),
876 ));
877 }
878 Ok(())
879}
880
881fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
882 let mut canonical = Vec::with_capacity(steps.len());
883 for step in steps {
884 canonical.push(canonicalize_step(step)?);
885 }
886 Ok(canonical)
887}
888
889fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
890 match step {
891 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
892 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
893 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
894 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
895 BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
896 delay_ms: config.delay_ms,
897 dynamic_header: config.dynamic_header,
898 }),
899 BuilderStep::DeclarativeScript { expression } => {
900 Ok(CanonicalStepSpec::Script { expression })
901 }
902 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
903 predicate,
904 steps: canonicalize_steps(steps)?,
905 }),
906 BuilderStep::DeclarativeChoice { whens, otherwise } => {
907 let mut canonical_whens = Vec::with_capacity(whens.len());
908 for DeclarativeWhenStep { predicate, steps } in whens {
909 canonical_whens.push(CanonicalWhenSpec {
910 predicate,
911 steps: canonicalize_steps(steps)?,
912 });
913 }
914 let otherwise = match otherwise {
915 Some(steps) => Some(canonicalize_steps(steps)?),
916 None => None,
917 };
918 Ok(CanonicalStepSpec::Choice {
919 whens: canonical_whens,
920 otherwise,
921 })
922 }
923 BuilderStep::DeclarativeSplit {
924 expression,
925 aggregation,
926 parallel,
927 parallel_limit,
928 stop_on_exception,
929 steps,
930 } => Ok(CanonicalStepSpec::Split {
931 expression: CanonicalSplitExpressionSpec::Language(expression),
932 aggregation: canonicalize_split_aggregation(aggregation)?,
933 parallel,
934 parallel_limit,
935 stop_on_exception,
936 steps: canonicalize_steps(steps)?,
937 }),
938 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
939 canonicalize_aggregate(config)?,
940 )),
941 other => {
942 let step_name = canonical_step_name(&other);
943 let detail = camel_api::canonical_contract_rejection_reason(step_name)
944 .unwrap_or("not included in canonical v1");
945 Err(CamelError::RouteError(format!(
946 "canonical v1 does not support step `{step_name}`: {detail}"
947 )))
948 }
949 }
950}
951
952fn canonicalize_split_aggregation(
953 strategy: camel_api::splitter::AggregationStrategy,
954) -> Result<CanonicalSplitAggregationSpec, CamelError> {
955 match strategy {
956 camel_api::splitter::AggregationStrategy::LastWins => {
957 Ok(CanonicalSplitAggregationSpec::LastWins)
958 }
959 camel_api::splitter::AggregationStrategy::CollectAll => {
960 Ok(CanonicalSplitAggregationSpec::CollectAll)
961 }
962 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
963 "canonical v1 does not support custom split aggregation".to_string(),
964 )),
965 camel_api::splitter::AggregationStrategy::Original => {
966 Ok(CanonicalSplitAggregationSpec::Original)
967 }
968 }
969}
970
971fn extract_completion_fields(
972 mode: &CompletionMode,
973) -> Result<(Option<usize>, Option<u64>), CamelError> {
974 match mode {
975 CompletionMode::Single(cond) => match cond {
976 CompletionCondition::Size(n) => Ok((Some(*n), None)),
977 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
978 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
979 "canonical v1 does not support aggregate predicate completion".to_string(),
980 )),
981 },
982 CompletionMode::Any(conds) => {
983 let mut size = None;
984 let mut timeout_ms = None;
985 for cond in conds {
986 match cond {
987 CompletionCondition::Size(n) => size = Some(*n),
988 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
989 CompletionCondition::Predicate(_) => {
990 return Err(CamelError::RouteError(
991 "canonical v1 does not support aggregate predicate completion"
992 .to_string(),
993 ));
994 }
995 }
996 }
997 Ok((size, timeout_ms))
998 }
999 }
1000}
1001
1002fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
1003 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
1004
1005 let header = match &config.correlation {
1006 CorrelationStrategy::HeaderName(h) => h.clone(),
1007 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
1008 CorrelationStrategy::Fn(_) => {
1009 return Err(CamelError::RouteError(
1010 "canonical v1 does not support Fn correlation strategy".to_string(),
1011 ));
1012 }
1013 };
1014
1015 let correlation_key = match &config.correlation {
1016 CorrelationStrategy::HeaderName(_) => None,
1017 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
1018 CorrelationStrategy::Fn(_) => unreachable!(),
1019 };
1020
1021 let strategy = match config.strategy {
1022 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
1023 AggregationStrategy::Custom(_) => {
1024 return Err(CamelError::RouteError(
1025 "canonical v1 does not support custom aggregate strategy".to_string(),
1026 ));
1027 }
1028 };
1029 let bucket_ttl_ms = config
1030 .bucket_ttl
1031 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
1032
1033 Ok(CanonicalAggregateSpec {
1034 header,
1035 completion_size,
1036 completion_timeout_ms,
1037 correlation_key,
1038 force_completion_on_stop: if config.force_completion_on_stop {
1039 Some(true)
1040 } else {
1041 None
1042 },
1043 discard_on_timeout: if config.discard_on_timeout {
1044 Some(true)
1045 } else {
1046 None
1047 },
1048 strategy,
1049 max_buckets: config.max_buckets,
1050 bucket_ttl_ms,
1051 })
1052}
1053
1054fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
1055 CanonicalCircuitBreakerSpec {
1056 failure_threshold: config.failure_threshold,
1057 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
1058 }
1059}
1060
1061fn canonical_step_name(step: &BuilderStep) -> &'static str {
1062 match step {
1063 BuilderStep::Processor(_) => "processor",
1064 BuilderStep::To(_) => "to",
1065 BuilderStep::Stop => "stop",
1066 BuilderStep::Log { .. } => "log",
1067 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
1068 BuilderStep::DeclarativeSetBody { .. } => "set_body",
1069 BuilderStep::DeclarativeFilter { .. } => "filter",
1070 BuilderStep::DeclarativeChoice { .. } => "choice",
1071 BuilderStep::DeclarativeScript { .. } => "script",
1072 BuilderStep::DeclarativeFunction { .. } => "function",
1073 BuilderStep::DeclarativeSplit { .. } => "split",
1074 BuilderStep::Split { .. } => "split",
1075 BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1076 BuilderStep::Aggregate { .. } => "aggregate",
1077 BuilderStep::Filter { .. } => "filter",
1078 BuilderStep::Choice { .. } => "choice",
1079 BuilderStep::WireTap { .. } => "wire_tap",
1080 BuilderStep::Delay { .. } => "delay",
1081 BuilderStep::Multicast { .. } => "multicast",
1082 BuilderStep::DeclarativeLog { .. } => "log",
1083 BuilderStep::Bean { .. } => "bean",
1084 BuilderStep::Script { .. } => "script",
1085 BuilderStep::Throttle { .. } => "throttle",
1086 BuilderStep::LoadBalance { .. } => "load_balancer",
1087 BuilderStep::DynamicRouter { .. } => "dynamic_router",
1088 BuilderStep::RoutingSlip { .. } => "routing_slip",
1089 BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1090 BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1091 BuilderStep::RecipientList { .. } => "recipient_list",
1092 BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1093 BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1094 BuilderStep::DeclarativeStreamSplit { .. } => "stream_split",
1095 BuilderStep::Enrich { .. } => "enrich",
1096 BuilderStep::PollEnrich { .. } => "poll_enrich",
1097 BuilderStep::DeclarativeDoTry { .. } => "do_try",
1098 }
1099}
1100
1101impl StepAccumulator for RouteBuilder {
1102 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1103 &mut self.steps
1104 }
1105}
1106
1107pub struct SplitBuilder {
1115 parent: RouteBuilder,
1116 config: SplitterConfig,
1117 steps: Vec<BuilderStep>,
1118}
1119
1120impl SplitBuilder {
1121 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1123 where
1124 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1125 {
1126 FilterInSplitBuilder {
1127 parent: self,
1128 predicate: std::sync::Arc::new(predicate),
1129 steps: vec![],
1130 }
1131 }
1132
1133 pub fn end_split(mut self) -> RouteBuilder {
1136 let split_step = BuilderStep::Split {
1137 config: self.config,
1138 steps: self.steps,
1139 };
1140 self.parent.steps.push(split_step);
1141 self.parent
1142 }
1143}
1144
1145impl StepAccumulator for SplitBuilder {
1146 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1147 &mut self.steps
1148 }
1149}
1150
1151pub struct FilterBuilder {
1153 parent: RouteBuilder,
1154 predicate: FilterPredicate,
1155 steps: Vec<BuilderStep>,
1156}
1157
1158impl FilterBuilder {
1159 pub fn end_filter(mut self) -> RouteBuilder {
1162 let step = BuilderStep::Filter {
1163 predicate: self.predicate,
1164 steps: self.steps,
1165 };
1166 self.parent.steps.push(step);
1167 self.parent
1168 }
1169}
1170
1171impl StepAccumulator for FilterBuilder {
1172 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1173 &mut self.steps
1174 }
1175}
1176
1177pub struct FilterInSplitBuilder {
1179 parent: SplitBuilder,
1180 predicate: FilterPredicate,
1181 steps: Vec<BuilderStep>,
1182}
1183
1184impl FilterInSplitBuilder {
1185 pub fn end_filter(mut self) -> SplitBuilder {
1187 let step = BuilderStep::Filter {
1188 predicate: self.predicate,
1189 steps: self.steps,
1190 };
1191 self.parent.steps.push(step);
1192 self.parent
1193 }
1194}
1195
1196impl StepAccumulator for FilterInSplitBuilder {
1197 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1198 &mut self.steps
1199 }
1200}
1201
1202pub struct ChoiceBuilder {
1209 parent: RouteBuilder,
1210 whens: Vec<WhenStep>,
1211 _otherwise: Option<Vec<BuilderStep>>,
1212}
1213
1214impl ChoiceBuilder {
1215 pub fn when<F>(self, predicate: F) -> WhenBuilder
1218 where
1219 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1220 {
1221 WhenBuilder {
1222 parent: self,
1223 predicate: std::sync::Arc::new(predicate),
1224 steps: vec![],
1225 }
1226 }
1227
1228 pub fn otherwise(self) -> OtherwiseBuilder {
1232 OtherwiseBuilder {
1233 parent: self,
1234 steps: vec![],
1235 }
1236 }
1237
1238 pub fn end_choice(mut self) -> RouteBuilder {
1242 let step = BuilderStep::Choice {
1243 whens: self.whens,
1244 otherwise: self._otherwise,
1245 };
1246 self.parent.steps.push(step);
1247 self.parent
1248 }
1249}
1250
1251pub struct WhenBuilder {
1253 parent: ChoiceBuilder,
1254 predicate: camel_api::FilterPredicate,
1255 steps: Vec<BuilderStep>,
1256}
1257
1258impl WhenBuilder {
1259 pub fn end_when(mut self) -> ChoiceBuilder {
1262 self.parent.whens.push(WhenStep {
1263 predicate: self.predicate,
1264 steps: self.steps,
1265 });
1266 self.parent
1267 }
1268}
1269
1270impl StepAccumulator for WhenBuilder {
1271 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1272 &mut self.steps
1273 }
1274}
1275
1276pub struct OtherwiseBuilder {
1278 parent: ChoiceBuilder,
1279 steps: Vec<BuilderStep>,
1280}
1281
1282impl OtherwiseBuilder {
1283 pub fn end_otherwise(self) -> ChoiceBuilder {
1285 let OtherwiseBuilder { mut parent, steps } = self;
1286 parent._otherwise = Some(steps);
1287 parent
1288 }
1289}
1290
1291impl StepAccumulator for OtherwiseBuilder {
1292 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1293 &mut self.steps
1294 }
1295}
1296
1297pub struct MulticastBuilder {
1305 parent: RouteBuilder,
1306 steps: Vec<BuilderStep>,
1307 config: MulticastConfig,
1308}
1309
1310impl MulticastBuilder {
1311 pub fn parallel(mut self, parallel: bool) -> Self {
1312 self.config = self.config.parallel(parallel);
1313 self
1314 }
1315
1316 pub fn parallel_limit(mut self, limit: usize) -> Self {
1317 self.config = self.config.parallel_limit(limit);
1318 self
1319 }
1320
1321 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1322 self.config = self.config.stop_on_exception(stop);
1323 self
1324 }
1325
1326 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1327 self.config = self.config.timeout(duration);
1328 self
1329 }
1330
1331 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1332 self.config = self.config.aggregation(strategy);
1333 self
1334 }
1335
1336 pub fn end_multicast(mut self) -> RouteBuilder {
1337 let step = BuilderStep::Multicast {
1338 steps: self.steps,
1339 config: self.config,
1340 };
1341 self.parent.steps.push(step);
1342 self.parent
1343 }
1344}
1345
/// Lets the shared [`StepAccumulator`] DSL methods append to the multicast block.
impl StepAccumulator for MulticastBuilder {
    /// Returns the step list of the multicast block.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1351
/// Builder for a throttle block.
///
/// Steps added through [`StepAccumulator`] are collected in `steps`;
/// `end_throttle` packages them with `config` into a
/// [`BuilderStep::Throttle`] on the parent [`RouteBuilder`].
pub struct ThrottleBuilder {
    // Route builder that `end_throttle` returns to.
    parent: RouteBuilder,
    // Throttler configuration; `strategy` updates it.
    config: ThrottlerConfig,
    // Steps collected inside the throttle block.
    steps: Vec<BuilderStep>,
}
1364
1365impl ThrottleBuilder {
1366 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1372 self.config = self.config.strategy(strategy);
1373 self
1374 }
1375
1376 pub fn end_throttle(mut self) -> RouteBuilder {
1379 let step = BuilderStep::Throttle {
1380 config: self.config,
1381 steps: self.steps,
1382 };
1383 self.parent.steps.push(step);
1384 self.parent
1385 }
1386}
1387
/// Lets the shared [`StepAccumulator`] DSL methods append to the throttle block.
impl StepAccumulator for ThrottleBuilder {
    /// Returns the step list of the throttle block.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1393
/// Builder for a loop block attached directly to a route.
///
/// Steps added through [`StepAccumulator`] are collected in `steps`;
/// `end_loop` packages them with `config` into a [`BuilderStep::Loop`] on the
/// parent [`RouteBuilder`]. `loop_count` / `loop_while` open a nested loop.
pub struct LoopBuilder {
    // Route builder that `end_loop` returns to.
    parent: RouteBuilder,
    // Loop configuration stored in the resulting step.
    config: LoopConfig,
    // Steps collected inside the loop body.
    steps: Vec<BuilderStep>,
}
1400
1401impl LoopBuilder {
1402 pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1403 LoopInLoopBuilder {
1404 parent: self,
1405 config: LoopConfig {
1406 mode: LoopMode::Count(count),
1407 },
1408 steps: vec![],
1409 }
1410 }
1411
1412 pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1413 where
1414 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1415 {
1416 LoopInLoopBuilder {
1417 parent: self,
1418 config: LoopConfig {
1419 mode: LoopMode::While(std::sync::Arc::new(predicate)),
1420 },
1421 steps: vec![],
1422 }
1423 }
1424
1425 pub fn end_loop(mut self) -> RouteBuilder {
1426 let step = BuilderStep::Loop {
1427 config: self.config,
1428 steps: self.steps,
1429 };
1430 self.parent.steps.push(step);
1431 self.parent
1432 }
1433}
1434
/// Lets the shared [`StepAccumulator`] DSL methods append to the loop body.
impl StepAccumulator for LoopBuilder {
    /// Returns the step list of the loop body.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1440
/// Builder for a loop nested inside another loop body.
///
/// Steps added through [`StepAccumulator`] are collected in `steps`;
/// `end_loop` packages them with `config` into a [`BuilderStep::Loop`] that is
/// appended to the enclosing [`LoopBuilder`].
pub struct LoopInLoopBuilder {
    // Enclosing loop builder that `end_loop` returns to.
    parent: LoopBuilder,
    // Configuration of the nested loop.
    config: LoopConfig,
    // Steps collected inside the nested loop body.
    steps: Vec<BuilderStep>,
}
1446
1447impl LoopInLoopBuilder {
1448 pub fn end_loop(mut self) -> LoopBuilder {
1449 let step = BuilderStep::Loop {
1450 config: self.config,
1451 steps: self.steps,
1452 };
1453 self.parent.steps.push(step);
1454 self.parent
1455 }
1456}
1457
/// Lets the shared [`StepAccumulator`] DSL methods append to the nested loop body.
impl StepAccumulator for LoopInLoopBuilder {
    /// Returns the step list of the nested loop body.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1463
/// Builder for a load-balance block.
///
/// Steps added through [`StepAccumulator`] are collected in `steps`;
/// `end_load_balance` packages them with `config` into a
/// [`BuilderStep::LoadBalance`] on the parent [`RouteBuilder`].
pub struct LoadBalancerBuilder {
    // Route builder that `end_load_balance` returns to.
    parent: RouteBuilder,
    // Load-balancer configuration; the strategy methods replace it wholesale.
    config: LoadBalancerConfig,
    // Steps collected inside the load-balance block.
    steps: Vec<BuilderStep>,
}
1476
1477impl LoadBalancerBuilder {
1478 pub fn round_robin(mut self) -> Self {
1480 self.config = LoadBalancerConfig::round_robin();
1481 self
1482 }
1483
1484 pub fn random(mut self) -> Self {
1486 self.config = LoadBalancerConfig::random();
1487 self
1488 }
1489
1490 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1495 self.config = LoadBalancerConfig::weighted(weights);
1496 self
1497 }
1498
1499 pub fn failover(mut self) -> Self {
1504 self.config = LoadBalancerConfig::failover();
1505 self
1506 }
1507
1508 pub fn end_load_balance(mut self) -> RouteBuilder {
1511 let step = BuilderStep::LoadBalance {
1512 config: self.config,
1513 steps: self.steps,
1514 };
1515 self.parent.steps.push(step);
1516 self.parent
1517 }
1518}
1519
/// Lets the shared [`StepAccumulator`] DSL methods append to the load-balance block.
impl StepAccumulator for LoadBalancerBuilder {
    /// Returns the step list of the load-balance block.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1525
1526#[cfg(test)]
1531mod tests {
1532 use super::*;
1533 use camel_api::error_handler::ErrorHandlerConfig;
1534 use camel_api::load_balancer::LoadBalanceStrategy;
1535 use camel_api::{Exchange, Message};
1536 use camel_core::route::BuilderStep;
1537 use std::sync::Arc;
1538 use std::time::Duration;
1539 use tower::{Service, ServiceExt};
1540
1541 #[test]
1542 fn test_builder_from_creates_definition() {
1543 let definition = RouteBuilder::from("timer:tick")
1544 .route_id("test-route")
1545 .build()
1546 .unwrap();
1547 assert_eq!(definition.from_uri(), "timer:tick");
1548 }
1549
1550 #[test]
1551 fn test_builder_empty_from_uri_errors() {
1552 let result = RouteBuilder::from("").route_id("test-route").build();
1553 assert!(result.is_err());
1554 }
1555
1556 #[test]
1557 fn test_build_rejects_schemeless_uri() {
1558 let result = RouteBuilder::from("no-scheme-here")
1559 .route_id("test-route")
1560 .build();
1561 match result {
1562 Err(err) => {
1563 let err_msg = format!("{err}");
1564 assert!(
1565 err_msg.contains("scheme"),
1566 "expected scheme-related error, got: {err_msg}"
1567 );
1568 }
1569 Ok(_) => panic!("schemeless URI should fail"),
1570 }
1571 }
1572
1573 #[test]
1574 fn test_build_rejects_empty_scheme_uri() {
1575 let result = RouteBuilder::from(":missing-scheme")
1576 .route_id("test-route")
1577 .build();
1578 match result {
1579 Err(err) => {
1580 let err_msg = format!("{err}");
1581 assert!(
1582 err_msg.contains("scheme"),
1583 "expected scheme-related error, got: {err_msg}"
1584 );
1585 }
1586 Ok(_) => panic!("empty-scheme URI should fail"),
1587 }
1588 }
1589
1590 #[test]
1591 fn test_build_accepts_valid_uri() {
1592 let result = RouteBuilder::from("timer:tick")
1593 .route_id("test-route")
1594 .build();
1595 assert!(result.is_ok());
1596 }
1597
1598 #[test]
1599 fn test_build_canonical_rejects_schemeless_uri() {
1600 let result = RouteBuilder::from("no-scheme-here")
1601 .route_id("test-route")
1602 .build_canonical();
1603 assert!(result.is_err());
1604 }
1605
1606 #[test]
1607 fn test_builder_to_adds_step() {
1608 let definition = RouteBuilder::from("timer:tick")
1609 .route_id("test-route")
1610 .to("log:info")
1611 .build()
1612 .unwrap();
1613
1614 assert_eq!(definition.from_uri(), "timer:tick");
1615 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1617 }
1618
1619 #[test]
1620 fn test_builder_filter_adds_filter_step() {
1621 let definition = RouteBuilder::from("timer:tick")
1622 .route_id("test-route")
1623 .filter(|_ex| true)
1624 .to("mock:result")
1625 .end_filter()
1626 .build()
1627 .unwrap();
1628
1629 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1630 }
1631
1632 #[test]
1633 fn test_builder_set_header_adds_processor_step() {
1634 let definition = RouteBuilder::from("timer:tick")
1635 .route_id("test-route")
1636 .set_header("key", Value::String("value".into()))
1637 .build()
1638 .unwrap();
1639
1640 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1641 }
1642
1643 #[test]
1644 fn test_builder_map_body_adds_processor_step() {
1645 let definition = RouteBuilder::from("timer:tick")
1646 .route_id("test-route")
1647 .map_body(|body| body)
1648 .build()
1649 .unwrap();
1650
1651 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1652 }
1653
1654 #[test]
1655 fn test_builder_process_adds_processor_step() {
1656 let definition = RouteBuilder::from("timer:tick")
1657 .route_id("test-route")
1658 .process(|ex| async move { Ok(ex) })
1659 .build()
1660 .unwrap();
1661
1662 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1663 }
1664
1665 #[test]
1666 fn test_builder_chain_multiple_steps() {
1667 let definition = RouteBuilder::from("timer:tick")
1668 .route_id("test-route")
1669 .set_header("source", Value::String("timer".into()))
1670 .filter(|ex| ex.input.header("source").is_some())
1671 .to("log:info")
1672 .end_filter()
1673 .to("mock:result")
1674 .build()
1675 .unwrap();
1676
1677 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1681 }
1682
1683 #[test]
1684 fn test_loop_count_builder() {
1685 use camel_api::loop_eip::LoopMode;
1686
1687 let def = RouteBuilder::from("direct:start")
1688 .route_id("loop-test")
1689 .loop_count(3)
1690 .to("mock:inside")
1691 .end_loop()
1692 .to("mock:after")
1693 .build()
1694 .unwrap();
1695
1696 assert_eq!(def.steps().len(), 2);
1697 match &def.steps()[0] {
1698 BuilderStep::Loop { config, steps } => {
1699 assert!(matches!(config.mode, LoopMode::Count(3)));
1700 assert_eq!(steps.len(), 1);
1701 }
1702 other => panic!("Expected Loop, got {:?}", other),
1703 }
1704 assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1705 }
1706
1707 #[test]
1708 fn test_loop_while_builder() {
1709 use camel_api::loop_eip::LoopMode;
1710
1711 let def = RouteBuilder::from("direct:start")
1712 .route_id("loop-while-test")
1713 .loop_while(|_ex| true)
1714 .to("mock:retry")
1715 .end_loop()
1716 .build()
1717 .unwrap();
1718
1719 assert_eq!(def.steps().len(), 1);
1720 match &def.steps()[0] {
1721 BuilderStep::Loop { config, steps } => {
1722 assert!(matches!(config.mode, LoopMode::While(_)));
1723 assert_eq!(steps.len(), 1);
1724 }
1725 other => panic!("Expected Loop, got {:?}", other),
1726 }
1727 }
1728
1729 #[test]
1730 fn test_nested_loop_builder() {
1731 use camel_api::loop_eip::LoopMode;
1732
1733 let def = RouteBuilder::from("direct:start")
1734 .route_id("nested-loop-test")
1735 .loop_count(2)
1736 .to("mock:outer")
1737 .loop_count(3)
1738 .to("mock:inner")
1739 .end_loop()
1740 .end_loop()
1741 .to("mock:after")
1742 .build()
1743 .unwrap();
1744
1745 assert_eq!(def.steps().len(), 2);
1746 match &def.steps()[0] {
1747 BuilderStep::Loop { steps, .. } => {
1748 assert_eq!(steps.len(), 2);
1749 match &steps[1] {
1750 BuilderStep::Loop {
1751 config,
1752 steps: inner_steps,
1753 } => {
1754 assert!(matches!(config.mode, LoopMode::Count(3)));
1755 assert_eq!(inner_steps.len(), 1);
1756 }
1757 other => panic!("Expected nested Loop, got {:?}", other),
1758 }
1759 }
1760 other => panic!("Expected outer Loop, got {:?}", other),
1761 }
1762 }
1763
1764 #[tokio::test]
1769 async fn test_set_header_processor_works() {
1770 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1771 let exchange = Exchange::new(Message::new("test"));
1772 let result = svc.call(exchange).await.unwrap();
1773 assert_eq!(
1774 result.input.header("greeting"),
1775 Some(&Value::String("hello".into()))
1776 );
1777 }
1778
1779 #[tokio::test]
1780 async fn test_filter_processor_passes() {
1781 use camel_api::BoxProcessorExt;
1782 use camel_processor::FilterService;
1783
1784 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1785 let mut svc =
1786 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1787 let exchange = Exchange::new(Message::new("pass"));
1788 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1789 assert_eq!(result.input.body.as_text(), Some("pass"));
1790 }
1791
1792 #[tokio::test]
1793 async fn test_filter_processor_blocks() {
1794 use camel_api::BoxProcessorExt;
1795 use camel_processor::FilterService;
1796
1797 let sub = BoxProcessor::from_fn(|_ex| {
1798 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1799 });
1800 let mut svc =
1801 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1802 let exchange = Exchange::new(Message::new("reject"));
1803 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1804 assert_eq!(result.input.body.as_text(), Some("reject"));
1805 }
1806
1807 #[tokio::test]
1808 async fn test_map_body_processor_works() {
1809 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1810 if let Some(text) = body.as_text() {
1811 Body::Text(text.to_uppercase())
1812 } else {
1813 body
1814 }
1815 });
1816 let exchange = Exchange::new(Message::new("hello"));
1817 let result = mapper.oneshot(exchange).await.unwrap();
1818 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1819 }
1820
1821 #[tokio::test]
1822 async fn test_process_custom_processor_works() {
1823 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1824 ex.set_property("custom", Value::Bool(true));
1825 Ok(ex)
1826 });
1827 let exchange = Exchange::new(Message::default());
1828 let result = processor.oneshot(exchange).await.unwrap();
1829 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1830 }
1831
1832 #[tokio::test]
1837 async fn test_compose_pipeline_runs_steps_in_order() {
1838 use camel_core::route::{CompiledStep, compose_pipeline};
1839
1840 let processors = vec![
1841 CompiledStep::Process {
1842 processor: BoxProcessor::new(SetHeader::new(
1843 IdentityProcessor,
1844 "step",
1845 Value::String("one".into()),
1846 )),
1847 body_contract: None,
1848 },
1849 CompiledStep::Process {
1850 processor: BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1851 if let Some(text) = body.as_text() {
1852 Body::Text(format!("{}-processed", text))
1853 } else {
1854 body
1855 }
1856 })),
1857 body_contract: None,
1858 },
1859 ];
1860
1861 let pipeline = compose_pipeline(processors);
1862 let exchange = Exchange::new(Message::new("hello"));
1863 let result = pipeline.oneshot(exchange).await.unwrap();
1864
1865 assert_eq!(
1866 result.input.header("step"),
1867 Some(&Value::String("one".into()))
1868 );
1869 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1870 }
1871
1872 #[tokio::test]
1873 async fn test_compose_pipeline_empty_is_identity() {
1874 use camel_core::route::compose_pipeline;
1875
1876 let pipeline = compose_pipeline(vec![]);
1877 let exchange = Exchange::new(Message::new("unchanged"));
1878 let result = pipeline.oneshot(exchange).await.unwrap();
1879 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1880 }
1881
1882 #[test]
1887 fn test_builder_circuit_breaker_sets_config() {
1888 use camel_api::circuit_breaker::CircuitBreakerConfig;
1889
1890 let config = CircuitBreakerConfig::new().failure_threshold(5);
1891 let definition = RouteBuilder::from("timer:tick")
1892 .route_id("test-route")
1893 .circuit_breaker(config)
1894 .build()
1895 .unwrap();
1896
1897 let cb = definition
1898 .circuit_breaker_config()
1899 .expect("circuit breaker should be set");
1900 assert_eq!(cb.failure_threshold, 5);
1901 }
1902
1903 #[test]
1904 fn test_builder_circuit_breaker_with_error_handler() {
1905 use camel_api::circuit_breaker::CircuitBreakerConfig;
1906 use camel_api::error_handler::ErrorHandlerConfig;
1907
1908 let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1909 let eh_config = ErrorHandlerConfig::log_only();
1910
1911 let definition = RouteBuilder::from("timer:tick")
1912 .route_id("test-route")
1913 .to("log:info")
1914 .circuit_breaker(cb_config)
1915 .error_handler(eh_config)
1916 .build()
1917 .unwrap();
1918
1919 assert!(
1920 definition.circuit_breaker_config().is_some(),
1921 "circuit breaker config should be set"
1922 );
1923 }
1925
1926 #[test]
1927 fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1928 let definition = RouteBuilder::from("direct:start")
1929 .route_id("test-route")
1930 .dead_letter_channel("log:dlc")
1931 .on_exception(|e| matches!(e, CamelError::Io(_)))
1932 .retry(3)
1933 .handled_by("log:io")
1934 .end_on_exception()
1935 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1936 .retry(1)
1937 .end_on_exception()
1938 .to("mock:out")
1939 .build()
1940 .expect("route should build");
1941
1942 let cfg = definition
1943 .error_handler_config()
1944 .expect("error handler should be set");
1945 assert_eq!(cfg.policies.len(), 2);
1946 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1947 assert_eq!(
1948 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1949 Some(3)
1950 );
1951 assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1952 assert_eq!(
1953 cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1954 Some(1)
1955 );
1956 }
1957
1958 #[test]
1959 fn test_builder_on_exception_mixed_mode_rejected() {
1960 let result = RouteBuilder::from("direct:start")
1961 .route_id("test-route")
1962 .error_handler(ErrorHandlerConfig::log_only())
1963 .on_exception(|_e| true)
1964 .end_on_exception()
1965 .to("mock:out")
1966 .build();
1967
1968 let err = result.err().expect("mixed mode should fail with an error");
1969
1970 assert!(
1971 format!("{err}").contains("mixed error handler modes"),
1972 "unexpected error: {err}"
1973 );
1974 }
1975
1976 #[test]
1977 fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1978 let definition = RouteBuilder::from("direct:start")
1979 .route_id("test-route")
1980 .on_exception(|_e| true)
1981 .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1982 .with_jitter(0.5)
1983 .end_on_exception()
1984 .to("mock:out")
1985 .build()
1986 .expect("route should build");
1987
1988 let cfg = definition
1989 .error_handler_config()
1990 .expect("error handler should be set");
1991 assert_eq!(cfg.policies.len(), 1);
1992 assert!(cfg.policies[0].retry.is_none());
1993 }
1994
1995 #[test]
1996 fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1997 let definition = RouteBuilder::from("direct:start")
1998 .route_id("test-route")
1999 .dead_letter_channel("log:dlc")
2000 .to("mock:out")
2001 .build()
2002 .expect("route should build");
2003
2004 let cfg = definition
2005 .error_handler_config()
2006 .expect("error handler should be set");
2007 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
2008 assert!(cfg.policies.is_empty());
2009 }
2010
2011 #[test]
2012 fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
2013 let definition = RouteBuilder::from("direct:start")
2014 .route_id("test-route")
2015 .dead_letter_channel("log:first")
2016 .on_exception(|e| matches!(e, CamelError::Io(_)))
2017 .retry(2)
2018 .end_on_exception()
2019 .dead_letter_channel("log:second")
2020 .to("mock:out")
2021 .build()
2022 .expect("route should build");
2023
2024 let cfg = definition
2025 .error_handler_config()
2026 .expect("error handler should be set");
2027 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2028 assert_eq!(cfg.policies.len(), 1);
2029 assert_eq!(
2030 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
2031 Some(2)
2032 );
2033 }
2034
2035 #[test]
2036 fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
2037 let definition = RouteBuilder::from("direct:start")
2038 .route_id("test-route")
2039 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
2040 .retry(1)
2041 .end_on_exception()
2042 .to("mock:out")
2043 .build()
2044 .expect("route should build");
2045
2046 let cfg = definition
2047 .error_handler_config()
2048 .expect("error handler should be set");
2049 assert!(cfg.dlc_uri.is_none());
2050 assert_eq!(cfg.policies.len(), 1);
2051 }
2052
2053 #[test]
2054 fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
2055 let first = ErrorHandlerConfig::dead_letter_channel("log:first");
2056 let second = ErrorHandlerConfig::dead_letter_channel("log:second");
2057
2058 let definition = RouteBuilder::from("direct:start")
2059 .route_id("test-route")
2060 .error_handler(first)
2061 .error_handler(second)
2062 .to("mock:out")
2063 .build()
2064 .expect("route should build");
2065
2066 let cfg = definition
2067 .error_handler_config()
2068 .expect("error handler should be set");
2069 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2070 }
2071
2072 #[test]
2075 fn test_split_builder_typestate() {
2076 use camel_api::splitter::{SplitterConfig, split_body_lines};
2077
2078 let definition = RouteBuilder::from("timer:test?period=1000")
2080 .route_id("test-route")
2081 .split(SplitterConfig::new(split_body_lines()))
2082 .to("mock:per-fragment")
2083 .end_split()
2084 .to("mock:final")
2085 .build()
2086 .unwrap();
2087
2088 assert_eq!(definition.steps().len(), 2);
2090 }
2091
2092 #[test]
2093 fn test_split_builder_steps_collected() {
2094 use camel_api::splitter::{SplitterConfig, split_body_lines};
2095
2096 let definition = RouteBuilder::from("timer:test?period=1000")
2097 .route_id("test-route")
2098 .split(SplitterConfig::new(split_body_lines()))
2099 .set_header("fragment", Value::String("yes".into()))
2100 .to("mock:per-fragment")
2101 .end_split()
2102 .build()
2103 .unwrap();
2104
2105 assert_eq!(definition.steps().len(), 1);
2107 match &definition.steps()[0] {
2108 BuilderStep::Split { steps, .. } => {
2109 assert_eq!(steps.len(), 2); }
2111 other => panic!("Expected Split, got {:?}", other),
2112 }
2113 }
2114
2115 #[test]
2116 fn test_split_builder_config_propagated() {
2117 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2118
2119 let definition = RouteBuilder::from("timer:test?period=1000")
2120 .route_id("test-route")
2121 .split(
2122 SplitterConfig::new(split_body_lines())
2123 .parallel(true)
2124 .parallel_limit(4)
2125 .aggregation(AggregationStrategy::CollectAll),
2126 )
2127 .to("mock:per-fragment")
2128 .end_split()
2129 .build()
2130 .unwrap();
2131
2132 match &definition.steps()[0] {
2133 BuilderStep::Split { config, .. } => {
2134 assert!(config.parallel);
2135 assert_eq!(config.parallel_limit, Some(4));
2136 assert!(matches!(
2137 config.aggregation,
2138 AggregationStrategy::CollectAll
2139 ));
2140 }
2141 other => panic!("Expected Split, got {:?}", other),
2142 }
2143 }
2144
2145 #[test]
2146 fn test_aggregate_builder_adds_step() {
2147 use camel_api::aggregator::AggregatorConfig;
2148 use camel_core::route::BuilderStep;
2149
2150 let definition = RouteBuilder::from("timer:tick")
2151 .route_id("test-route")
2152 .aggregate(
2153 AggregatorConfig::correlate_by("key")
2154 .complete_when_size(2)
2155 .build()
2156 .unwrap(),
2157 )
2158 .build()
2159 .unwrap();
2160
2161 assert_eq!(definition.steps().len(), 1);
2162 assert!(matches!(
2163 definition.steps()[0],
2164 BuilderStep::Aggregate { .. }
2165 ));
2166 }
2167
2168 #[test]
2169 fn test_aggregate_in_split_builder() {
2170 use camel_api::aggregator::AggregatorConfig;
2171 use camel_api::splitter::{SplitterConfig, split_body_lines};
2172 use camel_core::route::BuilderStep;
2173
2174 let definition = RouteBuilder::from("timer:tick")
2175 .route_id("test-route")
2176 .split(SplitterConfig::new(split_body_lines()))
2177 .aggregate(
2178 AggregatorConfig::correlate_by("key")
2179 .complete_when_size(1)
2180 .build()
2181 .unwrap(),
2182 )
2183 .end_split()
2184 .build()
2185 .unwrap();
2186
2187 assert_eq!(definition.steps().len(), 1);
2188 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2189 assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2190 } else {
2191 panic!("expected Split step");
2192 }
2193 }
2194
2195 #[test]
2198 fn test_builder_set_body_static_adds_processor() {
2199 let definition = RouteBuilder::from("timer:tick")
2200 .route_id("test-route")
2201 .set_body("fixed")
2202 .build()
2203 .unwrap();
2204 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2205 }
2206
2207 #[test]
2208 fn test_builder_set_body_fn_adds_processor() {
2209 let definition = RouteBuilder::from("timer:tick")
2210 .route_id("test-route")
2211 .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2212 .build()
2213 .unwrap();
2214 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2215 }
2216
2217 #[test]
2218 fn transform_alias_produces_same_as_set_body() {
2219 let route_transform = RouteBuilder::from("timer:tick")
2220 .route_id("test-route")
2221 .transform("hello")
2222 .build()
2223 .unwrap();
2224
2225 let route_set_body = RouteBuilder::from("timer:tick")
2226 .route_id("test-route")
2227 .set_body("hello")
2228 .build()
2229 .unwrap();
2230
2231 assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2232 }
2233
2234 #[test]
2235 fn test_builder_set_header_fn_adds_processor() {
2236 let definition = RouteBuilder::from("timer:tick")
2237 .route_id("test-route")
2238 .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2239 .build()
2240 .unwrap();
2241 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2242 }
2243
2244 #[tokio::test]
2245 async fn test_set_body_static_processor_works() {
2246 use camel_core::route::{CompiledStep, compose_pipeline};
2247 let def = RouteBuilder::from("t:t")
2248 .route_id("test-route")
2249 .set_body("replaced")
2250 .build()
2251 .unwrap();
2252 let pipeline = compose_pipeline(
2253 def.steps()
2254 .iter()
2255 .filter_map(|s| {
2256 if let BuilderStep::Processor(p) = s {
2257 Some(p.clone())
2258 } else {
2259 None
2260 }
2261 })
2262 .map(|p| CompiledStep::Process {
2263 processor: p,
2264 body_contract: None,
2265 })
2266 .collect(),
2267 );
2268 let exchange = Exchange::new(Message::new("original"));
2269 let result = pipeline.oneshot(exchange).await.unwrap();
2270 assert_eq!(result.input.body.as_text(), Some("replaced"));
2271 }
2272
2273 #[tokio::test]
2274 async fn test_set_body_fn_processor_works() {
2275 use camel_core::route::{CompiledStep, compose_pipeline};
2276 let def = RouteBuilder::from("t:t")
2277 .route_id("test-route")
2278 .set_body_fn(|ex: &Exchange| {
2279 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2280 })
2281 .build()
2282 .unwrap();
2283 let pipeline = compose_pipeline(
2284 def.steps()
2285 .iter()
2286 .filter_map(|s| {
2287 if let BuilderStep::Processor(p) = s {
2288 Some(p.clone())
2289 } else {
2290 None
2291 }
2292 })
2293 .map(|p| CompiledStep::Process {
2294 processor: p,
2295 body_contract: None,
2296 })
2297 .collect(),
2298 );
2299 let exchange = Exchange::new(Message::new("hello"));
2300 let result = pipeline.oneshot(exchange).await.unwrap();
2301 assert_eq!(result.input.body.as_text(), Some("HELLO"));
2302 }
2303
2304 #[tokio::test]
2305 async fn test_set_header_fn_processor_works() {
2306 use camel_core::route::{CompiledStep, compose_pipeline};
2307 let def = RouteBuilder::from("t:t")
2308 .route_id("test-route")
2309 .set_header_fn("echo", |ex: &Exchange| {
2310 ex.input
2311 .body
2312 .as_text()
2313 .map(|t| Value::String(t.into()))
2314 .unwrap_or(Value::Null)
2315 })
2316 .build()
2317 .unwrap();
2318 let pipeline = compose_pipeline(
2319 def.steps()
2320 .iter()
2321 .filter_map(|s| {
2322 if let BuilderStep::Processor(p) = s {
2323 Some(p.clone())
2324 } else {
2325 None
2326 }
2327 })
2328 .map(|p| CompiledStep::Process {
2329 processor: p,
2330 body_contract: None,
2331 })
2332 .collect(),
2333 );
2334 let exchange = Exchange::new(Message::new("ping"));
2335 let result = pipeline.oneshot(exchange).await.unwrap();
2336 assert_eq!(
2337 result.input.header("echo"),
2338 Some(&Value::String("ping".into()))
2339 );
2340 }
2341
2342 #[test]
2345 fn test_filter_builder_typestate() {
2346 let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2347 .route_id("test-route")
2348 .filter(|_ex| true)
2349 .to("mock:inner")
2350 .end_filter()
2351 .to("mock:outer")
2352 .build();
2353 assert!(result.is_ok());
2354 }
2355
2356 #[test]
2357 fn test_filter_builder_steps_collected() {
2358 let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2359 .route_id("test-route")
2360 .filter(|_ex| true)
2361 .to("mock:inner")
2362 .end_filter()
2363 .build()
2364 .unwrap();
2365
2366 assert_eq!(definition.steps().len(), 1);
2367 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2368 }
2369
2370 #[test]
2371 fn test_wire_tap_builder_adds_step() {
2372 let definition = RouteBuilder::from("timer:tick")
2373 .route_id("test-route")
2374 .wire_tap("mock:tap")
2375 .to("mock:result")
2376 .build()
2377 .unwrap();
2378
2379 assert_eq!(definition.steps().len(), 2);
2380 assert!(
2381 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2382 );
2383 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2384 }
2385
2386 #[test]
2389 fn test_multicast_builder_typestate() {
2390 let definition = RouteBuilder::from("timer:tick")
2391 .route_id("test-route")
2392 .multicast()
2393 .to("direct:a")
2394 .to("direct:b")
2395 .end_multicast()
2396 .to("mock:result")
2397 .build()
2398 .unwrap();
2399
2400 assert_eq!(definition.steps().len(), 2); }
2402
2403 #[test]
2404 fn test_multicast_builder_steps_collected() {
2405 let definition = RouteBuilder::from("timer:tick")
2406 .route_id("test-route")
2407 .multicast()
2408 .to("direct:a")
2409 .to("direct:b")
2410 .end_multicast()
2411 .build()
2412 .unwrap();
2413
2414 match &definition.steps()[0] {
2415 BuilderStep::Multicast { steps, .. } => {
2416 assert_eq!(steps.len(), 2);
2417 }
2418 other => panic!("Expected Multicast, got {:?}", other),
2419 }
2420 }
2421
2422 #[test]
2425 fn test_builder_concurrent_sets_concurrency() {
2426 use camel_component_api::ConcurrencyModel;
2427
2428 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2429 .route_id("test-route")
2430 .concurrent(16)
2431 .to("log:info")
2432 .build()
2433 .unwrap();
2434
2435 assert_eq!(
2436 definition.concurrency_override(),
2437 Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2438 );
2439 }
2440
2441 #[test]
2442 fn test_builder_concurrent_zero_means_unbounded() {
2443 use camel_component_api::ConcurrencyModel;
2444
2445 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2446 .route_id("test-route")
2447 .concurrent(0)
2448 .to("log:info")
2449 .build()
2450 .unwrap();
2451
2452 assert_eq!(
2453 definition.concurrency_override(),
2454 Some(&ConcurrencyModel::Concurrent { max: None })
2455 );
2456 }
2457
2458 #[test]
2459 fn test_builder_sequential_sets_concurrency() {
2460 use camel_component_api::ConcurrencyModel;
2461
2462 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2463 .route_id("test-route")
2464 .sequential()
2465 .to("log:info")
2466 .build()
2467 .unwrap();
2468
2469 assert_eq!(
2470 definition.concurrency_override(),
2471 Some(&ConcurrencyModel::Sequential)
2472 );
2473 }
2474
2475 #[test]
2476 fn test_builder_default_concurrency_is_none() {
2477 let definition = RouteBuilder::from("timer:tick")
2478 .route_id("test-route")
2479 .to("log:info")
2480 .build()
2481 .unwrap();
2482
2483 assert_eq!(definition.concurrency_override(), None);
2484 }
2485
2486 #[test]
2489 fn test_builder_route_id_sets_id() {
2490 let definition = RouteBuilder::from("timer:tick")
2491 .route_id("my-route")
2492 .build()
2493 .unwrap();
2494
2495 assert_eq!(definition.route_id(), "my-route");
2496 }
2497
2498 #[test]
2499 fn test_build_without_route_id_fails() {
2500 let result = RouteBuilder::from("timer:tick?period=1000")
2501 .to("log:info")
2502 .build();
2503 let err = match result {
2504 Err(e) => e.to_string(),
2505 Ok(_) => panic!("build() should fail without route_id"),
2506 };
2507 assert!(
2508 err.contains("route_id"),
2509 "error should mention route_id, got: {}",
2510 err
2511 );
2512 }
2513
2514 #[test]
2515 fn test_builder_empty_route_id_rejected() {
2516 let result = RouteBuilder::from("timer:tick").route_id("").build();
2517 let err = result.err().expect("empty route_id should be rejected");
2518 assert!(matches!(err, CamelError::RouteError(_)));
2519 }
2520
2521 #[test]
2522 fn test_builder_whitespace_route_id_rejected() {
2523 let result = RouteBuilder::from("timer:tick").route_id(" ").build();
2524 assert!(result.is_err());
2525 }
2526
/// `auto_startup(false)` is carried through to the built definition.
#[test]
fn test_builder_auto_startup_false() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .auto_startup(false);
    let definition = builder.build().unwrap();

    assert!(!definition.auto_startup());
}
2537
/// A custom `startup_order` value is carried through to the built definition.
#[test]
fn test_builder_startup_order_custom() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .startup_order(50);
    let definition = builder.build().unwrap();

    assert_eq!(definition.startup_order(), 50);
}
2548
/// With only a route id set, the definition reports auto-startup enabled and startup order 1000.
#[test]
fn test_builder_defaults() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder.build().unwrap();

    assert_eq!(definition.route_id(), "test-route");
    assert!(definition.auto_startup());
    assert_eq!(definition.startup_order(), 1000);
}
2560
/// A choice with one `when` branch and no `otherwise` yields a single `Choice` step.
#[test]
fn test_choice_builder_single_when() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("type").is_some())
        .to("mock:typed")
        .end_when()
        .end_choice();
    let definition = builder.build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(
        &steps[0],
        BuilderStep::Choice { whens, otherwise } if whens.len() == 1 && otherwise.is_none()
    ));
}
2580
/// A choice with one `when` branch plus an `otherwise` branch records both on the `Choice` step.
#[test]
fn test_choice_builder_when_otherwise() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .otherwise()
        .to("mock:fallback")
        .end_otherwise()
        .end_choice();
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(
        first,
        BuilderStep::Choice { whens, otherwise } if whens.len() == 1 && otherwise.is_some()
    ));
}
2600
/// Two `when` branches on the same choice are both recorded on the `Choice` step.
#[test]
fn test_choice_builder_multiple_whens() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("b").is_some())
        .to("mock:b")
        .end_when()
        .end_choice();
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(
        first,
        BuilderStep::Choice { whens, .. } if whens.len() == 2
    ));
}
2620
/// A step added after `end_choice()` lands on the route itself, after the `Choice` step.
#[test]
fn test_choice_step_after_choice() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|_ex: &Exchange| true)
        .to("mock:inner")
        .end_when()
        .end_choice()
        // Added after the choice scope was closed.
        .to("mock:outer");
    let definition = builder.build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2637
/// Opening and closing a throttle scope produces exactly one `Throttle` step on the route.
#[test]
fn test_throttle_builder_typestate() {
    let window = std::time::Duration::from_secs(1);
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, window)
        .to("mock:result")
        .end_throttle();
    let definition = builder.build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::Throttle { .. }));
}
2656
/// `.strategy(ThrottleStrategy::Reject)` is stored in the throttle step's config.
#[test]
fn test_throttle_builder_with_strategy() {
    let window = std::time::Duration::from_secs(1);
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, window)
        .strategy(ThrottleStrategy::Reject)
        .to("mock:result")
        .end_throttle();
    let definition = builder.build().unwrap();

    match &definition.steps()[0] {
        BuilderStep::Throttle { config, .. } => {
            assert_eq!(config.strategy, ThrottleStrategy::Reject);
        }
        _ => panic!("Expected Throttle step"),
    }
}
2674
/// Steps added inside the throttle scope are collected on the `Throttle` step.
#[test]
fn test_throttle_builder_steps_collected() {
    let window = std::time::Duration::from_secs(1);
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(5, window)
        .set_header("throttled", Value::Bool(true))
        .to("mock:throttled")
        .end_throttle();
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    if let BuilderStep::Throttle { steps, .. } = first {
        // One nested step per call made inside the scope: `set_header` and `to`.
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected Throttle, got {:?}", first);
    }
}
2693
/// A step added after `end_throttle()` lands on the route itself, after the `Throttle` step.
#[test]
fn test_throttle_step_after_throttle() {
    let window = std::time::Duration::from_secs(1);
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, window)
        .to("mock:inner")
        .end_throttle()
        .to("mock:outer");
    let definition = builder.build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2709
/// Opening and closing a load-balance scope produces exactly one `LoadBalance` step.
#[test]
fn test_load_balance_builder_typestate() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .round_robin()
        .to("mock:a")
        .to("mock:b")
        .end_load_balance();
    let definition = builder.build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::LoadBalance { .. }));
}
2730
/// `.random()` stores `LoadBalanceStrategy::Random` in the load-balance step's config.
#[test]
fn test_load_balance_builder_with_strategy() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .random()
        .to("mock:result")
        .end_load_balance();
    let definition = builder.build().unwrap();

    match &definition.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
2748
/// Steps added inside the load-balance scope are collected on the `LoadBalance` step.
#[test]
fn test_load_balance_builder_steps_collected() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .set_header("lb", Value::Bool(true))
        .to("mock:a")
        .end_load_balance();
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    if let BuilderStep::LoadBalance { steps, .. } = first {
        // One nested step per call made inside the scope: `set_header` and `to`.
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected LoadBalance, got {:?}", first);
    }
}
2767
/// A step added after `end_load_balance()` lands on the route itself, after the `LoadBalance` step.
#[test]
fn test_load_balance_step_after_load_balance() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .to("mock:inner")
        .end_load_balance()
        .to("mock:outer");
    let definition = builder.build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2783
/// `dynamic_router` with an expression adds exactly one `DynamicRouter` step.
#[test]
fn test_dynamic_router_builder() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some("mock:result".to_string())));
    let definition = builder.build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::DynamicRouter { .. }));
}
2800
/// A pre-built `DynamicRouterConfig` keeps its `max_iterations` and `cache_size` on the step.
#[test]
fn test_dynamic_router_builder_with_config() {
    let router_config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
        .max_iterations(100)
        .cache_size(500);

    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router_with_config(router_config);
    let definition = builder.build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 1);
    match &steps[0] {
        BuilderStep::DynamicRouter { config } => {
            assert_eq!(config.max_iterations, 100);
            assert_eq!(config.cache_size, 500);
        }
        _ => panic!("Expected DynamicRouter step"),
    }
}
2821
/// A `to` call after `dynamic_router` is appended as a second route-level step.
#[test]
fn test_dynamic_router_step_after_router() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
        .to("mock:outer");
    let definition = builder.build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[0], BuilderStep::DynamicRouter { .. }));
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2839
/// `routing_slip` with an expression puts a `RoutingSlip` step first on the route.
#[test]
fn routing_slip_builder_creates_step() {
    use camel_api::RoutingSlipExpression;

    let slip: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));

    let builder = RouteBuilder::from("direct:start")
        .route_id("routing-slip-test")
        .routing_slip(slip);
    let route = builder.build().unwrap();

    assert!(
        matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
        "Expected RoutingSlip step"
    );
}
2857
/// A pre-built `RoutingSlipConfig` keeps its delimiter, cache size and
/// ignore-invalid-endpoints flag on the resulting step.
#[test]
fn routing_slip_with_config_builder_creates_step() {
    use camel_api::RoutingSlipConfig;

    let slip_config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
        .uri_delimiter("|")
        .cache_size(50)
        .ignore_invalid_endpoints(true);

    let builder = RouteBuilder::from("direct:start")
        .route_id("routing-slip-config-test")
        .routing_slip_with_config(slip_config);
    let route = builder.build().unwrap();

    match &route.steps()[0] {
        BuilderStep::RoutingSlip { config } => {
            assert_eq!(config.uri_delimiter, "|");
            assert_eq!(config.cache_size, 50);
            assert!(config.ignore_invalid_endpoints);
        }
        _ => panic!("Expected RoutingSlip step"),
    }
}
2881
/// `marshal("json")` succeeds and contributes a `Processor` step.
#[test]
fn test_builder_marshal_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .marshal("json")
        .unwrap();
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2892
/// `unmarshal("json")` succeeds and contributes a `Processor` step.
#[test]
fn test_builder_unmarshal_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .unmarshal("json")
        .unwrap();
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2903
/// `stream_cache(1024)` contributes a `Processor` step.
#[test]
fn test_builder_stream_cache_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .stream_cache(1024);
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2913
/// `validate` with a bare schema path yields one `To` step whose URI is
/// `validator:` followed by that path.
#[test]
fn validate_adds_to_step_with_validator_uri() {
    let builder = RouteBuilder::from("direct:in")
        .route_id("test")
        .validate("schemas/order.xsd");
    let def = builder.build().unwrap();

    let steps = def.steps();
    assert_eq!(steps.len(), 1);

    let is_validator_to =
        matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd");
    assert!(is_validator_to, "got: {:?}", steps[0]);
}
2929
/// `marshal` with an unregistered format name returns an error whose text
/// mentions both "unknown data format" and the offending name.
#[test]
fn test_builder_marshal_returns_err_for_unknown_format() {
    let attempt = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .marshal("csv");

    let msg = if let Err(failure) = attempt {
        failure.to_string()
    } else {
        panic!("marshal with unknown format should return Err")
    };

    assert!(
        msg.contains("unknown data format"),
        "error should mention unknown format, got: {msg}"
    );
    assert!(
        msg.contains("csv"),
        "error should mention format name, got: {msg}"
    );
}
2949
/// `unmarshal` with an unregistered format name returns an error whose text
/// mentions both "unknown data format" and the offending name.
#[test]
fn test_builder_unmarshal_returns_err_for_unknown_format() {
    let attempt = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .unmarshal("csv");

    let msg = if let Err(failure) = attempt {
        failure.to_string()
    } else {
        panic!("unmarshal with unknown format should return Err")
    };

    assert!(
        msg.contains("unknown data format"),
        "error should mention unknown format, got: {msg}"
    );
    assert!(
        msg.contains("csv"),
        "error should mention format name, got: {msg}"
    );
}
2969
/// `recipient_list` with an expression puts a `RecipientList` step first on the route.
#[test]
fn test_builder_recipient_list_creates_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("recipient-list-test")
        .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()));
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::RecipientList { .. }));
}
2983
/// `recipient_list_with_config` puts a `RecipientList` step first on the route.
#[test]
fn test_builder_recipient_list_with_config_creates_step() {
    let list_config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));

    let builder = RouteBuilder::from("direct:start")
        .route_id("recipient-list-config-test")
        .recipient_list_with_config(list_config);
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::RecipientList { .. }));
}
2999
/// `script` records the language and script source verbatim on a `Script` step.
#[test]
fn test_builder_script_adds_script_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("script-test")
        .script("rhai", "headers[\"x\"] = \"y\"");
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(
        first,
        BuilderStep::Script { language, script }
            if language == "rhai" && script == "headers[\"x\"] = \"y\""
    ));
}
3014
/// `delay` and `delay_with_header` each contribute one `Delay` step.
#[test]
fn test_builder_delay_and_delay_with_header_add_steps() {
    let fixed = Duration::from_millis(250);
    let overridable = Duration::from_millis(500);

    let builder = RouteBuilder::from("direct:start")
        .route_id("delay-test")
        .delay(fixed)
        .delay_with_header(overridable, "x-delay");
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[0], BuilderStep::Delay { .. }));
    assert!(matches!(&steps[1], BuilderStep::Delay { .. }));
}
3028
/// `log`, `stop` and `to` are recorded as three steps in call order.
#[test]
fn test_builder_log_and_stop_add_steps_in_order() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("log-stop-test")
        .log("hello", LogLevel::Info)
        .stop()
        .to("mock:after");
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 3);
    assert!(matches!(
        &steps[0],
        BuilderStep::Log { message, .. } if message == "hello"
    ));
    assert!(matches!(&steps[1], BuilderStep::Stop));
    assert!(matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:after"));
}
3047
/// `stream_cache_default` contributes a `Processor` step.
#[test]
fn test_builder_stream_cache_default_adds_processor_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("stream-cache-default-test")
        .stream_cache_default();
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
3058
/// When the argument already starts with `validator:`, `validate` must not add the prefix again.
#[test]
fn test_validate_preserves_existing_validator_prefix() {
    let builder = RouteBuilder::from("direct:in")
        .route_id("validate-prefix-test")
        .validate("validator:schemas/order.xsd");
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(
        first,
        BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
    ));
}
3072
/// When `weighted(..)` is followed by `failover()`, the stored strategy is `Failover`.
#[test]
fn test_load_balance_builder_weighted_failover_config() {
    let weights = vec![("direct:a".to_string(), 3), ("direct:b".to_string(), 1)];

    let builder = RouteBuilder::from("direct:start")
        .route_id("lb-weighted-failover")
        .load_balance()
        .weighted(weights)
        .failover()
        .to("mock:result")
        .end_load_balance();
    let route = builder.build().unwrap();

    match &route.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
3094
/// Every multicast config setter is reflected on the resulting step's config.
#[test]
fn test_multicast_builder_all_config_setters() {
    let limit = Duration::from_millis(300);

    let builder = RouteBuilder::from("direct:start")
        .route_id("multicast-config-test")
        .multicast()
        .parallel(true)
        .parallel_limit(4)
        .stop_on_exception(true)
        .timeout(limit)
        .aggregation(MulticastStrategy::Original)
        .to("mock:a")
        .end_multicast();
    let route = builder.build().unwrap();

    match &route.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(config.parallel);
            assert_eq!(config.parallel_limit, Some(4));
            assert!(config.stop_on_exception);
            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
            assert!(matches!(config.aggregation, MulticastStrategy::Original));
        }
        _ => panic!("Expected Multicast step"),
    }
}
3120
/// `build_canonical` fails on a route containing a `set_header` step, and the
/// error text reports that step `processor` is unsupported.
#[test]
fn test_build_canonical_rejects_unsupported_processor_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-reject")
        .set_header("k", Value::String("v".into()));
    let err = builder.build_canonical().unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("does not support step `processor`"));
}
3131
/// `weighted(..)` alone stores a `LoadBalanceStrategy::Weighted` strategy.
#[test]
fn test_load_balance_builder_weighted_strategy() {
    let weights = vec![
        ("direct:a".to_string(), 5),
        ("direct:b".to_string(), 2),
        ("direct:c".to_string(), 1),
    ];

    let builder = RouteBuilder::from("direct:start")
        .route_id("lb-weighted")
        .load_balance()
        .weighted(weights)
        .to("mock:result")
        .end_load_balance();
    let route = builder.build().unwrap();

    match &route.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
3155
/// `failover()` stores `LoadBalanceStrategy::Failover` in the step's config.
#[test]
fn test_load_balance_builder_failover_strategy() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("lb-failover")
        .load_balance()
        .failover()
        .to("mock:primary")
        .end_load_balance();
    let route = builder.build().unwrap();

    match &route.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
3173
/// A filter opened inside a split scope ends up as a `Filter` step nested in the `Split` step.
#[test]
fn test_filter_in_split_builder_typestate() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let builder = RouteBuilder::from("timer:test")
        .route_id("filter-in-split")
        .split(SplitterConfig::new(split_body_lines()))
        .filter(|_ex| true)
        .to("mock:filtered")
        .end_filter()
        .end_split();
    let definition = builder.build().unwrap();

    let top_level = definition.steps();
    assert_eq!(top_level.len(), 1);
    match &top_level[0] {
        BuilderStep::Split { steps, .. } => {
            assert_eq!(steps.len(), 1);
            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
        }
        _ => panic!("Expected Split step"),
    }
}
3198
/// Steps placed before, inside and after a filter within a split scope are
/// nested in the `Split` step in call order: `To`, `Filter`, `To`.
///
/// Checking only the count would let a reordering or mis-typing of the nested
/// steps go unnoticed, so each position is also checked.
#[test]
fn test_filter_in_split_builder_multiple_steps() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let definition = RouteBuilder::from("timer:test")
        .route_id("filter-in-split-multi")
        .split(SplitterConfig::new(split_body_lines()))
        .to("mock:before-filter")
        .filter(|_ex| true)
        .to("mock:inside-filter")
        .end_filter()
        .to("mock:after-filter")
        .end_split()
        .build()
        .unwrap();

    // Everything was added inside the split scope, so the route has one step.
    assert_eq!(definition.steps().len(), 1);

    if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
        assert_eq!(steps.len(), 3);
        assert!(matches!(&steps[0], BuilderStep::To(uri) if uri == "mock:before-filter"));
        // "mock:inside-filter" belongs to the filter, not to the split itself.
        assert!(matches!(&steps[1], BuilderStep::Filter { .. }));
        assert!(matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:after-filter"));
    } else {
        panic!("Expected Split step");
    }
}
3222
/// A circuit breaker configured on the builder shows up on the canonical spec
/// with its failure threshold intact.
#[test]
fn test_build_canonical_with_circuit_breaker() {
    use camel_api::circuit_breaker::CircuitBreakerConfig;

    let breaker = CircuitBreakerConfig::new().failure_threshold(10);

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-cb")
        .circuit_breaker(breaker)
        .to("mock:result");
    let spec = builder.build_canonical().unwrap();

    let cb = spec.circuit_breaker.expect("circuit breaker should be set");
    assert_eq!(cb.failure_threshold, 10);
}
3239
/// `build_canonical` fails on a split with a custom aggregation closure, and
/// the error text reports step `split` as unsupported.
#[test]
fn test_build_canonical_rejects_custom_split_aggregation() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let split_config = SplitterConfig::new(split_body_lines()).aggregation(
        camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
    );

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-custom-split")
        .split(split_config)
        .to("mock:frag")
        .end_split();
    let err = builder.build_canonical().unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("canonical v1 does not support step `split`"));
}
3257
/// `build_canonical` fails on an aggregate step with a custom strategy closure,
/// and the error text mentions "custom aggregate strategy".
#[test]
fn test_build_canonical_rejects_custom_aggregate_strategy() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(2)
        .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
        .build()
        .unwrap();

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-custom-agg")
        .aggregate(aggregator);
    let err = builder.build_canonical().unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("custom aggregate strategy"));
}
3274
/// `build_canonical` fails on an aggregate step with a closure-based
/// correlation strategy, and the error text mentions "Fn correlation strategy".
#[test]
fn test_build_canonical_rejects_fn_correlation_strategy() {
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Single(CompletionCondition::Size(1)),
        correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-fn-corr")
        .aggregate(aggregator);
    let err = builder.build_canonical().unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("Fn correlation strategy"));
}
3294
/// `build_canonical` fails on an aggregate step with a predicate-based
/// completion condition, and the error text mentions "predicate completion".
#[test]
fn test_build_canonical_rejects_predicate_completion() {
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
            |_| false,
        ))),
        correlation: CorrelationStrategy::HeaderName("key".to_string()),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-pred-completion")
        .aggregate(aggregator);
    let err = builder.build_canonical().unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("predicate completion"));
}
3316
/// An expression-based correlation strategy is accepted by `build_canonical`,
/// and the expression text becomes the aggregate spec's correlation key.
#[test]
fn test_build_canonical_with_expression_correlation() {
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Single(CompletionCondition::Size(1)),
        correlation: CorrelationStrategy::Expression {
            expr: "header.key".to_string(),
            language: "simple".to_string(),
        },
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-expr-corr")
        .aggregate(aggregator);
    let spec = builder.build_canonical().unwrap();

    let has_expression_key = spec.steps.iter().any(|step| {
        matches!(
            step,
            CanonicalStepSpec::Aggregate(agg)
                if agg.correlation_key == Some("header.key".to_string())
        )
    });
    assert!(has_expression_key);
}
3339
/// `build_canonical` fails on a split built from `split_body_lines()` with
/// `LastWins` aggregation, and the error text reports step `split` as unsupported.
#[test]
fn test_build_canonical_split_rejected_with_closure_expression() {
    use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};

    let split_config =
        SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins);

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-split-last")
        .split(split_config)
        .to("mock:frag")
        .end_split();
    let err = builder.build_canonical().unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("canonical v1 does not support step `split`"));
}
3357
/// A full `on_exception` chain (retry, backoff, jitter, handler URI) is stored
/// as a single policy on the route's error handler config.
#[test]
fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("on-exception-full")
        .dead_letter_channel("log:dlc")
        .on_exception(|e| matches!(e, CamelError::Io(_)))
        .retry(5)
        .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
        .with_jitter(0.3)
        .handled_by("log:io-handler")
        .end_on_exception()
        .to("mock:out");
    let definition = builder.build().unwrap();

    let handler = definition
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.policies.len(), 1);

    let policy = &handler.policies[0];
    let retry = policy.retry.as_ref().expect("retry should be set");
    assert_eq!(retry.max_attempts, 5);
    assert_eq!(retry.initial_delay, Duration::from_millis(10));
    assert_eq!(retry.multiplier, 2.0);
    assert_eq!(retry.max_delay, Duration::from_millis(500));
    // Jitter is compared with a tolerance rather than exact equality.
    let jitter_error = (retry.jitter_factor - 0.3).abs();
    assert!(jitter_error < f64::EPSILON);
    assert_eq!(policy.handled_by.as_deref(), Some("log:io-handler"));
}
3388
/// A jitter factor of 5.0 passed to `with_jitter` is expected to be stored as 1.0.
#[test]
fn test_on_exception_jitter_clamped_to_valid_range() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("jitter-clamp")
        .on_exception(|_e| true)
        .retry(1)
        .with_jitter(5.0)
        .end_on_exception()
        .to("mock:out");
    let definition = builder.build().unwrap();

    let handler = definition.error_handler_config().unwrap();
    let retry = handler.policies[0].retry.as_ref().unwrap();

    let distance_from_one = (retry.jitter_factor - 1.0).abs();
    assert!(distance_from_one < f64::EPSILON);
}
3405
/// `process_fn` with a ready-made `BoxProcessor` contributes a `Processor` step.
#[test]
fn test_builder_process_fn_adds_processor_step() {
    use camel_api::BoxProcessorExt;

    // Pass-through processor: returns the exchange unchanged.
    let passthrough = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));

    let builder = RouteBuilder::from("timer:tick")
        .route_id("process-fn-test")
        .process_fn(passthrough);
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
3420
/// `convert_body_to(BodyType::Json)` contributes a `Processor` step.
#[test]
fn test_builder_convert_body_to_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("convert-body-test")
        .convert_body_to(BodyType::Json);
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
3431
/// `bean` records the bean name and method name on a `Bean` step.
#[test]
fn test_builder_bean_adds_bean_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("bean-test")
        .bean("myBean", "process");
    let definition = builder.build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(
        first,
        BuilderStep::Bean { name, method } if name == "myBean" && method == "process"
    ));
}
3445
/// `.strategy(ThrottleStrategy::Delay)` is stored in the throttle step's config.
#[test]
fn test_throttle_builder_delay_strategy() {
    let window = Duration::from_secs(1);
    let builder = RouteBuilder::from("timer:tick")
        .route_id("throttle-delay")
        .throttle(10, window)
        .strategy(ThrottleStrategy::Delay)
        .to("mock:result")
        .end_throttle();
    let definition = builder.build().unwrap();

    match &definition.steps()[0] {
        BuilderStep::Throttle { config, .. } => {
            assert_eq!(config.strategy, ThrottleStrategy::Delay);
        }
        _ => panic!("Expected Throttle step"),
    }
}
3465
/// `.strategy(ThrottleStrategy::Drop)` is stored in the throttle step's config.
#[test]
fn test_throttle_builder_drop_strategy() {
    let window = Duration::from_secs(1);
    let builder = RouteBuilder::from("timer:tick")
        .route_id("throttle-drop")
        .throttle(10, window)
        .strategy(ThrottleStrategy::Drop)
        .to("mock:result")
        .end_throttle();
    let definition = builder.build().unwrap();

    match &definition.steps()[0] {
        BuilderStep::Throttle { config, .. } => {
            assert_eq!(config.strategy, ThrottleStrategy::Drop);
        }
        _ => panic!("Expected Throttle step"),
    }
}
3483
/// A `loop_while` opened inside a `loop_count` scope is nested as the second
/// step of the outer `Loop`, and its config mode is `LoopMode::While`.
#[test]
fn test_nested_loop_while_builder() {
    use camel_api::loop_eip::LoopMode;

    let builder = RouteBuilder::from("direct:start")
        .route_id("nested-loop-while")
        .loop_count(2)
        .to("mock:outer")
        .loop_while(|_ex| true)
        .to("mock:inner")
        .end_loop()
        .end_loop();
    let def = builder.build().unwrap();

    let top_level = def.steps();
    assert_eq!(top_level.len(), 1);

    match &top_level[0] {
        BuilderStep::Loop { steps, .. } => {
            // Outer loop body: the `to` step followed by the inner loop.
            assert_eq!(steps.len(), 2);
            match &steps[1] {
                BuilderStep::Loop { config, .. } => {
                    assert!(matches!(config.mode, LoopMode::While(_)));
                }
                _ => panic!("Expected inner Loop step"),
            }
        }
        _ => panic!("Expected outer Loop step"),
    }
}
3513
/// Three `when` branches plus an `otherwise` holding one step are all recorded
/// on the `Choice` step.
#[test]
fn test_choice_builder_multiple_whens_with_otherwise() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("choice-multi-otherwise")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("b").is_some())
        .to("mock:b")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("c").is_some())
        .to("mock:c")
        .end_when()
        .otherwise()
        .to("mock:fallback")
        .end_otherwise()
        .end_choice();
    let definition = builder.build().unwrap();

    match &definition.steps()[0] {
        BuilderStep::Choice { whens, otherwise } => {
            assert_eq!(whens.len(), 3);
            assert!(otherwise.is_some());
            assert_eq!(otherwise.as_ref().unwrap().len(), 1);
        }
        _ => panic!("Expected Choice step"),
    }
}
3545
/// Setting only `parallel(true)` enables parallel mode and leaves `parallel_limit` unset.
#[test]
fn test_multicast_builder_parallel_only() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("multicast-parallel")
        .multicast()
        .parallel(true)
        .to("mock:a")
        .end_multicast();
    let route = builder.build().unwrap();

    match &route.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(config.parallel);
            assert_eq!(config.parallel_limit, None);
        }
        _ => panic!("Expected Multicast step"),
    }
}
3566
/// A multicast `timeout` is stored in the step's config.
#[test]
fn test_multicast_builder_timeout_only() {
    let limit = Duration::from_secs(5);

    let builder = RouteBuilder::from("direct:start")
        .route_id("multicast-timeout")
        .multicast()
        .timeout(limit)
        .to("mock:a")
        .end_multicast();
    let route = builder.build().unwrap();

    match &route.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert_eq!(config.timeout, Some(Duration::from_secs(5)));
        }
        _ => panic!("Expected Multicast step"),
    }
}
3584
/// `aggregation(MulticastStrategy::CollectAll)` is stored in the step's config.
#[test]
fn test_multicast_builder_aggregation_collect_all() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("multicast-collect")
        .multicast()
        .aggregation(MulticastStrategy::CollectAll)
        .to("mock:a")
        .end_multicast();
    let route = builder.build().unwrap();

    match &route.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
        }
        _ => panic!("Expected Multicast step"),
    }
}
3602
/// Size-or-timeout completion yields both a completion size and a completion
/// timeout (in milliseconds) on the canonical aggregate spec.
#[test]
fn test_build_canonical_aggregate_any_completion_mode() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_on_size_or_timeout(10, Duration::from_secs(30))
        .build()
        .unwrap();

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-any-completion")
        .aggregate(aggregator);
    let spec = builder.build_canonical().unwrap();

    match &spec.steps[0] {
        CanonicalStepSpec::Aggregate(agg) => {
            assert_eq!(agg.completion_size, Some(10));
            assert_eq!(agg.completion_timeout_ms, Some(30_000));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3625
/// Timeout-only completion yields a completion timeout (in milliseconds) and
/// no completion size on the canonical aggregate spec.
#[test]
fn test_build_canonical_aggregate_timeout_completion() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_on_timeout(Duration::from_millis(500))
        .build()
        .unwrap();

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-timeout-completion")
        .aggregate(aggregator);
    let spec = builder.build_canonical().unwrap();

    match &spec.steps[0] {
        CanonicalStepSpec::Aggregate(agg) => {
            assert_eq!(agg.completion_size, None);
            assert_eq!(agg.completion_timeout_ms, Some(500));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3646
/// `discard_on_timeout(true)` is carried over to the canonical aggregate spec.
#[test]
fn test_build_canonical_aggregate_discard_on_timeout() {
    use camel_api::aggregator::AggregatorConfig;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .discard_on_timeout(true)
        .build()
        .unwrap();

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-discard-timeout")
        .aggregate(aggregator);
    let spec = builder.build_canonical().unwrap();

    match &spec.steps[0] {
        CanonicalStepSpec::Aggregate(agg) => {
            assert_eq!(agg.discard_on_timeout, Some(true));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3671
/// `force_completion_on_stop(true)` is carried over to the canonical aggregate spec.
#[test]
fn test_build_canonical_aggregate_force_completion_on_stop() {
    use camel_api::aggregator::AggregatorConfig;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .force_completion_on_stop(true)
        .build()
        .unwrap();

    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-force-stop")
        .aggregate(aggregator);
    let spec = builder.build_canonical().unwrap();

    match &spec.steps[0] {
        CanonicalStepSpec::Aggregate(agg) => {
            assert_eq!(agg.force_completion_on_stop, Some(true));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3694
#[test]
/// Bucket limits set on the aggregator must reach the canonical step: the
/// bucket count unchanged and the TTL expressed in milliseconds.
fn test_build_canonical_aggregate_max_buckets_and_ttl() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .max_buckets(100)
        .bucket_ttl(Duration::from_secs(60))
        .build()
        .unwrap();

    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-buckets-ttl")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    match &spec.steps[0] {
        CanonicalStepSpec::Aggregate(agg) => {
            assert_eq!(agg.max_buckets, Some(100));
            assert_eq!(agg.bucket_ttl_ms, Some(60_000));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3721
#[test]
/// A filter opened inside a split scope must end up as the split's single
/// nested step once both scopes are closed.
fn test_split_builder_with_filter_inside() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines());
    let definition = RouteBuilder::from("timer:test")
        .route_id("split-with-filter")
        .split(splitter)
        .filter(|_exchange| true)
        .to("mock:filtered-frag")
        .end_filter()
        .end_split()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Split { steps, .. } => {
            assert_eq!(steps.len(), 1);
            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
        }
        _ => panic!("Expected Split step"),
    }
}
3745
#[test]
/// Two consecutive wire taps followed by a `to` must yield exactly three
/// steps, in declaration order, each pointing at the URI it was given.
fn test_wire_tap_multiple_taps() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("multi-wire-tap")
        .wire_tap("mock:tap1")
        .wire_tap("mock:tap2")
        .to("mock:result")
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 3);
    assert!(matches!(&steps[0], BuilderStep::WireTap { uri } if uri == "mock:tap1"));
    assert!(matches!(&steps[1], BuilderStep::WireTap { uri } if uri == "mock:tap2"));
    // The length check alone would not notice the final endpoint being
    // replaced or reordered, so pin the trailing `to` step as well.
    assert!(matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:result"));
}
3766
#[test]
/// Combining the `dead_letter_channel` shorthand with an explicit
/// `error_handler` must be rejected at build time with a mixed-mode error.
fn test_builder_shorthand_then_explicit_mixed_mode() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("mixed-mode-2")
        .dead_letter_channel("log:dlc")
        .error_handler(ErrorHandlerConfig::log_only())
        .to("mock:out")
        .build();

    let failure = outcome.err().expect("mixed mode should fail");
    let message = failure.to_string();
    assert!(message.contains("mixed error handler modes"));
}
3781
#[test]
/// Building a canonical spec from an empty source URI must fail.
fn test_build_canonical_empty_from_uri_errors() {
    let builder = RouteBuilder::from("").route_id("test");
    let outcome = builder.build_canonical();
    assert!(outcome.is_err());
}
3789
#[test]
/// Building a canonical spec without a route id must fail, and the error
/// text must name `route_id`.
fn test_build_canonical_missing_route_id_errors() {
    let outcome = RouteBuilder::from("direct:start").build_canonical();
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("route_id"));
}
3797
#[test]
/// An aggregate declared inside a split scope must be recorded as the
/// split's single nested step.
fn test_split_builder_with_aggregate_inside() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let aggregator = AggregatorConfig::correlate_by("frag-key")
        .complete_when_size(3)
        .build()
        .unwrap();

    let definition = RouteBuilder::from("timer:test")
        .route_id("split-agg")
        .split(SplitterConfig::new(split_body_lines()))
        .aggregate(aggregator)
        .end_split()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Split { steps, .. } => {
            assert_eq!(steps.len(), 1);
            assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
        }
        _ => panic!("Expected Split step"),
    }
}
3825
#[test]
/// Steps added between `throttle` and `end_throttle` must be nested under the
/// throttle step rather than appended to the route itself.
fn test_throttle_builder_with_steps_inside() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("throttle-steps")
        .throttle(10, Duration::from_secs(1))
        .set_header("throttled", Value::Bool(true))
        .to("mock:throttled")
        .end_throttle()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Throttle { steps, .. } => assert_eq!(steps.len(), 2),
        _ => panic!("Expected Throttle step"),
    }
}
3845
#[test]
/// Steps added between `load_balance` and `end_load_balance` must be nested
/// under the load-balance step.
fn test_load_balance_builder_with_steps_inside() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("lb-steps")
        .load_balance()
        .round_robin()
        .set_header("lb", Value::Bool(true))
        .to("mock:lb")
        .end_load_balance()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::LoadBalance { steps, .. } => assert_eq!(steps.len(), 2),
        _ => panic!("Expected LoadBalance step"),
    }
}
3866
#[test]
/// Steps added between `multicast` and `end_multicast` must be nested under
/// the multicast step.
fn test_multicast_builder_with_steps_inside() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("multicast-steps")
        .multicast()
        .set_header("mc", Value::Bool(true))
        .to("mock:multicast")
        .end_multicast()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Multicast { steps, .. } => assert_eq!(steps.len(), 2),
        _ => panic!("Expected Multicast step"),
    }
}
3886
#[test]
/// Steps added between `loop_count` and `end_loop` must be nested under the
/// loop step.
fn test_loop_builder_with_steps_inside() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("loop-steps")
        .loop_count(3)
        .set_header("loop", Value::Bool(true))
        .to("mock:loop")
        .end_loop()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Loop { steps, .. } => assert_eq!(steps.len(), 2),
        _ => panic!("Expected Loop step"),
    }
}
3906
#[test]
/// `build_canonical` must refuse a route containing a loop step and name the
/// unsupported step in its error.
fn test_build_canonical_rejects_loop_step() {
    let message = RouteBuilder::from("direct:start")
        .route_id("canonical-loop")
        .loop_count(3)
        .to("mock:loop")
        .end_loop()
        .build_canonical()
        .unwrap_err()
        .to_string();

    assert!(message.contains("does not support step `loop`"));
}
3921
#[test]
/// `build_canonical` must refuse a route containing a multicast step and name
/// the unsupported step in its error.
fn test_build_canonical_rejects_multicast_step() {
    let message = RouteBuilder::from("direct:start")
        .route_id("canonical-multicast")
        .multicast()
        .to("mock:a")
        .end_multicast()
        .build_canonical()
        .unwrap_err()
        .to_string();

    assert!(message.contains("does not support step `multicast`"));
}
3934
#[test]
/// `build_canonical` must refuse a route containing a throttle step and name
/// the unsupported step in its error.
fn test_build_canonical_rejects_throttle_step() {
    let message = RouteBuilder::from("direct:start")
        .route_id("canonical-throttle")
        .throttle(10, Duration::from_secs(1))
        .to("mock:result")
        .end_throttle()
        .build_canonical()
        .unwrap_err()
        .to_string();

    assert!(message.contains("does not support step `throttle`"));
}
3947
#[test]
/// `build_canonical` must refuse a route containing a load-balance step and
/// report it under the `load_balancer` step name.
fn test_build_canonical_rejects_load_balancer_step() {
    let message = RouteBuilder::from("direct:start")
        .route_id("canonical-lb")
        .load_balance()
        .round_robin()
        .to("mock:result")
        .end_load_balance()
        .build_canonical()
        .unwrap_err()
        .to_string();

    assert!(message.contains("does not support step `load_balancer`"));
}
3961
#[test]
/// `build_canonical` must refuse a route containing a bean step and name the
/// unsupported step in its error.
fn test_build_canonical_rejects_bean_step() {
    let message = RouteBuilder::from("direct:start")
        .route_id("canonical-bean")
        .bean("myBean", "process")
        .build_canonical()
        .unwrap_err()
        .to_string();

    assert!(message.contains("does not support step `bean`"));
}
3972
#[test]
/// `build_canonical` must refuse a route containing a script step and name
/// the unsupported step in its error.
fn test_build_canonical_rejects_script_step() {
    let message = RouteBuilder::from("direct:start")
        .route_id("canonical-script")
        .script("rhai", "x = 1")
        .build_canonical()
        .unwrap_err()
        .to_string();

    assert!(message.contains("does not support step `script`"));
}
3983
#[test]
/// A delay step is canonicalizable: the resulting spec must contain a
/// `Delay` step carrying the configured duration in milliseconds.
fn test_build_canonical_accepts_delay_step() {
    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-delay")
        .delay(Duration::from_millis(100))
        .build_canonical()
        .unwrap();

    let has_delay = spec
        .steps
        .iter()
        .any(|step| matches!(step, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100));
    assert!(has_delay);
}
3998
#[test]
/// A wire tap is canonicalizable: the resulting spec must contain a
/// `WireTap` step pointing at the tapped URI.
fn test_build_canonical_accepts_wire_tap_step() {
    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-wiretap")
        .wire_tap("mock:tap")
        .build_canonical()
        .unwrap();

    let has_wire_tap = spec
        .steps
        .iter()
        .any(|step| matches!(step, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"));
    assert!(has_wire_tap);
}
4013
#[test]
/// `build_canonical` must refuse a route containing a dynamic router and name
/// the unsupported step in its error.
fn test_build_canonical_rejects_dynamic_router_step() {
    let message = RouteBuilder::from("direct:start")
        .route_id("canonical-dyn-router")
        .dynamic_router(Arc::new(|_| Some(String::from("mock:a"))))
        .build_canonical()
        .unwrap_err()
        .to_string();

    assert!(message.contains("does not support step `dynamic_router`"));
}
4024
#[test]
/// `build_canonical` must refuse a route containing a routing slip and name
/// the unsupported step in its error.
fn test_build_canonical_rejects_routing_slip_step() {
    let message = RouteBuilder::from("direct:start")
        .route_id("canonical-routing-slip")
        .routing_slip(Arc::new(|_| Some(String::from("mock:a"))))
        .build_canonical()
        .unwrap_err()
        .to_string();

    assert!(message.contains("does not support step `routing_slip`"));
}
4035
#[test]
/// `build_canonical` must refuse a route containing a recipient list and name
/// the unsupported step in its error.
fn test_build_canonical_rejects_recipient_list_step() {
    let message = RouteBuilder::from("direct:start")
        .route_id("canonical-recipient")
        .recipient_list(Arc::new(|_| String::from("mock:a")))
        .build_canonical()
        .unwrap_err()
        .to_string();

    assert!(message.contains("does not support step `recipient_list`"));
}
4046
#[test]
/// An `Any` completion mode that includes a predicate condition cannot be
/// canonicalized; the error must mention predicate completion.
fn test_build_canonical_rejects_any_mode_with_predicate() {
    // Built as a struct literal so a size condition and a closure-based
    // predicate can be combined in a single `Any` completion.
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Any(vec![
            CompletionCondition::Size(5),
            CompletionCondition::Predicate(Arc::new(|_| false)),
        ]),
        correlation: CorrelationStrategy::HeaderName("key".to_string()),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let message = RouteBuilder::from("direct:start")
        .route_id("canonical-any-pred")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap_err()
        .to_string();

    assert!(message.contains("predicate completion"));
}
4071
#[test]
/// `build` must reject an empty source URI, and the error text must point at
/// the `from` clause or the URI.
fn test_builder_validation_missing_from_uri() {
    let outcome = RouteBuilder::from("")
        .route_id("missing-uri-route")
        .to("log:info")
        .build();
    assert!(outcome.is_err(), "empty from URI should fail validation");

    let err = outcome.err().unwrap().to_string();
    let mentions_source = err.contains("'from'") || err.contains("URI");
    assert!(mentions_source, "error should mention from/URI, got: {err}");
}
4087
#[test]
/// A step URI without a scheme is still accepted by `build`; per the
/// assertion message, step URIs are opaque here and resolved later.
fn test_builder_validation_invalid_step_uri_scheme() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("bad-step-route")
        .to("not-a-valid-uri");
    let outcome = builder.build();

    assert!(
        outcome.is_ok(),
        "builder should accept opaque step URIs; resolution happens later"
    );
}
4101
#[test]
/// Two routes with different sources but the same route id both build
/// successfully and report equal ids; `build` alone does not reject them.
fn test_builder_duplicate_route_ids_produce_identical_definitions() {
    let first = RouteBuilder::from("direct:a")
        .route_id("dup-route")
        .to("mock:out")
        .build();
    let second = RouteBuilder::from("direct:b")
        .route_id("dup-route")
        .to("mock:out")
        .build();

    assert!(first.is_ok());
    assert!(second.is_ok());

    let first_route = first.unwrap();
    let second_route = second.unwrap();
    assert_eq!(first_route.route_id(), second_route.route_id());
}
4121}