1use camel_api::DelayConfig;
7use camel_api::aggregator::{
8 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24 ProcessorFn, Value,
25 runtime::{
26 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28 CanonicalWhenSpec,
29 },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35 StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38pub trait StepAccumulator: Sized {
44 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
45
46 fn to(mut self, endpoint: impl Into<String>) -> Self {
47 self.steps_mut().push(BuilderStep::To(endpoint.into()));
48 self
49 }
50
51 fn process<F, Fut>(mut self, f: F) -> Self
52 where
53 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
54 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
55 {
56 let svc = ProcessorFn::new(f);
57 self.steps_mut()
58 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
59 self
60 }
61
62 fn process_fn(mut self, processor: BoxProcessor) -> Self {
63 self.steps_mut().push(BuilderStep::Processor(processor));
64 self
65 }
66
67 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
68 let svc = SetHeader::new(IdentityProcessor, key, value);
69 self.steps_mut()
70 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71 self
72 }
73
74 fn map_body<F>(mut self, mapper: F) -> Self
75 where
76 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
77 {
78 let svc = MapBody::new(IdentityProcessor, mapper);
79 self.steps_mut()
80 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
81 self
82 }
83
84 fn set_body<B>(mut self, body: B) -> Self
85 where
86 B: Into<Body> + Clone + Send + Sync + 'static,
87 {
88 let body: Body = body.into();
89 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
90 self.steps_mut()
91 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
92 self
93 }
94
95 fn transform<B>(self, body: B) -> Self
100 where
101 B: Into<Body> + Clone + Send + Sync + 'static,
102 {
103 self.set_body(body)
104 }
105
106 fn set_body_fn<F>(mut self, expr: F) -> Self
107 where
108 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
109 {
110 let svc = SetBody::new(IdentityProcessor, expr);
111 self.steps_mut()
112 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
113 self
114 }
115
116 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
117 where
118 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
119 {
120 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
121 self.steps_mut()
122 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
123 self
124 }
125
126 fn aggregate(mut self, config: AggregatorConfig) -> Self {
127 self.steps_mut().push(BuilderStep::Aggregate { config });
128 self
129 }
130
131 fn stop(mut self) -> Self {
137 self.steps_mut().push(BuilderStep::Stop);
138 self
139 }
140
141 fn delay(mut self, duration: std::time::Duration) -> Self {
142 self.steps_mut().push(BuilderStep::Delay {
143 config: DelayConfig::from_duration(duration),
144 });
145 self
146 }
147
148 fn delay_with_header(
149 mut self,
150 duration: std::time::Duration,
151 header: impl Into<String>,
152 ) -> Self {
153 self.steps_mut().push(BuilderStep::Delay {
154 config: DelayConfig::from_duration_with_header(duration, header),
155 });
156 self
157 }
158
159 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
163 self.steps_mut().push(BuilderStep::Log {
164 level,
165 message: message.into(),
166 });
167 self
168 }
169
170 fn convert_body_to(mut self, target: BodyType) -> Self {
182 let svc = ConvertBodyTo::new(IdentityProcessor, target);
183 self.steps_mut()
184 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
185 self
186 }
187
188 fn stream_cache(mut self, threshold: usize) -> Self {
189 let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
190 let svc = StreamCacheService::new(IdentityProcessor, config);
191 self.steps_mut()
192 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
193 self
194 }
195
196 fn stream_cache_default(self) -> Self {
200 self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
201 }
202
203 fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
214 let name = format.into();
215 let df = builtin_data_format(&name)
216 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
217 let svc = MarshalService::new(IdentityProcessor, df);
218 self.steps_mut()
219 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
220 Ok(self)
221 }
222
223 fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
234 let name = format.into();
235 let df = builtin_data_format(&name)
236 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
237 let svc = UnmarshalService::new(IdentityProcessor, df);
238 self.steps_mut()
239 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
240 Ok(self)
241 }
242
243 fn validate(mut self, schema_path: impl Into<String>) -> Self {
252 let path = schema_path.into();
253 let uri = if path.starts_with("validator:") {
254 path
255 } else {
256 format!("validator:{path}")
257 };
258 self.steps_mut().push(BuilderStep::To(uri));
259 self
260 }
261
262 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
273 self.steps_mut().push(BuilderStep::Script {
274 language: language.into(),
275 script: script.into(),
276 });
277 self
278 }
279
280 fn enrich(mut self, uri: impl Into<String>) -> Self {
286 self.steps_mut().push(BuilderStep::Enrich {
287 uri: uri.into(),
288 strategy: None,
289 timeout_ms: None,
290 });
291 self
292 }
293
294 fn poll_enrich(mut self, uri: impl Into<String>, timeout_ms: u64) -> Self {
300 self.steps_mut().push(BuilderStep::PollEnrich {
301 uri: uri.into(),
302 strategy: None,
303 timeout_ms: Some(timeout_ms),
304 });
305 self
306 }
307
308 fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
309 self.steps_mut().push(BuilderStep::Bean {
310 name: name.into(),
311 method: method.into(),
312 });
313 self
314 }
315}
316
/// Fluent builder that accumulates everything needed to produce a
/// `RouteDefinition` (via `build`) or a `CanonicalRouteSpec` (via
/// `build_canonical`).
pub struct RouteBuilder {
    /// Source endpoint URI the route consumes from.
    from_uri: String,
    /// Steps appended so far, in call order.
    steps: Vec<BuilderStep>,
    /// Error handler supplied through `error_handler(config)`.
    error_handler: Option<ErrorHandlerConfig>,
    /// Records which error-handling API style has been used on this builder.
    error_handler_mode: ErrorHandlerMode,
    /// Circuit breaker settings, if any were supplied.
    circuit_breaker_config: Option<CircuitBreakerConfig>,
    /// Security policy settings, if any were supplied.
    security_policy_config: Option<camel_api::security_policy::SecurityPolicyConfig>,
    /// Token authenticator, if one was supplied.
    security_authenticator: Option<std::sync::Arc<dyn camel_auth::TokenAuthenticator>>,
    /// Concurrency model override set by `concurrent` / `sequential`.
    concurrency: Option<ConcurrencyModel>,
    /// Route identifier; `build` and `build_canonical` reject a missing or blank id.
    route_id: Option<String>,
    /// Auto-startup flag, if explicitly set.
    auto_startup: Option<bool>,
    /// Startup order, if explicitly set.
    startup_order: Option<i32>,
}
344
/// Tracks which style of error-handler configuration a `RouteBuilder` has
/// seen, so that `build` can reject a mix of the two styles.
#[derive(Default)]
enum ErrorHandlerMode {
    /// No error-handling method has been called yet.
    #[default]
    None,
    /// Only `error_handler(config)` has been used.
    ExplicitConfig,
    /// Only the shorthand methods (`dead_letter_channel`, `on_exception`) have been used.
    Shorthand {
        /// Dead-letter URI from `dead_letter_channel`, if it was called.
        dlc_uri: Option<String>,
        /// Exception policies collected by `end_on_exception`, in call order.
        specs: Vec<OnExceptionSpec>,
    },
    /// Both styles were used; `build` turns this into an error.
    Mixed,
}
356
/// One shorthand exception policy collected through `OnExceptionBuilder`.
#[derive(Clone)]
struct OnExceptionSpec {
    /// Predicate deciding whether this policy applies to a given error.
    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
    /// Redelivery settings; `None` until `retry` is called on the builder.
    retry: Option<RedeliveryPolicy>,
    /// URI recorded by `handled_by`, if it was called.
    handled_by: Option<String>,
}
363
364impl RouteBuilder {
365 pub fn from(endpoint: &str) -> Self {
367 Self {
368 from_uri: endpoint.to_string(),
369 steps: Vec::new(),
370 error_handler: None,
371 error_handler_mode: ErrorHandlerMode::None,
372 circuit_breaker_config: None,
373 security_policy_config: None,
374 security_authenticator: None,
375 concurrency: None,
376 route_id: None,
377 auto_startup: None,
378 startup_order: None,
379 }
380 }
381
382 pub fn filter<F>(self, predicate: F) -> FilterBuilder
386 where
387 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
388 {
389 FilterBuilder {
390 parent: self,
391 predicate: std::sync::Arc::new(predicate),
392 steps: vec![],
393 }
394 }
395
396 pub fn choice(self) -> ChoiceBuilder {
402 ChoiceBuilder {
403 parent: self,
404 whens: vec![],
405 _otherwise: None,
406 }
407 }
408
409 pub fn wire_tap(mut self, endpoint: &str) -> Self {
413 self.steps.push(BuilderStep::WireTap {
414 uri: endpoint.to_string(),
415 });
416 self
417 }
418
419 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
421 self.error_handler_mode = match self.error_handler_mode {
422 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
423 ErrorHandlerMode::ExplicitConfig
424 }
425 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
426 };
427 self.error_handler = Some(config);
428 self
429 }
430
431 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
433 let uri = uri.into();
434 self.error_handler_mode = match self.error_handler_mode {
435 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
436 dlc_uri: Some(uri),
437 specs: Vec::new(),
438 },
439 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
440 dlc_uri: Some(uri),
441 specs,
442 },
443 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
444 };
445 self
446 }
447
448 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
450 where
451 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
452 {
453 self.error_handler_mode = match self.error_handler_mode {
454 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
455 dlc_uri: None,
456 specs: Vec::new(),
457 },
458 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
459 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
460 };
461
462 OnExceptionBuilder {
463 parent: self,
464 policy: OnExceptionSpec {
465 matches: std::sync::Arc::new(matches),
466 retry: None,
467 handled_by: None,
468 },
469 }
470 }
471
472 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
474 self.circuit_breaker_config = Some(config);
475 self
476 }
477
478 pub fn security_policy(
479 mut self,
480 config: camel_api::security_policy::SecurityPolicyConfig,
481 ) -> Self {
482 self.security_policy_config = Some(config);
483 self
484 }
485
486 pub fn security_authenticator(
487 mut self,
488 auth: std::sync::Arc<dyn camel_auth::TokenAuthenticator>,
489 ) -> Self {
490 self.security_authenticator = Some(auth);
491 self
492 }
493
494 pub fn concurrent(mut self, max: usize) -> Self {
508 let max = if max == 0 { None } else { Some(max) };
509 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
510 self
511 }
512
513 pub fn sequential(mut self) -> Self {
518 self.concurrency = Some(ConcurrencyModel::Sequential);
519 self
520 }
521
522 pub fn route_id(mut self, id: impl Into<String>) -> Self {
526 self.route_id = Some(id.into());
527 self
528 }
529
530 pub fn auto_startup(mut self, auto: bool) -> Self {
534 self.auto_startup = Some(auto);
535 self
536 }
537
538 pub fn startup_order(mut self, order: i32) -> Self {
542 self.startup_order = Some(order);
543 self
544 }
545
546 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
552 SplitBuilder {
553 parent: self,
554 config,
555 steps: Vec::new(),
556 }
557 }
558
559 pub fn multicast(self) -> MulticastBuilder {
565 MulticastBuilder {
566 parent: self,
567 steps: Vec::new(),
568 config: MulticastConfig::new(),
569 }
570 }
571
572 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
579 ThrottleBuilder {
580 parent: self,
581 config: ThrottlerConfig::new(max_requests, period),
582 steps: Vec::new(),
583 }
584 }
585
586 pub fn loop_count(self, count: usize) -> LoopBuilder {
588 LoopBuilder {
589 parent: self,
590 config: LoopConfig {
591 mode: LoopMode::Count(count),
592 },
593 steps: vec![],
594 }
595 }
596
597 pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
599 where
600 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
601 {
602 LoopBuilder {
603 parent: self,
604 config: LoopConfig {
605 mode: LoopMode::While(std::sync::Arc::new(predicate)),
606 },
607 steps: vec![],
608 }
609 }
610
611 pub fn load_balance(self) -> LoadBalancerBuilder {
617 LoadBalancerBuilder {
618 parent: self,
619 config: LoadBalancerConfig::round_robin(),
620 steps: Vec::new(),
621 }
622 }
623
624 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
640 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
641 }
642
643 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
647 self.steps.push(BuilderStep::DynamicRouter { config });
648 self
649 }
650
651 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
652 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
653 }
654
655 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
656 self.steps.push(BuilderStep::RoutingSlip { config });
657 self
658 }
659
660 pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
661 self.recipient_list_with_config(RecipientListConfig::new(expression))
662 }
663
664 pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
665 self.steps.push(BuilderStep::RecipientList { config });
666 self
667 }
668
669 pub fn build(self) -> Result<RouteDefinition, CamelError> {
675 validate_uri(&self.from_uri)?;
676 let route_id = self
677 .route_id
678 .filter(|s| !s.trim().is_empty())
679 .ok_or_else(|| {
680 CamelError::RouteError(
681 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
682 .to_string(),
683 )
684 })?;
685 let resolved_error_handler = match self.error_handler_mode {
686 ErrorHandlerMode::None => self.error_handler,
687 ErrorHandlerMode::ExplicitConfig => self.error_handler,
688 ErrorHandlerMode::Mixed => {
689 return Err(CamelError::RouteError(
690 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
691 ));
692 }
693 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
694 let mut config = if let Some(uri) = dlc_uri {
695 ErrorHandlerConfig::dead_letter_channel(uri)
696 } else {
697 ErrorHandlerConfig::log_only()
698 };
699
700 for spec in specs {
701 let matcher = spec.matches.clone();
702 let mut builder = config.on_exception(move |e| matcher(e));
703
704 if let Some(retry) = spec.retry {
705 builder = builder.retry(retry.max_attempts).with_backoff(
706 retry.initial_delay,
707 retry.multiplier,
708 retry.max_delay,
709 );
710 if retry.jitter_factor > 0.0 {
711 builder = builder.with_jitter(retry.jitter_factor);
712 }
713 }
714
715 if let Some(uri) = spec.handled_by {
716 builder = builder.handled_by(uri);
717 }
718
719 config = builder.build();
720 }
721
722 Some(config)
723 }
724 };
725
726 let definition = RouteDefinition::new(self.from_uri, self.steps);
727 let definition = if let Some(eh) = resolved_error_handler {
728 definition.with_error_handler(eh)
729 } else {
730 definition
731 };
732 let definition = if let Some(cb) = self.circuit_breaker_config {
733 definition.with_circuit_breaker(cb)
734 } else {
735 definition
736 };
737 let definition = if let Some(sp) = self.security_policy_config {
738 definition.with_security_policy(sp)
739 } else {
740 definition
741 };
742 let definition = if let Some(auth) = self.security_authenticator {
743 definition.with_security_authenticator(auth)
744 } else {
745 definition
746 };
747 let definition = if let Some(concurrency) = self.concurrency {
748 definition.with_concurrency(concurrency)
749 } else {
750 definition
751 };
752 let definition = definition.with_route_id(route_id);
753 let definition = if let Some(auto) = self.auto_startup {
754 definition.with_auto_startup(auto)
755 } else {
756 definition
757 };
758 let definition = if let Some(order) = self.startup_order {
759 definition.with_startup_order(order)
760 } else {
761 definition
762 };
763 Ok(definition)
764 }
765
766 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
768 validate_uri(&self.from_uri)?;
769 let route_id = self
770 .route_id
771 .filter(|s| !s.trim().is_empty())
772 .ok_or_else(|| {
773 CamelError::RouteError(
774 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
775 .to_string(),
776 )
777 })?;
778
779 let steps = canonicalize_steps(self.steps)?;
780 let circuit_breaker = self
781 .circuit_breaker_config
782 .map(canonicalize_circuit_breaker);
783
784 if self.security_policy_config.is_some() {
785 return Err(CamelError::RouteError(
786 "routes with security_policy cannot use the canonical/hot-reload path (not yet supported)"
787 .into(),
788 ));
789 }
790
791 let spec = CanonicalRouteSpec {
792 route_id,
793 from: self.from_uri,
794 steps,
795 circuit_breaker,
796 auto_startup: None,
797 startup_order: None,
798 concurrency: None,
799 version: camel_api::CANONICAL_CONTRACT_VERSION,
800 };
801 spec.validate_contract()?;
802 Ok(spec)
803 }
804}
805
806pub struct OnExceptionBuilder {
807 parent: RouteBuilder,
808 policy: OnExceptionSpec,
809}
810
811impl OnExceptionBuilder {
812 pub fn retry(mut self, max_attempts: u32) -> Self {
813 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
814 self
815 }
816
817 pub fn with_backoff(
818 mut self,
819 initial: std::time::Duration,
820 multiplier: f64,
821 max: std::time::Duration,
822 ) -> Self {
823 if let Some(ref mut retry) = self.policy.retry {
824 retry.initial_delay = initial;
825 retry.multiplier = multiplier;
826 retry.max_delay = max;
827 } else {
828 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
829 }
830 self
831 }
832
833 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
834 if let Some(ref mut retry) = self.policy.retry {
835 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
836 } else {
837 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
838 }
839 self
840 }
841
842 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
843 self.policy.handled_by = Some(uri.into());
844 self
845 }
846
847 pub fn end_on_exception(mut self) -> RouteBuilder {
848 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
849 specs.push(self.policy);
850 }
851 self.parent
852 }
853}
854
855fn validate_uri(uri: &str) -> Result<(), CamelError> {
857 let trimmed = uri.trim();
858 if trimmed.is_empty() {
859 return Err(CamelError::RouteError(
860 "route must have a 'from' URI".to_string(),
861 ));
862 }
863 if !trimmed.contains(':') {
864 return Err(CamelError::RouteError(
865 "URI must have a scheme (e.g. 'timer:tick')".to_string(),
866 ));
867 }
868 let scheme = trimmed.split(':').next().unwrap_or("");
869 if scheme.trim().is_empty() {
870 return Err(CamelError::RouteError(
871 "URI scheme must not be empty".to_string(),
872 ));
873 }
874 Ok(())
875}
876
877fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
878 let mut canonical = Vec::with_capacity(steps.len());
879 for step in steps {
880 canonical.push(canonicalize_step(step)?);
881 }
882 Ok(canonical)
883}
884
885fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
886 match step {
887 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
888 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
889 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
890 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
891 BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
892 delay_ms: config.delay_ms,
893 dynamic_header: config.dynamic_header,
894 }),
895 BuilderStep::DeclarativeScript { expression } => {
896 Ok(CanonicalStepSpec::Script { expression })
897 }
898 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
899 predicate,
900 steps: canonicalize_steps(steps)?,
901 }),
902 BuilderStep::DeclarativeChoice { whens, otherwise } => {
903 let mut canonical_whens = Vec::with_capacity(whens.len());
904 for DeclarativeWhenStep { predicate, steps } in whens {
905 canonical_whens.push(CanonicalWhenSpec {
906 predicate,
907 steps: canonicalize_steps(steps)?,
908 });
909 }
910 let otherwise = match otherwise {
911 Some(steps) => Some(canonicalize_steps(steps)?),
912 None => None,
913 };
914 Ok(CanonicalStepSpec::Choice {
915 whens: canonical_whens,
916 otherwise,
917 })
918 }
919 BuilderStep::DeclarativeSplit {
920 expression,
921 aggregation,
922 parallel,
923 parallel_limit,
924 stop_on_exception,
925 steps,
926 } => Ok(CanonicalStepSpec::Split {
927 expression: CanonicalSplitExpressionSpec::Language(expression),
928 aggregation: canonicalize_split_aggregation(aggregation)?,
929 parallel,
930 parallel_limit,
931 stop_on_exception,
932 steps: canonicalize_steps(steps)?,
933 }),
934 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
935 canonicalize_aggregate(config)?,
936 )),
937 other => {
938 let step_name = canonical_step_name(&other);
939 let detail = camel_api::canonical_contract_rejection_reason(step_name)
940 .unwrap_or("not included in canonical v1");
941 Err(CamelError::RouteError(format!(
942 "canonical v1 does not support step `{step_name}`: {detail}"
943 )))
944 }
945 }
946}
947
948fn canonicalize_split_aggregation(
949 strategy: camel_api::splitter::AggregationStrategy,
950) -> Result<CanonicalSplitAggregationSpec, CamelError> {
951 match strategy {
952 camel_api::splitter::AggregationStrategy::LastWins => {
953 Ok(CanonicalSplitAggregationSpec::LastWins)
954 }
955 camel_api::splitter::AggregationStrategy::CollectAll => {
956 Ok(CanonicalSplitAggregationSpec::CollectAll)
957 }
958 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
959 "canonical v1 does not support custom split aggregation".to_string(),
960 )),
961 camel_api::splitter::AggregationStrategy::Original => {
962 Ok(CanonicalSplitAggregationSpec::Original)
963 }
964 }
965}
966
967fn extract_completion_fields(
968 mode: &CompletionMode,
969) -> Result<(Option<usize>, Option<u64>), CamelError> {
970 match mode {
971 CompletionMode::Single(cond) => match cond {
972 CompletionCondition::Size(n) => Ok((Some(*n), None)),
973 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
974 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
975 "canonical v1 does not support aggregate predicate completion".to_string(),
976 )),
977 },
978 CompletionMode::Any(conds) => {
979 let mut size = None;
980 let mut timeout_ms = None;
981 for cond in conds {
982 match cond {
983 CompletionCondition::Size(n) => size = Some(*n),
984 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
985 CompletionCondition::Predicate(_) => {
986 return Err(CamelError::RouteError(
987 "canonical v1 does not support aggregate predicate completion"
988 .to_string(),
989 ));
990 }
991 }
992 }
993 Ok((size, timeout_ms))
994 }
995 }
996}
997
998fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
999 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
1000
1001 let header = match &config.correlation {
1002 CorrelationStrategy::HeaderName(h) => h.clone(),
1003 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
1004 CorrelationStrategy::Fn(_) => {
1005 return Err(CamelError::RouteError(
1006 "canonical v1 does not support Fn correlation strategy".to_string(),
1007 ));
1008 }
1009 };
1010
1011 let correlation_key = match &config.correlation {
1012 CorrelationStrategy::HeaderName(_) => None,
1013 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
1014 CorrelationStrategy::Fn(_) => unreachable!(),
1015 };
1016
1017 let strategy = match config.strategy {
1018 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
1019 AggregationStrategy::Custom(_) => {
1020 return Err(CamelError::RouteError(
1021 "canonical v1 does not support custom aggregate strategy".to_string(),
1022 ));
1023 }
1024 };
1025 let bucket_ttl_ms = config
1026 .bucket_ttl
1027 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
1028
1029 Ok(CanonicalAggregateSpec {
1030 header,
1031 completion_size,
1032 completion_timeout_ms,
1033 correlation_key,
1034 force_completion_on_stop: if config.force_completion_on_stop {
1035 Some(true)
1036 } else {
1037 None
1038 },
1039 discard_on_timeout: if config.discard_on_timeout {
1040 Some(true)
1041 } else {
1042 None
1043 },
1044 strategy,
1045 max_buckets: config.max_buckets,
1046 bucket_ttl_ms,
1047 })
1048}
1049
1050fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
1051 CanonicalCircuitBreakerSpec {
1052 failure_threshold: config.failure_threshold,
1053 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
1054 }
1055}
1056
1057fn canonical_step_name(step: &BuilderStep) -> &'static str {
1058 match step {
1059 BuilderStep::Processor(_) => "processor",
1060 BuilderStep::To(_) => "to",
1061 BuilderStep::Stop => "stop",
1062 BuilderStep::Log { .. } => "log",
1063 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
1064 BuilderStep::DeclarativeSetBody { .. } => "set_body",
1065 BuilderStep::DeclarativeFilter { .. } => "filter",
1066 BuilderStep::DeclarativeChoice { .. } => "choice",
1067 BuilderStep::DeclarativeScript { .. } => "script",
1068 BuilderStep::DeclarativeFunction { .. } => "function",
1069 BuilderStep::DeclarativeSplit { .. } => "split",
1070 BuilderStep::Split { .. } => "split",
1071 BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1072 BuilderStep::Aggregate { .. } => "aggregate",
1073 BuilderStep::Filter { .. } => "filter",
1074 BuilderStep::Choice { .. } => "choice",
1075 BuilderStep::WireTap { .. } => "wire_tap",
1076 BuilderStep::Delay { .. } => "delay",
1077 BuilderStep::Multicast { .. } => "multicast",
1078 BuilderStep::DeclarativeLog { .. } => "log",
1079 BuilderStep::Bean { .. } => "bean",
1080 BuilderStep::Script { .. } => "script",
1081 BuilderStep::Throttle { .. } => "throttle",
1082 BuilderStep::LoadBalance { .. } => "load_balancer",
1083 BuilderStep::DynamicRouter { .. } => "dynamic_router",
1084 BuilderStep::RoutingSlip { .. } => "routing_slip",
1085 BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1086 BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1087 BuilderStep::RecipientList { .. } => "recipient_list",
1088 BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1089 BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1090 BuilderStep::Enrich { .. } => "enrich",
1091 BuilderStep::PollEnrich { .. } => "poll_enrich",
1092 }
1093}
1094
/// Lets the shared `StepAccumulator` methods append directly to the route's
/// top-level step list.
impl StepAccumulator for RouteBuilder {
    /// Returns the route's own step list.
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1100
1101pub struct SplitBuilder {
1109 parent: RouteBuilder,
1110 config: SplitterConfig,
1111 steps: Vec<BuilderStep>,
1112}
1113
1114impl SplitBuilder {
1115 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1117 where
1118 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1119 {
1120 FilterInSplitBuilder {
1121 parent: self,
1122 predicate: std::sync::Arc::new(predicate),
1123 steps: vec![],
1124 }
1125 }
1126
1127 pub fn end_split(mut self) -> RouteBuilder {
1130 let split_step = BuilderStep::Split {
1131 config: self.config,
1132 steps: self.steps,
1133 };
1134 self.parent.steps.push(split_step);
1135 self.parent
1136 }
1137}
1138
1139impl StepAccumulator for SplitBuilder {
1140 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1141 &mut self.steps
1142 }
1143}
1144
1145pub struct FilterBuilder {
1147 parent: RouteBuilder,
1148 predicate: FilterPredicate,
1149 steps: Vec<BuilderStep>,
1150}
1151
1152impl FilterBuilder {
1153 pub fn end_filter(mut self) -> RouteBuilder {
1156 let step = BuilderStep::Filter {
1157 predicate: self.predicate,
1158 steps: self.steps,
1159 };
1160 self.parent.steps.push(step);
1161 self.parent
1162 }
1163}
1164
1165impl StepAccumulator for FilterBuilder {
1166 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1167 &mut self.steps
1168 }
1169}
1170
1171pub struct FilterInSplitBuilder {
1173 parent: SplitBuilder,
1174 predicate: FilterPredicate,
1175 steps: Vec<BuilderStep>,
1176}
1177
1178impl FilterInSplitBuilder {
1179 pub fn end_filter(mut self) -> SplitBuilder {
1181 let step = BuilderStep::Filter {
1182 predicate: self.predicate,
1183 steps: self.steps,
1184 };
1185 self.parent.steps.push(step);
1186 self.parent
1187 }
1188}
1189
1190impl StepAccumulator for FilterInSplitBuilder {
1191 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1192 &mut self.steps
1193 }
1194}
1195
1196pub struct ChoiceBuilder {
1203 parent: RouteBuilder,
1204 whens: Vec<WhenStep>,
1205 _otherwise: Option<Vec<BuilderStep>>,
1206}
1207
1208impl ChoiceBuilder {
1209 pub fn when<F>(self, predicate: F) -> WhenBuilder
1212 where
1213 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1214 {
1215 WhenBuilder {
1216 parent: self,
1217 predicate: std::sync::Arc::new(predicate),
1218 steps: vec![],
1219 }
1220 }
1221
1222 pub fn otherwise(self) -> OtherwiseBuilder {
1226 OtherwiseBuilder {
1227 parent: self,
1228 steps: vec![],
1229 }
1230 }
1231
1232 pub fn end_choice(mut self) -> RouteBuilder {
1236 let step = BuilderStep::Choice {
1237 whens: self.whens,
1238 otherwise: self._otherwise,
1239 };
1240 self.parent.steps.push(step);
1241 self.parent
1242 }
1243}
1244
1245pub struct WhenBuilder {
1247 parent: ChoiceBuilder,
1248 predicate: camel_api::FilterPredicate,
1249 steps: Vec<BuilderStep>,
1250}
1251
1252impl WhenBuilder {
1253 pub fn end_when(mut self) -> ChoiceBuilder {
1256 self.parent.whens.push(WhenStep {
1257 predicate: self.predicate,
1258 steps: self.steps,
1259 });
1260 self.parent
1261 }
1262}
1263
1264impl StepAccumulator for WhenBuilder {
1265 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1266 &mut self.steps
1267 }
1268}
1269
1270pub struct OtherwiseBuilder {
1272 parent: ChoiceBuilder,
1273 steps: Vec<BuilderStep>,
1274}
1275
1276impl OtherwiseBuilder {
1277 pub fn end_otherwise(self) -> ChoiceBuilder {
1279 let OtherwiseBuilder { mut parent, steps } = self;
1280 parent._otherwise = Some(steps);
1281 parent
1282 }
1283}
1284
1285impl StepAccumulator for OtherwiseBuilder {
1286 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1287 &mut self.steps
1288 }
1289}
1290
1291pub struct MulticastBuilder {
1299 parent: RouteBuilder,
1300 steps: Vec<BuilderStep>,
1301 config: MulticastConfig,
1302}
1303
1304impl MulticastBuilder {
1305 pub fn parallel(mut self, parallel: bool) -> Self {
1306 self.config = self.config.parallel(parallel);
1307 self
1308 }
1309
1310 pub fn parallel_limit(mut self, limit: usize) -> Self {
1311 self.config = self.config.parallel_limit(limit);
1312 self
1313 }
1314
1315 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1316 self.config = self.config.stop_on_exception(stop);
1317 self
1318 }
1319
1320 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1321 self.config = self.config.timeout(duration);
1322 self
1323 }
1324
1325 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1326 self.config = self.config.aggregation(strategy);
1327 self
1328 }
1329
1330 pub fn end_multicast(mut self) -> RouteBuilder {
1331 let step = BuilderStep::Multicast {
1332 steps: self.steps,
1333 config: self.config,
1334 };
1335 self.parent.steps.push(step);
1336 self.parent
1337 }
1338}
1339
1340impl StepAccumulator for MulticastBuilder {
1341 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1342 &mut self.steps
1343 }
1344}
1345
1346pub struct ThrottleBuilder {
1354 parent: RouteBuilder,
1355 config: ThrottlerConfig,
1356 steps: Vec<BuilderStep>,
1357}
1358
1359impl ThrottleBuilder {
1360 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1366 self.config = self.config.strategy(strategy);
1367 self
1368 }
1369
1370 pub fn end_throttle(mut self) -> RouteBuilder {
1373 let step = BuilderStep::Throttle {
1374 config: self.config,
1375 steps: self.steps,
1376 };
1377 self.parent.steps.push(step);
1378 self.parent
1379 }
1380}
1381
1382impl StepAccumulator for ThrottleBuilder {
1383 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1384 &mut self.steps
1385 }
1386}
1387
1388pub struct LoopBuilder {
1390 parent: RouteBuilder,
1391 config: LoopConfig,
1392 steps: Vec<BuilderStep>,
1393}
1394
1395impl LoopBuilder {
1396 pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1397 LoopInLoopBuilder {
1398 parent: self,
1399 config: LoopConfig {
1400 mode: LoopMode::Count(count),
1401 },
1402 steps: vec![],
1403 }
1404 }
1405
1406 pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1407 where
1408 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1409 {
1410 LoopInLoopBuilder {
1411 parent: self,
1412 config: LoopConfig {
1413 mode: LoopMode::While(std::sync::Arc::new(predicate)),
1414 },
1415 steps: vec![],
1416 }
1417 }
1418
1419 pub fn end_loop(mut self) -> RouteBuilder {
1420 let step = BuilderStep::Loop {
1421 config: self.config,
1422 steps: self.steps,
1423 };
1424 self.parent.steps.push(step);
1425 self.parent
1426 }
1427}
1428
1429impl StepAccumulator for LoopBuilder {
1430 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1431 &mut self.steps
1432 }
1433}
1434
1435pub struct LoopInLoopBuilder {
1436 parent: LoopBuilder,
1437 config: LoopConfig,
1438 steps: Vec<BuilderStep>,
1439}
1440
1441impl LoopInLoopBuilder {
1442 pub fn end_loop(mut self) -> LoopBuilder {
1443 let step = BuilderStep::Loop {
1444 config: self.config,
1445 steps: self.steps,
1446 };
1447 self.parent.steps.push(step);
1448 self.parent
1449 }
1450}
1451
1452impl StepAccumulator for LoopInLoopBuilder {
1453 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1454 &mut self.steps
1455 }
1456}
1457
1458pub struct LoadBalancerBuilder {
1466 parent: RouteBuilder,
1467 config: LoadBalancerConfig,
1468 steps: Vec<BuilderStep>,
1469}
1470
1471impl LoadBalancerBuilder {
1472 pub fn round_robin(mut self) -> Self {
1474 self.config = LoadBalancerConfig::round_robin();
1475 self
1476 }
1477
1478 pub fn random(mut self) -> Self {
1480 self.config = LoadBalancerConfig::random();
1481 self
1482 }
1483
1484 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1489 self.config = LoadBalancerConfig::weighted(weights);
1490 self
1491 }
1492
1493 pub fn failover(mut self) -> Self {
1498 self.config = LoadBalancerConfig::failover();
1499 self
1500 }
1501
1502 pub fn parallel(mut self, parallel: bool) -> Self {
1507 self.config = self.config.parallel(parallel);
1508 self
1509 }
1510
1511 pub fn end_load_balance(mut self) -> RouteBuilder {
1514 let step = BuilderStep::LoadBalance {
1515 config: self.config,
1516 steps: self.steps,
1517 };
1518 self.parent.steps.push(step);
1519 self.parent
1520 }
1521}
1522
1523impl StepAccumulator for LoadBalancerBuilder {
1524 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1525 &mut self.steps
1526 }
1527}
1528
1529#[cfg(test)]
1534mod tests {
1535 use super::*;
1536 use camel_api::error_handler::ErrorHandlerConfig;
1537 use camel_api::load_balancer::LoadBalanceStrategy;
1538 use camel_api::{Exchange, Message};
1539 use camel_core::route::BuilderStep;
1540 use std::sync::Arc;
1541 use std::time::Duration;
1542 use tower::{Service, ServiceExt};
1543
/// A builder with only a source URI and a route id builds, and the
/// definition reports that source URI.
#[test]
fn test_builder_from_creates_definition() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let route = builder.build().unwrap();
    assert_eq!(route.from_uri(), "timer:tick");
}
1552
/// An empty source URI makes `build()` fail.
#[test]
fn test_builder_empty_from_uri_errors() {
    let outcome = RouteBuilder::from("").route_id("test-route").build();
    assert!(outcome.is_err());
}
1558
/// A source URI without a scheme is rejected with an error mentioning "scheme".
#[test]
fn test_build_rejects_schemeless_uri() {
    let outcome = RouteBuilder::from("no-scheme-here")
        .route_id("test-route")
        .build();
    if let Err(err) = outcome {
        let err_msg = err.to_string();
        assert!(
            err_msg.contains("scheme"),
            "expected scheme-related error, got: {err_msg}"
        );
    } else {
        panic!("schemeless URI should fail");
    }
}
1575
/// A source URI whose scheme part is empty is rejected with an error
/// mentioning "scheme".
#[test]
fn test_build_rejects_empty_scheme_uri() {
    let outcome = RouteBuilder::from(":missing-scheme")
        .route_id("test-route")
        .build();
    if let Err(err) = outcome {
        let err_msg = err.to_string();
        assert!(
            err_msg.contains("scheme"),
            "expected scheme-related error, got: {err_msg}"
        );
    } else {
        panic!("empty-scheme URI should fail");
    }
}
1592
/// A well-formed `scheme:path` source URI builds successfully.
#[test]
fn test_build_accepts_valid_uri() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let outcome = builder.build();
    assert!(outcome.is_ok());
}
1600
/// `build_canonical()` also rejects a source URI without a scheme.
#[test]
fn test_build_canonical_rejects_schemeless_uri() {
    let builder = RouteBuilder::from("no-scheme-here").route_id("test-route");
    let outcome = builder.build_canonical();
    assert!(outcome.is_err());
}
1608
/// `to()` records a `BuilderStep::To` carrying the endpoint URI.
#[test]
fn test_builder_to_adds_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .to("log:info")
        .build()
        .unwrap();

    assert_eq!(route.from_uri(), "timer:tick");
    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::To(uri) if uri == "log:info"));
}
1621
/// A `filter(..) … end_filter()` scope is recorded as a `BuilderStep::Filter`.
#[test]
fn test_builder_filter_adds_filter_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .filter(|_ex| true)
        .to("mock:result")
        .end_filter()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Filter { .. }));
}
1634
/// `set_header()` is recorded as a `BuilderStep::Processor`.
#[test]
fn test_builder_set_header_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_header("key", Value::String("value".into()))
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
1645
/// `map_body()` is recorded as a `BuilderStep::Processor`.
#[test]
fn test_builder_map_body_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .map_body(|body| body)
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
1656
/// `process()` with an async closure is recorded as a `BuilderStep::Processor`.
#[test]
fn test_builder_process_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .process(|ex| async move { Ok(ex) })
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
1667
/// Chained steps are recorded in call order: processor, filter scope, then `to`.
#[test]
fn test_builder_chain_multiple_steps() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_header("source", Value::String("timer".into()))
        .filter(|ex| ex.input.header("source").is_some())
        .to("log:info")
        .end_filter()
        .to("mock:result")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 3);
    assert!(matches!(&steps[0], BuilderStep::Processor(_)));
    assert!(matches!(&steps[1], BuilderStep::Filter { .. }));
    assert!(matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:result"));
}
1685
/// `loop_count(3)` yields a `Loop` step in `Count(3)` mode holding the inner
/// step, followed by the step added after `end_loop()`.
#[test]
fn test_loop_count_builder() {
    use camel_api::loop_eip::LoopMode;

    let route = RouteBuilder::from("direct:start")
        .route_id("loop-test")
        .loop_count(3)
        .to("mock:inside")
        .end_loop()
        .to("mock:after")
        .build()
        .unwrap();

    assert_eq!(route.steps().len(), 2);
    let first = &route.steps()[0];
    if let BuilderStep::Loop { config, steps } = first {
        assert!(matches!(config.mode, LoopMode::Count(3)));
        assert_eq!(steps.len(), 1);
    } else {
        panic!("Expected Loop, got {:?}", first);
    }
    assert!(matches!(route.steps()[1], BuilderStep::To(_)));
}
1709
/// `loop_while(..)` yields a single `Loop` step in `While` mode holding the
/// inner step.
#[test]
fn test_loop_while_builder() {
    use camel_api::loop_eip::LoopMode;

    let route = RouteBuilder::from("direct:start")
        .route_id("loop-while-test")
        .loop_while(|_ex| true)
        .to("mock:retry")
        .end_loop()
        .build()
        .unwrap();

    assert_eq!(route.steps().len(), 1);
    let first = &route.steps()[0];
    if let BuilderStep::Loop { config, steps } = first {
        assert!(matches!(config.mode, LoopMode::While(_)));
        assert_eq!(steps.len(), 1);
    } else {
        panic!("Expected Loop, got {:?}", first);
    }
}
1731
/// A loop opened inside another loop is recorded as the second step of the
/// outer loop, with its own mode and steps.
#[test]
fn test_nested_loop_builder() {
    use camel_api::loop_eip::LoopMode;

    let route = RouteBuilder::from("direct:start")
        .route_id("nested-loop-test")
        .loop_count(2)
        .to("mock:outer")
        .loop_count(3)
        .to("mock:inner")
        .end_loop()
        .end_loop()
        .to("mock:after")
        .build()
        .unwrap();

    assert_eq!(route.steps().len(), 2);

    let outer = &route.steps()[0];
    let outer_steps = match outer {
        BuilderStep::Loop { steps, .. } => steps,
        other => panic!("Expected outer Loop, got {:?}", other),
    };
    assert_eq!(outer_steps.len(), 2);

    let nested = &outer_steps[1];
    if let BuilderStep::Loop {
        config,
        steps: inner_steps,
    } = nested
    {
        assert!(matches!(config.mode, LoopMode::Count(3)));
        assert_eq!(inner_steps.len(), 1);
    } else {
        panic!("Expected nested Loop, got {:?}", nested);
    }
}
1766
1767 #[tokio::test]
1772 async fn test_set_header_processor_works() {
1773 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1774 let exchange = Exchange::new(Message::new("test"));
1775 let result = svc.call(exchange).await.unwrap();
1776 assert_eq!(
1777 result.input.header("greeting"),
1778 Some(&Value::String("hello".into()))
1779 );
1780 }
1781
1782 #[tokio::test]
1783 async fn test_filter_processor_passes() {
1784 use camel_api::BoxProcessorExt;
1785 use camel_processor::FilterService;
1786
1787 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1788 let mut svc =
1789 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1790 let exchange = Exchange::new(Message::new("pass"));
1791 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1792 assert_eq!(result.input.body.as_text(), Some("pass"));
1793 }
1794
1795 #[tokio::test]
1796 async fn test_filter_processor_blocks() {
1797 use camel_api::BoxProcessorExt;
1798 use camel_processor::FilterService;
1799
1800 let sub = BoxProcessor::from_fn(|_ex| {
1801 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1802 });
1803 let mut svc =
1804 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1805 let exchange = Exchange::new(Message::new("reject"));
1806 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1807 assert_eq!(result.input.body.as_text(), Some("reject"));
1808 }
1809
1810 #[tokio::test]
1811 async fn test_map_body_processor_works() {
1812 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1813 if let Some(text) = body.as_text() {
1814 Body::Text(text.to_uppercase())
1815 } else {
1816 body
1817 }
1818 });
1819 let exchange = Exchange::new(Message::new("hello"));
1820 let result = mapper.oneshot(exchange).await.unwrap();
1821 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1822 }
1823
1824 #[tokio::test]
1825 async fn test_process_custom_processor_works() {
1826 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1827 ex.set_property("custom", Value::Bool(true));
1828 Ok(ex)
1829 });
1830 let exchange = Exchange::new(Message::default());
1831 let result = processor.oneshot(exchange).await.unwrap();
1832 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1833 }
1834
1835 #[tokio::test]
1840 async fn test_compose_pipeline_runs_steps_in_order() {
1841 use camel_core::route::compose_pipeline;
1842
1843 let processors = vec![
1844 BoxProcessor::new(SetHeader::new(
1845 IdentityProcessor,
1846 "step",
1847 Value::String("one".into()),
1848 )),
1849 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1850 if let Some(text) = body.as_text() {
1851 Body::Text(format!("{}-processed", text))
1852 } else {
1853 body
1854 }
1855 })),
1856 ];
1857
1858 let pipeline = compose_pipeline(processors);
1859 let exchange = Exchange::new(Message::new("hello"));
1860 let result = pipeline.oneshot(exchange).await.unwrap();
1861
1862 assert_eq!(
1863 result.input.header("step"),
1864 Some(&Value::String("one".into()))
1865 );
1866 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1867 }
1868
1869 #[tokio::test]
1870 async fn test_compose_pipeline_empty_is_identity() {
1871 use camel_core::route::compose_pipeline;
1872
1873 let pipeline = compose_pipeline(vec![]);
1874 let exchange = Exchange::new(Message::new("unchanged"));
1875 let result = pipeline.oneshot(exchange).await.unwrap();
1876 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1877 }
1878
/// `circuit_breaker()` stores the config on the definition, including its
/// failure threshold.
#[test]
fn test_builder_circuit_breaker_sets_config() {
    use camel_api::circuit_breaker::CircuitBreakerConfig;

    let breaker = CircuitBreakerConfig::new().failure_threshold(5);
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .circuit_breaker(breaker)
        .build()
        .unwrap();

    let stored = route
        .circuit_breaker_config()
        .expect("circuit breaker should be set");
    assert_eq!(stored.failure_threshold, 5);
}
1899
/// A route configured with both a circuit breaker and an explicit error
/// handler keeps both settings on the built definition.
#[test]
fn test_builder_circuit_breaker_with_error_handler() {
    use camel_api::circuit_breaker::CircuitBreakerConfig;
    use camel_api::error_handler::ErrorHandlerConfig;

    let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
    let eh_config = ErrorHandlerConfig::log_only();

    let definition = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .to("log:info")
        .circuit_breaker(cb_config)
        .error_handler(eh_config)
        .build()
        .unwrap();

    assert!(
        definition.circuit_breaker_config().is_some(),
        "circuit breaker config should be set"
    );
    // The test name promises coverage of the error handler too, so check
    // that setting the circuit breaker did not drop it.
    assert!(
        definition.error_handler_config().is_some(),
        "error handler config should be set"
    );
}
1922
/// Several `on_exception` clauses end up as policies in declaration order,
/// alongside the dead-letter channel URI.
#[test]
fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .dead_letter_channel("log:dlc")
        .on_exception(|e| matches!(e, CamelError::Io(_)))
        .retry(3)
        .handled_by("log:io")
        .end_on_exception()
        .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
        .retry(1)
        .end_on_exception()
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.policies.len(), 2);
    assert_eq!(handler.dlc_uri.as_deref(), Some("log:dlc"));

    let first_attempts = handler.policies[0].retry.as_ref().map(|p| p.max_attempts);
    assert_eq!(first_attempts, Some(3));
    assert_eq!(handler.policies[0].handled_by.as_deref(), Some("log:io"));

    let second_attempts = handler.policies[1].retry.as_ref().map(|p| p.max_attempts);
    assert_eq!(second_attempts, Some(1));
}
1954
/// Combining an explicit `error_handler` with the `on_exception` shorthand
/// fails at build time with a "mixed error handler modes" error.
#[test]
fn test_builder_on_exception_mixed_mode_rejected() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .error_handler(ErrorHandlerConfig::log_only())
        .on_exception(|_e| true)
        .end_on_exception()
        .to("mock:out")
        .build();

    let err = outcome.err().expect("mixed mode should fail with an error");
    let rendered = err.to_string();

    assert!(
        rendered.contains("mixed error handler modes"),
        "unexpected error: {err}"
    );
}
1972
/// Backoff and jitter settings without a preceding `retry` leave the policy
/// with no retry configuration.
#[test]
fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .on_exception(|_e| true)
        .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
        .with_jitter(0.5)
        .end_on_exception()
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.policies.len(), 1);
    let only_policy = &handler.policies[0];
    assert!(only_policy.retry.is_none());
}
1991
/// `dead_letter_channel` on its own produces an error handler with the DLC
/// URI and no policies.
#[test]
fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .dead_letter_channel("log:dlc")
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.dlc_uri.as_deref(), Some("log:dlc"));
    assert!(handler.policies.is_empty());
}
2007
/// A second `dead_letter_channel` call overrides the URI while policies
/// declared in between are kept.
#[test]
fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .dead_letter_channel("log:first")
        .on_exception(|e| matches!(e, CamelError::Io(_)))
        .retry(2)
        .end_on_exception()
        .dead_letter_channel("log:second")
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.dlc_uri.as_deref(), Some("log:second"));
    assert_eq!(handler.policies.len(), 1);

    let attempts = handler.policies[0].retry.as_ref().map(|p| p.max_attempts);
    assert_eq!(attempts, Some(2));
}
2031
/// `on_exception` without a dead-letter channel yields an error handler with
/// one policy and no DLC URI.
#[test]
fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
        .retry(1)
        .end_on_exception()
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert!(handler.dlc_uri.is_none());
    assert_eq!(handler.policies.len(), 1);
}
2049
/// Calling `error_handler` twice keeps the second config and still builds.
#[test]
fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
    let earlier = ErrorHandlerConfig::dead_letter_channel("log:first");
    let later = ErrorHandlerConfig::dead_letter_channel("log:second");

    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .error_handler(earlier)
        .error_handler(later)
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.dlc_uri.as_deref(), Some("log:second"));
}
2068
/// After `end_split()` the chain is back on the route builder: the split
/// scope and the following `to` give two top-level steps.
#[test]
fn test_split_builder_typestate() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines());
    let route = RouteBuilder::from("timer:test?period=1000")
        .route_id("test-route")
        .split(splitter)
        .to("mock:per-fragment")
        .end_split()
        .to("mock:final")
        .build()
        .unwrap();

    assert_eq!(route.steps().len(), 2);
}
2088
/// Steps added inside a split scope are stored on the `Split` step, not on
/// the route itself.
#[test]
fn test_split_builder_steps_collected() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines());
    let route = RouteBuilder::from("timer:test?period=1000")
        .route_id("test-route")
        .split(splitter)
        .set_header("fragment", Value::String("yes".into()))
        .to("mock:per-fragment")
        .end_split()
        .build()
        .unwrap();

    assert_eq!(route.steps().len(), 1);
    let first = &route.steps()[0];
    if let BuilderStep::Split { steps, .. } = first {
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected Split, got {:?}", first);
    }
}
2111
/// The `SplitterConfig` passed to `split()` reaches the `Split` step unchanged.
#[test]
fn test_split_builder_config_propagated() {
    use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines())
        .parallel(true)
        .parallel_limit(4)
        .aggregation(AggregationStrategy::CollectAll);
    let route = RouteBuilder::from("timer:test?period=1000")
        .route_id("test-route")
        .split(splitter)
        .to("mock:per-fragment")
        .end_split()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    if let BuilderStep::Split { config, .. } = first {
        assert!(config.parallel);
        assert_eq!(config.parallel_limit, Some(4));
        assert!(matches!(
            config.aggregation,
            AggregationStrategy::CollectAll
        ));
    } else {
        panic!("Expected Split, got {:?}", first);
    }
}
2141
/// `aggregate()` on the route builder records a single `Aggregate` step.
#[test]
fn test_aggregate_builder_adds_step() {
    use camel_api::aggregator::AggregatorConfig;
    use camel_core::route::BuilderStep;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(2)
        .build()
        .unwrap();
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .aggregate(aggregator)
        .build()
        .unwrap();

    assert_eq!(route.steps().len(), 1);
    assert!(matches!(route.steps()[0], BuilderStep::Aggregate { .. }));
}
2164
/// `aggregate()` inside a split scope is recorded as the first step of the
/// `Split` step.
#[test]
fn test_aggregate_in_split_builder() {
    use camel_api::aggregator::AggregatorConfig;
    use camel_api::splitter::{SplitterConfig, split_body_lines};
    use camel_core::route::BuilderStep;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .build()
        .unwrap();
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .split(SplitterConfig::new(split_body_lines()))
        .aggregate(aggregator)
        .end_split()
        .build()
        .unwrap();

    assert_eq!(route.steps().len(), 1);
    match &route.steps()[0] {
        BuilderStep::Split { steps, .. } => {
            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
        }
        _ => panic!("expected Split step"),
    }
}
2191
/// `set_body()` with a fixed value is recorded as a `BuilderStep::Processor`.
#[test]
fn test_builder_set_body_static_adds_processor() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_body("fixed")
        .build()
        .unwrap();
    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2203
/// `set_body_fn()` with a closure is recorded as a `BuilderStep::Processor`.
#[test]
fn test_builder_set_body_fn_adds_processor() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
        .build()
        .unwrap();
    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2213
/// `transform()` and `set_body()` with the same value produce the same number
/// of steps.
#[test]
fn transform_alias_produces_same_as_set_body() {
    let via_transform = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .transform("hello")
        .build()
        .unwrap();
    let via_set_body = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_body("hello")
        .build()
        .unwrap();

    let transform_count = via_transform.steps().len();
    let set_body_count = via_set_body.steps().len();
    assert_eq!(transform_count, set_body_count);
}
2230
/// `set_header_fn()` with a closure is recorded as a `BuilderStep::Processor`.
#[test]
fn test_builder_set_header_fn_adds_processor() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
        .build()
        .unwrap();
    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2240
2241 #[tokio::test]
2242 async fn test_set_body_static_processor_works() {
2243 use camel_core::route::compose_pipeline;
2244 let def = RouteBuilder::from("t:t")
2245 .route_id("test-route")
2246 .set_body("replaced")
2247 .build()
2248 .unwrap();
2249 let pipeline = compose_pipeline(
2250 def.steps()
2251 .iter()
2252 .filter_map(|s| {
2253 if let BuilderStep::Processor(p) = s {
2254 Some(p.clone())
2255 } else {
2256 None
2257 }
2258 })
2259 .collect(),
2260 );
2261 let exchange = Exchange::new(Message::new("original"));
2262 let result = pipeline.oneshot(exchange).await.unwrap();
2263 assert_eq!(result.input.body.as_text(), Some("replaced"));
2264 }
2265
2266 #[tokio::test]
2267 async fn test_set_body_fn_processor_works() {
2268 use camel_core::route::compose_pipeline;
2269 let def = RouteBuilder::from("t:t")
2270 .route_id("test-route")
2271 .set_body_fn(|ex: &Exchange| {
2272 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2273 })
2274 .build()
2275 .unwrap();
2276 let pipeline = compose_pipeline(
2277 def.steps()
2278 .iter()
2279 .filter_map(|s| {
2280 if let BuilderStep::Processor(p) = s {
2281 Some(p.clone())
2282 } else {
2283 None
2284 }
2285 })
2286 .collect(),
2287 );
2288 let exchange = Exchange::new(Message::new("hello"));
2289 let result = pipeline.oneshot(exchange).await.unwrap();
2290 assert_eq!(result.input.body.as_text(), Some("HELLO"));
2291 }
2292
2293 #[tokio::test]
2294 async fn test_set_header_fn_processor_works() {
2295 use camel_core::route::compose_pipeline;
2296 let def = RouteBuilder::from("t:t")
2297 .route_id("test-route")
2298 .set_header_fn("echo", |ex: &Exchange| {
2299 ex.input
2300 .body
2301 .as_text()
2302 .map(|t| Value::String(t.into()))
2303 .unwrap_or(Value::Null)
2304 })
2305 .build()
2306 .unwrap();
2307 let pipeline = compose_pipeline(
2308 def.steps()
2309 .iter()
2310 .filter_map(|s| {
2311 if let BuilderStep::Processor(p) = s {
2312 Some(p.clone())
2313 } else {
2314 None
2315 }
2316 })
2317 .collect(),
2318 );
2319 let exchange = Exchange::new(Message::new("ping"));
2320 let result = pipeline.oneshot(exchange).await.unwrap();
2321 assert_eq!(
2322 result.input.header("echo"),
2323 Some(&Value::String("ping".into()))
2324 );
2325 }
2326
/// After `end_filter()` the chain is back on the route builder, so further
/// steps and `build()` are available.
#[test]
fn test_filter_builder_typestate() {
    let outcome = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route")
        .filter(|_ex| true)
        .to("mock:inner")
        .end_filter()
        .to("mock:outer")
        .build();
    assert!(outcome.is_ok());
}
2340
/// A filter scope contributes exactly one top-level `Filter` step.
#[test]
fn test_filter_builder_steps_collected() {
    let route = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route")
        .filter(|_ex| true)
        .to("mock:inner")
        .end_filter()
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
}
2354
/// `wire_tap()` records a `WireTap` step with its URI, followed by later steps.
#[test]
fn test_wire_tap_builder_adds_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .wire_tap("mock:tap")
        .to("mock:result")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[0], BuilderStep::WireTap { uri } if uri == "mock:tap"));
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:result"));
}
2370
/// After `end_multicast()` the chain is back on the route builder: the
/// multicast scope and the following `to` give two top-level steps.
#[test]
fn test_multicast_builder_typestate() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .multicast()
        .to("direct:a")
        .to("direct:b")
        .end_multicast()
        .to("mock:result")
        .build()
        .unwrap();

    let top_level = route.steps().len();
    assert_eq!(top_level, 2);
}
2387
/// Steps added inside a multicast scope are stored on the `Multicast` step.
#[test]
fn test_multicast_builder_steps_collected() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .multicast()
        .to("direct:a")
        .to("direct:b")
        .end_multicast()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    if let BuilderStep::Multicast { steps, .. } = first {
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected Multicast, got {:?}", first);
    }
}
2406
/// `concurrent(16)` stores a `Concurrent` override capped at 16.
#[test]
fn test_builder_concurrent_sets_concurrency() {
    use camel_component_api::ConcurrencyModel;

    let route = RouteBuilder::from("http://0.0.0.0:8080/test")
        .route_id("test-route")
        .concurrent(16)
        .to("log:info")
        .build()
        .unwrap();

    let expected = ConcurrencyModel::Concurrent { max: Some(16) };
    assert_eq!(route.concurrency_override(), Some(&expected));
}
2425
/// `concurrent(0)` stores a `Concurrent` override with no maximum.
#[test]
fn test_builder_concurrent_zero_means_unbounded() {
    use camel_component_api::ConcurrencyModel;

    let route = RouteBuilder::from("http://0.0.0.0:8080/test")
        .route_id("test-route")
        .concurrent(0)
        .to("log:info")
        .build()
        .unwrap();

    let expected = ConcurrencyModel::Concurrent { max: None };
    assert_eq!(route.concurrency_override(), Some(&expected));
}
2442
/// `sequential()` stores a `Sequential` concurrency override.
#[test]
fn test_builder_sequential_sets_concurrency() {
    use camel_component_api::ConcurrencyModel;

    let route = RouteBuilder::from("http://0.0.0.0:8080/test")
        .route_id("test-route")
        .sequential()
        .to("log:info")
        .build()
        .unwrap();

    let expected = ConcurrencyModel::Sequential;
    assert_eq!(route.concurrency_override(), Some(&expected));
}
2459
/// Without `concurrent()` or `sequential()` there is no concurrency override.
#[test]
fn test_builder_default_concurrency_is_none() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .to("log:info")
        .build()
        .unwrap();

    let concurrency = route.concurrency_override();
    assert_eq!(concurrency, None);
}
2470
/// `route_id()` is reported back by the built definition.
#[test]
fn test_builder_route_id_sets_id() {
    let builder = RouteBuilder::from("timer:tick").route_id("my-route");
    let route = builder.build().unwrap();

    assert_eq!(route.route_id(), "my-route");
}
2482
/// Building without a route id fails with an error that mentions `route_id`.
#[test]
fn test_build_without_route_id_fails() {
    let outcome = RouteBuilder::from("timer:tick?period=1000")
        .to("log:info")
        .build();
    let err = if let Err(e) = outcome {
        e.to_string()
    } else {
        panic!("build() should fail without route_id")
    };
    assert!(
        err.contains("route_id"),
        "error should mention route_id, got: {}",
        err
    );
}
2498
/// An empty route id is rejected with `CamelError::RouteError`.
#[test]
fn test_builder_empty_route_id_rejected() {
    let failure = RouteBuilder::from("timer:tick").route_id("").build().err();
    let err = failure.expect("empty route_id should be rejected");
    let is_route_error = matches!(err, CamelError::RouteError(_));
    assert!(is_route_error);
}
2505
/// A route id consisting only of a space is rejected.
#[test]
fn test_builder_whitespace_route_id_rejected() {
    let builder = RouteBuilder::from("timer:tick").route_id(" ");
    let outcome = builder.build();
    assert!(outcome.is_err());
}
2511
/// `auto_startup(false)` is reflected on the built definition.
#[test]
fn test_builder_auto_startup_false() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .auto_startup(false)
        .build()
        .unwrap();

    let starts_automatically = route.auto_startup();
    assert!(!starts_automatically);
}
2522
/// `startup_order(50)` is reflected on the built definition.
#[test]
fn test_builder_startup_order_custom() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .startup_order(50)
        .build()
        .unwrap();

    let order = route.startup_order();
    assert_eq!(order, 50);
}
2533
/// With nothing but a route id set, the definition auto-starts and reports
/// startup order 1000.
#[test]
fn test_builder_defaults() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let route = builder.build().unwrap();

    assert_eq!(route.route_id(), "test-route");
    assert!(route.auto_startup());
    assert_eq!(route.startup_order(), 1000);
}
2545
2546 #[test]
2549 fn test_choice_builder_single_when() {
2550 let definition = RouteBuilder::from("timer:tick")
2551 .route_id("test-route")
2552 .choice()
2553 .when(|ex: &Exchange| ex.input.header("type").is_some())
2554 .to("mock:typed")
2555 .end_when()
2556 .end_choice()
2557 .build()
2558 .unwrap();
2559 assert_eq!(definition.steps().len(), 1);
2560 assert!(
2561 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2562 if whens.len() == 1 && otherwise.is_none())
2563 );
2564 }
2565
2566 #[test]
2567 fn test_choice_builder_when_otherwise() {
2568 let definition = RouteBuilder::from("timer:tick")
2569 .route_id("test-route")
2570 .choice()
2571 .when(|ex: &Exchange| ex.input.header("a").is_some())
2572 .to("mock:a")
2573 .end_when()
2574 .otherwise()
2575 .to("mock:fallback")
2576 .end_otherwise()
2577 .end_choice()
2578 .build()
2579 .unwrap();
2580 assert!(
2581 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2582 if whens.len() == 1 && otherwise.is_some())
2583 );
2584 }
2585
2586 #[test]
2587 fn test_choice_builder_multiple_whens() {
2588 let definition = RouteBuilder::from("timer:tick")
2589 .route_id("test-route")
2590 .choice()
2591 .when(|ex: &Exchange| ex.input.header("a").is_some())
2592 .to("mock:a")
2593 .end_when()
2594 .when(|ex: &Exchange| ex.input.header("b").is_some())
2595 .to("mock:b")
2596 .end_when()
2597 .end_choice()
2598 .build()
2599 .unwrap();
2600 assert!(
2601 matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2602 if whens.len() == 2)
2603 );
2604 }
2605
2606 #[test]
2607 fn test_choice_step_after_choice() {
2608 let definition = RouteBuilder::from("timer:tick")
2610 .route_id("test-route")
2611 .choice()
2612 .when(|_ex: &Exchange| true)
2613 .to("mock:inner")
2614 .end_when()
2615 .end_choice()
2616 .to("mock:outer") .build()
2618 .unwrap();
2619 assert_eq!(definition.steps().len(), 2);
2620 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2621 }
2622
2623 #[test]
2626 fn test_throttle_builder_typestate() {
2627 let definition = RouteBuilder::from("timer:tick")
2628 .route_id("test-route")
2629 .throttle(10, std::time::Duration::from_secs(1))
2630 .to("mock:result")
2631 .end_throttle()
2632 .build()
2633 .unwrap();
2634
2635 assert_eq!(definition.steps().len(), 1);
2636 assert!(matches!(
2637 &definition.steps()[0],
2638 BuilderStep::Throttle { .. }
2639 ));
2640 }
2641
2642 #[test]
2643 fn test_throttle_builder_with_strategy() {
2644 let definition = RouteBuilder::from("timer:tick")
2645 .route_id("test-route")
2646 .throttle(10, std::time::Duration::from_secs(1))
2647 .strategy(ThrottleStrategy::Reject)
2648 .to("mock:result")
2649 .end_throttle()
2650 .build()
2651 .unwrap();
2652
2653 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2654 assert_eq!(config.strategy, ThrottleStrategy::Reject);
2655 } else {
2656 panic!("Expected Throttle step");
2657 }
2658 }
2659
2660 #[test]
2661 fn test_throttle_builder_steps_collected() {
2662 let definition = RouteBuilder::from("timer:tick")
2663 .route_id("test-route")
2664 .throttle(5, std::time::Duration::from_secs(1))
2665 .set_header("throttled", Value::Bool(true))
2666 .to("mock:throttled")
2667 .end_throttle()
2668 .build()
2669 .unwrap();
2670
2671 match &definition.steps()[0] {
2672 BuilderStep::Throttle { steps, .. } => {
2673 assert_eq!(steps.len(), 2); }
2675 other => panic!("Expected Throttle, got {:?}", other),
2676 }
2677 }
2678
2679 #[test]
2680 fn test_throttle_step_after_throttle() {
2681 let definition = RouteBuilder::from("timer:tick")
2683 .route_id("test-route")
2684 .throttle(10, std::time::Duration::from_secs(1))
2685 .to("mock:inner")
2686 .end_throttle()
2687 .to("mock:outer")
2688 .build()
2689 .unwrap();
2690
2691 assert_eq!(definition.steps().len(), 2);
2692 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2693 }
2694
2695 #[test]
2698 fn test_load_balance_builder_typestate() {
2699 let definition = RouteBuilder::from("timer:tick")
2700 .route_id("test-route")
2701 .load_balance()
2702 .round_robin()
2703 .to("mock:a")
2704 .to("mock:b")
2705 .end_load_balance()
2706 .build()
2707 .unwrap();
2708
2709 assert_eq!(definition.steps().len(), 1);
2710 assert!(matches!(
2711 &definition.steps()[0],
2712 BuilderStep::LoadBalance { .. }
2713 ));
2714 }
2715
2716 #[test]
2717 fn test_load_balance_builder_with_strategy() {
2718 let definition = RouteBuilder::from("timer:tick")
2719 .route_id("test-route")
2720 .load_balance()
2721 .random()
2722 .to("mock:result")
2723 .end_load_balance()
2724 .build()
2725 .unwrap();
2726
2727 if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2728 assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2729 } else {
2730 panic!("Expected LoadBalance step");
2731 }
2732 }
2733
2734 #[test]
2735 fn test_load_balance_builder_steps_collected() {
2736 let definition = RouteBuilder::from("timer:tick")
2737 .route_id("test-route")
2738 .load_balance()
2739 .set_header("lb", Value::Bool(true))
2740 .to("mock:a")
2741 .end_load_balance()
2742 .build()
2743 .unwrap();
2744
2745 match &definition.steps()[0] {
2746 BuilderStep::LoadBalance { steps, .. } => {
2747 assert_eq!(steps.len(), 2); }
2749 other => panic!("Expected LoadBalance, got {:?}", other),
2750 }
2751 }
2752
2753 #[test]
2754 fn test_load_balance_step_after_load_balance() {
2755 let definition = RouteBuilder::from("timer:tick")
2757 .route_id("test-route")
2758 .load_balance()
2759 .to("mock:inner")
2760 .end_load_balance()
2761 .to("mock:outer")
2762 .build()
2763 .unwrap();
2764
2765 assert_eq!(definition.steps().len(), 2);
2766 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2767 }
2768
2769 #[test]
2772 fn test_dynamic_router_builder() {
2773 let definition = RouteBuilder::from("timer:tick")
2774 .route_id("test-route")
2775 .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2776 .build()
2777 .unwrap();
2778
2779 assert_eq!(definition.steps().len(), 1);
2780 assert!(matches!(
2781 &definition.steps()[0],
2782 BuilderStep::DynamicRouter { .. }
2783 ));
2784 }
2785
2786 #[test]
2787 fn test_dynamic_router_builder_with_config() {
2788 let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2789 .max_iterations(100)
2790 .cache_size(500);
2791
2792 let definition = RouteBuilder::from("timer:tick")
2793 .route_id("test-route")
2794 .dynamic_router_with_config(config)
2795 .build()
2796 .unwrap();
2797
2798 assert_eq!(definition.steps().len(), 1);
2799 if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2800 assert_eq!(config.max_iterations, 100);
2801 assert_eq!(config.cache_size, 500);
2802 } else {
2803 panic!("Expected DynamicRouter step");
2804 }
2805 }
2806
2807 #[test]
2808 fn test_dynamic_router_step_after_router() {
2809 let definition = RouteBuilder::from("timer:tick")
2811 .route_id("test-route")
2812 .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2813 .to("mock:outer")
2814 .build()
2815 .unwrap();
2816
2817 assert_eq!(definition.steps().len(), 2);
2818 assert!(matches!(
2819 &definition.steps()[0],
2820 BuilderStep::DynamicRouter { .. }
2821 ));
2822 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2823 }
2824
2825 #[test]
2826 fn routing_slip_builder_creates_step() {
2827 use camel_api::RoutingSlipExpression;
2828
2829 let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2830
2831 let route = RouteBuilder::from("direct:start")
2832 .route_id("routing-slip-test")
2833 .routing_slip(expression)
2834 .build()
2835 .unwrap();
2836
2837 assert!(
2838 matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2839 "Expected RoutingSlip step"
2840 );
2841 }
2842
2843 #[test]
2844 fn routing_slip_with_config_builder_creates_step() {
2845 use camel_api::RoutingSlipConfig;
2846
2847 let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2848 .uri_delimiter("|")
2849 .cache_size(50)
2850 .ignore_invalid_endpoints(true);
2851
2852 let route = RouteBuilder::from("direct:start")
2853 .route_id("routing-slip-config-test")
2854 .routing_slip_with_config(config)
2855 .build()
2856 .unwrap();
2857
2858 if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2859 assert_eq!(config.uri_delimiter, "|");
2860 assert_eq!(config.cache_size, 50);
2861 assert!(config.ignore_invalid_endpoints);
2862 } else {
2863 panic!("Expected RoutingSlip step");
2864 }
2865 }
2866
2867 #[test]
2868 fn test_builder_marshal_adds_processor_step() {
2869 let definition = RouteBuilder::from("timer:tick")
2870 .route_id("test-route")
2871 .marshal("json")
2872 .unwrap()
2873 .build()
2874 .unwrap();
2875 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2876 }
2877
2878 #[test]
2879 fn test_builder_unmarshal_adds_processor_step() {
2880 let definition = RouteBuilder::from("timer:tick")
2881 .route_id("test-route")
2882 .unmarshal("json")
2883 .unwrap()
2884 .build()
2885 .unwrap();
2886 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2887 }
2888
2889 #[test]
2890 fn test_builder_stream_cache_adds_processor_step() {
2891 let definition = RouteBuilder::from("timer:tick")
2892 .route_id("test-route")
2893 .stream_cache(1024)
2894 .build()
2895 .unwrap();
2896 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2897 }
2898
2899 #[test]
2900 fn validate_adds_to_step_with_validator_uri() {
2901 let def = RouteBuilder::from("direct:in")
2902 .route_id("test")
2903 .validate("schemas/order.xsd")
2904 .build()
2905 .unwrap();
2906 let steps = def.steps();
2907 assert_eq!(steps.len(), 1);
2908 assert!(
2909 matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2910 "got: {:?}",
2911 steps[0]
2912 );
2913 }
2914
2915 #[test]
2916 fn test_builder_marshal_returns_err_for_unknown_format() {
2917 let result = RouteBuilder::from("timer:tick")
2918 .route_id("test-route")
2919 .marshal("csv");
2920 let err = match result {
2921 Err(e) => e,
2922 Ok(_) => panic!("marshal with unknown format should return Err"),
2923 };
2924 let msg = err.to_string();
2925 assert!(
2926 msg.contains("unknown data format"),
2927 "error should mention unknown format, got: {msg}"
2928 );
2929 assert!(
2930 msg.contains("csv"),
2931 "error should mention format name, got: {msg}"
2932 );
2933 }
2934
2935 #[test]
2936 fn test_builder_unmarshal_returns_err_for_unknown_format() {
2937 let result = RouteBuilder::from("timer:tick")
2938 .route_id("test-route")
2939 .unmarshal("csv");
2940 let err = match result {
2941 Err(e) => e,
2942 Ok(_) => panic!("unmarshal with unknown format should return Err"),
2943 };
2944 let msg = err.to_string();
2945 assert!(
2946 msg.contains("unknown data format"),
2947 "error should mention unknown format, got: {msg}"
2948 );
2949 assert!(
2950 msg.contains("csv"),
2951 "error should mention format name, got: {msg}"
2952 );
2953 }
2954
2955 #[test]
2956 fn test_builder_recipient_list_creates_step() {
2957 let route = RouteBuilder::from("direct:start")
2958 .route_id("recipient-list-test")
2959 .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
2960 .build()
2961 .unwrap();
2962
2963 assert!(matches!(
2964 &route.steps()[0],
2965 BuilderStep::RecipientList { .. }
2966 ));
2967 }
2968
2969 #[test]
2970 fn test_builder_recipient_list_with_config_creates_step() {
2971 let config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));
2972
2973 let route = RouteBuilder::from("direct:start")
2974 .route_id("recipient-list-config-test")
2975 .recipient_list_with_config(config)
2976 .build()
2977 .unwrap();
2978
2979 assert!(matches!(
2980 &route.steps()[0],
2981 BuilderStep::RecipientList { .. }
2982 ));
2983 }
2984
2985 #[test]
2986 fn test_builder_script_adds_script_step() {
2987 let route = RouteBuilder::from("direct:start")
2988 .route_id("script-test")
2989 .script("rhai", "headers[\"x\"] = \"y\"")
2990 .build()
2991 .unwrap();
2992
2993 assert!(matches!(
2994 &route.steps()[0],
2995 BuilderStep::Script { language, script }
2996 if language == "rhai" && script == "headers[\"x\"] = \"y\""
2997 ));
2998 }
2999
3000 #[test]
3001 fn test_builder_delay_and_delay_with_header_add_steps() {
3002 let route = RouteBuilder::from("direct:start")
3003 .route_id("delay-test")
3004 .delay(Duration::from_millis(250))
3005 .delay_with_header(Duration::from_millis(500), "x-delay")
3006 .build()
3007 .unwrap();
3008
3009 assert_eq!(route.steps().len(), 2);
3010 assert!(matches!(&route.steps()[0], BuilderStep::Delay { .. }));
3011 assert!(matches!(&route.steps()[1], BuilderStep::Delay { .. }));
3012 }
3013
3014 #[test]
3015 fn test_builder_log_and_stop_add_steps_in_order() {
3016 let route = RouteBuilder::from("direct:start")
3017 .route_id("log-stop-test")
3018 .log("hello", LogLevel::Info)
3019 .stop()
3020 .to("mock:after")
3021 .build()
3022 .unwrap();
3023
3024 assert_eq!(route.steps().len(), 3);
3025 assert!(matches!(
3026 &route.steps()[0],
3027 BuilderStep::Log { message, .. } if message == "hello"
3028 ));
3029 assert!(matches!(&route.steps()[1], BuilderStep::Stop));
3030 assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
3031 }
3032
3033 #[test]
3034 fn test_builder_stream_cache_default_adds_processor_step() {
3035 let route = RouteBuilder::from("direct:start")
3036 .route_id("stream-cache-default-test")
3037 .stream_cache_default()
3038 .build()
3039 .unwrap();
3040
3041 assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
3042 }
3043
3044 #[test]
3045 fn test_validate_preserves_existing_validator_prefix() {
3046 let route = RouteBuilder::from("direct:in")
3047 .route_id("validate-prefix-test")
3048 .validate("validator:schemas/order.xsd")
3049 .build()
3050 .unwrap();
3051
3052 assert!(matches!(
3053 &route.steps()[0],
3054 BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
3055 ));
3056 }
3057
3058 #[test]
3059 fn test_load_balance_builder_weighted_failover_parallel_config() {
3060 let route = RouteBuilder::from("direct:start")
3061 .route_id("lb-weighted-failover-parallel")
3062 .load_balance()
3063 .weighted(vec![
3064 ("direct:a".to_string(), 3),
3065 ("direct:b".to_string(), 1),
3066 ])
3067 .failover()
3068 .parallel(true)
3069 .to("mock:result")
3070 .end_load_balance()
3071 .build()
3072 .unwrap();
3073
3074 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3075 assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3076 assert!(config.parallel);
3077 } else {
3078 panic!("Expected LoadBalance step");
3079 }
3080 }
3081
3082 #[test]
3083 fn test_multicast_builder_all_config_setters() {
3084 let route = RouteBuilder::from("direct:start")
3085 .route_id("multicast-config-test")
3086 .multicast()
3087 .parallel(true)
3088 .parallel_limit(4)
3089 .stop_on_exception(true)
3090 .timeout(Duration::from_millis(300))
3091 .aggregation(MulticastStrategy::Original)
3092 .to("mock:a")
3093 .end_multicast()
3094 .build()
3095 .unwrap();
3096
3097 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3098 assert!(config.parallel);
3099 assert_eq!(config.parallel_limit, Some(4));
3100 assert!(config.stop_on_exception);
3101 assert_eq!(config.timeout, Some(Duration::from_millis(300)));
3102 assert!(matches!(config.aggregation, MulticastStrategy::Original));
3103 } else {
3104 panic!("Expected Multicast step");
3105 }
3106 }
3107
3108 #[test]
3109 fn test_build_canonical_rejects_unsupported_processor_step() {
3110 let err = RouteBuilder::from("direct:start")
3111 .route_id("canonical-reject")
3112 .set_header("k", Value::String("v".into()))
3113 .build_canonical()
3114 .unwrap_err();
3115
3116 assert!(format!("{err}").contains("does not support step `processor`"));
3117 }
3118
3119 #[test]
3122 fn test_load_balance_builder_weighted_strategy() {
3123 let route = RouteBuilder::from("direct:start")
3124 .route_id("lb-weighted")
3125 .load_balance()
3126 .weighted(vec![
3127 ("direct:a".to_string(), 5),
3128 ("direct:b".to_string(), 2),
3129 ("direct:c".to_string(), 1),
3130 ])
3131 .to("mock:result")
3132 .end_load_balance()
3133 .build()
3134 .unwrap();
3135
3136 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3137 assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
3138 } else {
3139 panic!("Expected LoadBalance step");
3140 }
3141 }
3142
3143 #[test]
3144 fn test_load_balance_builder_failover_strategy() {
3145 let route = RouteBuilder::from("direct:start")
3146 .route_id("lb-failover")
3147 .load_balance()
3148 .failover()
3149 .to("mock:primary")
3150 .end_load_balance()
3151 .build()
3152 .unwrap();
3153
3154 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3155 assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3156 assert!(!config.parallel);
3157 } else {
3158 panic!("Expected LoadBalance step");
3159 }
3160 }
3161
3162 #[test]
3163 fn test_load_balance_builder_parallel_false_explicit() {
3164 let route = RouteBuilder::from("direct:start")
3165 .route_id("lb-parallel-false")
3166 .load_balance()
3167 .round_robin()
3168 .parallel(false)
3169 .to("mock:result")
3170 .end_load_balance()
3171 .build()
3172 .unwrap();
3173
3174 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3175 assert!(!config.parallel);
3176 } else {
3177 panic!("Expected LoadBalance step");
3178 }
3179 }
3180
3181 #[test]
3184 fn test_filter_in_split_builder_typestate() {
3185 use camel_api::splitter::{SplitterConfig, split_body_lines};
3186
3187 let definition = RouteBuilder::from("timer:test")
3188 .route_id("filter-in-split")
3189 .split(SplitterConfig::new(split_body_lines()))
3190 .filter(|_ex| true)
3191 .to("mock:filtered")
3192 .end_filter()
3193 .end_split()
3194 .build()
3195 .unwrap();
3196
3197 assert_eq!(definition.steps().len(), 1);
3198 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3199 assert_eq!(steps.len(), 1);
3200 assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3201 } else {
3202 panic!("Expected Split step");
3203 }
3204 }
3205
3206 #[test]
3207 fn test_filter_in_split_builder_multiple_steps() {
3208 use camel_api::splitter::{SplitterConfig, split_body_lines};
3209
3210 let definition = RouteBuilder::from("timer:test")
3211 .route_id("filter-in-split-multi")
3212 .split(SplitterConfig::new(split_body_lines()))
3213 .to("mock:before-filter")
3214 .filter(|_ex| true)
3215 .to("mock:inside-filter")
3216 .end_filter()
3217 .to("mock:after-filter")
3218 .end_split()
3219 .build()
3220 .unwrap();
3221
3222 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3223 assert_eq!(steps.len(), 3);
3225 } else {
3226 panic!("Expected Split step");
3227 }
3228 }
3229
3230 #[test]
3233 fn test_build_canonical_with_circuit_breaker() {
3234 use camel_api::circuit_breaker::CircuitBreakerConfig;
3235
3236 let spec = RouteBuilder::from("direct:start")
3237 .route_id("canonical-cb")
3238 .circuit_breaker(CircuitBreakerConfig::new().failure_threshold(10))
3239 .to("mock:result")
3240 .build_canonical()
3241 .unwrap();
3242
3243 let cb = spec.circuit_breaker.expect("circuit breaker should be set");
3244 assert_eq!(cb.failure_threshold, 10);
3245 }
3246
3247 #[test]
3248 fn test_build_canonical_rejects_custom_split_aggregation() {
3249 use camel_api::splitter::{SplitterConfig, split_body_lines};
3250
3251 let err = RouteBuilder::from("direct:start")
3252 .route_id("canonical-custom-split")
3253 .split(SplitterConfig::new(split_body_lines()).aggregation(
3254 camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
3255 ))
3256 .to("mock:frag")
3257 .end_split()
3258 .build_canonical()
3259 .unwrap_err();
3260
3261 assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3263 }
3264
3265 #[test]
3266 fn test_build_canonical_rejects_custom_aggregate_strategy() {
3267 let err = RouteBuilder::from("direct:start")
3268 .route_id("canonical-custom-agg")
3269 .aggregate(
3270 AggregatorConfig::correlate_by("key")
3271 .complete_when_size(2)
3272 .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
3273 .build()
3274 .unwrap(),
3275 )
3276 .build_canonical()
3277 .unwrap_err();
3278
3279 assert!(format!("{err}").contains("custom aggregate strategy"));
3280 }
3281
3282 #[test]
3283 fn test_build_canonical_rejects_fn_correlation_strategy() {
3284 let err = RouteBuilder::from("direct:start")
3285 .route_id("canonical-fn-corr")
3286 .aggregate(AggregatorConfig {
3287 header_name: "key".to_string(),
3288 completion: CompletionMode::Single(CompletionCondition::Size(1)),
3289 correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
3290 strategy: AggregationStrategy::CollectAll,
3291 max_buckets: None,
3292 bucket_ttl: None,
3293 force_completion_on_stop: false,
3294 discard_on_timeout: false,
3295 })
3296 .build_canonical()
3297 .unwrap_err();
3298
3299 assert!(format!("{err}").contains("Fn correlation strategy"));
3300 }
3301
3302 #[test]
3303 fn test_build_canonical_rejects_predicate_completion() {
3304 let err = RouteBuilder::from("direct:start")
3305 .route_id("canonical-pred-completion")
3306 .aggregate(AggregatorConfig {
3307 header_name: "key".to_string(),
3308 completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
3309 |_| false,
3310 ))),
3311 correlation: CorrelationStrategy::HeaderName("key".to_string()),
3312 strategy: AggregationStrategy::CollectAll,
3313 max_buckets: None,
3314 bucket_ttl: None,
3315 force_completion_on_stop: false,
3316 discard_on_timeout: false,
3317 })
3318 .build_canonical()
3319 .unwrap_err();
3320
3321 assert!(format!("{err}").contains("predicate completion"));
3322 }
3323
3324 #[test]
3325 fn test_build_canonical_with_expression_correlation() {
3326 let spec = RouteBuilder::from("direct:start")
3327 .route_id("canonical-expr-corr")
3328 .aggregate(AggregatorConfig {
3329 header_name: "key".to_string(),
3330 completion: CompletionMode::Single(CompletionCondition::Size(1)),
3331 correlation: CorrelationStrategy::Expression {
3332 expr: "header.key".to_string(),
3333 language: "simple".to_string(),
3334 },
3335 strategy: AggregationStrategy::CollectAll,
3336 max_buckets: None,
3337 bucket_ttl: None,
3338 force_completion_on_stop: false,
3339 discard_on_timeout: false,
3340 })
3341 .build_canonical()
3342 .unwrap();
3343
3344 assert!(spec.steps.iter().any(|s| matches!(s, CanonicalStepSpec::Aggregate(a) if a.correlation_key == Some("header.key".to_string()))));
3345 }
3346
3347 #[test]
3348 fn test_build_canonical_split_rejected_with_closure_expression() {
3349 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
3350
3351 let err = RouteBuilder::from("direct:start")
3353 .route_id("canonical-split-last")
3354 .split(
3355 SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins),
3356 )
3357 .to("mock:frag")
3358 .end_split()
3359 .build_canonical()
3360 .unwrap_err();
3361
3362 assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3363 }
3364
3365 #[test]
3368 fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
3369 let definition = RouteBuilder::from("direct:start")
3370 .route_id("on-exception-full")
3371 .dead_letter_channel("log:dlc")
3372 .on_exception(|e| matches!(e, CamelError::Io(_)))
3373 .retry(5)
3374 .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
3375 .with_jitter(0.3)
3376 .handled_by("log:io-handler")
3377 .end_on_exception()
3378 .to("mock:out")
3379 .build()
3380 .unwrap();
3381
3382 let cfg = definition
3383 .error_handler_config()
3384 .expect("error handler should be set");
3385 assert_eq!(cfg.policies.len(), 1);
3386 let policy = &cfg.policies[0];
3387 let retry = policy.retry.as_ref().expect("retry should be set");
3388 assert_eq!(retry.max_attempts, 5);
3389 assert_eq!(retry.initial_delay, Duration::from_millis(10));
3390 assert_eq!(retry.multiplier, 2.0);
3391 assert_eq!(retry.max_delay, Duration::from_millis(500));
3392 assert!((retry.jitter_factor - 0.3).abs() < f64::EPSILON);
3393 assert_eq!(policy.handled_by.as_deref(), Some("log:io-handler"));
3394 }
3395
3396 #[test]
3397 fn test_on_exception_jitter_clamped_to_valid_range() {
3398 let definition = RouteBuilder::from("direct:start")
3399 .route_id("jitter-clamp")
3400 .on_exception(|_e| true)
3401 .retry(1)
3402 .with_jitter(5.0)
3403 .end_on_exception()
3404 .to("mock:out")
3405 .build()
3406 .unwrap();
3407
3408 let cfg = definition.error_handler_config().unwrap();
3409 let retry = cfg.policies[0].retry.as_ref().unwrap();
3410 assert!((retry.jitter_factor - 1.0).abs() < f64::EPSILON);
3411 }
3412
3413 #[test]
3416 fn test_builder_process_fn_adds_processor_step() {
3417 use camel_api::BoxProcessorExt;
3418 let processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
3419 let definition = RouteBuilder::from("timer:tick")
3420 .route_id("process-fn-test")
3421 .process_fn(processor)
3422 .build()
3423 .unwrap();
3424
3425 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3426 }
3427
3428 #[test]
3429 fn test_builder_convert_body_to_adds_processor_step() {
3430 let definition = RouteBuilder::from("timer:tick")
3431 .route_id("convert-body-test")
3432 .convert_body_to(BodyType::Json)
3433 .build()
3434 .unwrap();
3435
3436 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3437 }
3438
3439 #[test]
3440 fn test_builder_bean_adds_bean_step() {
3441 let definition = RouteBuilder::from("timer:tick")
3442 .route_id("bean-test")
3443 .bean("myBean", "process")
3444 .build()
3445 .unwrap();
3446
3447 assert!(
3448 matches!(&definition.steps()[0], BuilderStep::Bean { name, method }
3449 if name == "myBean" && method == "process")
3450 );
3451 }
3452
3453 #[test]
3456 fn test_throttle_builder_delay_strategy() {
3457 let definition = RouteBuilder::from("timer:tick")
3458 .route_id("throttle-delay")
3459 .throttle(10, Duration::from_secs(1))
3460 .strategy(ThrottleStrategy::Delay)
3461 .to("mock:result")
3462 .end_throttle()
3463 .build()
3464 .unwrap();
3465
3466 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3467 assert_eq!(config.strategy, ThrottleStrategy::Delay);
3468 } else {
3469 panic!("Expected Throttle step");
3470 }
3471 }
3472
3473 #[test]
3474 fn test_throttle_builder_drop_strategy() {
3475 let definition = RouteBuilder::from("timer:tick")
3476 .route_id("throttle-drop")
3477 .throttle(10, Duration::from_secs(1))
3478 .strategy(ThrottleStrategy::Drop)
3479 .to("mock:result")
3480 .end_throttle()
3481 .build()
3482 .unwrap();
3483
3484 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3485 assert_eq!(config.strategy, ThrottleStrategy::Drop);
3486 } else {
3487 panic!("Expected Throttle step");
3488 }
3489 }
3490
3491 #[test]
3494 fn test_nested_loop_while_builder() {
3495 use camel_api::loop_eip::LoopMode;
3496
3497 let def = RouteBuilder::from("direct:start")
3498 .route_id("nested-loop-while")
3499 .loop_count(2)
3500 .to("mock:outer")
3501 .loop_while(|_ex| true)
3502 .to("mock:inner")
3503 .end_loop()
3504 .end_loop()
3505 .build()
3506 .unwrap();
3507
3508 assert_eq!(def.steps().len(), 1);
3509 if let BuilderStep::Loop { steps, .. } = &def.steps()[0] {
3510 assert_eq!(steps.len(), 2);
3511 if let BuilderStep::Loop { config, .. } = &steps[1] {
3512 assert!(matches!(config.mode, LoopMode::While(_)));
3513 } else {
3514 panic!("Expected inner Loop step");
3515 }
3516 } else {
3517 panic!("Expected outer Loop step");
3518 }
3519 }
3520
3521 #[test]
3524 fn test_choice_builder_multiple_whens_with_otherwise() {
3525 let definition = RouteBuilder::from("timer:tick")
3526 .route_id("choice-multi-otherwise")
3527 .choice()
3528 .when(|ex: &Exchange| ex.input.header("a").is_some())
3529 .to("mock:a")
3530 .end_when()
3531 .when(|ex: &Exchange| ex.input.header("b").is_some())
3532 .to("mock:b")
3533 .end_when()
3534 .when(|ex: &Exchange| ex.input.header("c").is_some())
3535 .to("mock:c")
3536 .end_when()
3537 .otherwise()
3538 .to("mock:fallback")
3539 .end_otherwise()
3540 .end_choice()
3541 .build()
3542 .unwrap();
3543
3544 if let BuilderStep::Choice { whens, otherwise } = &definition.steps()[0] {
3545 assert_eq!(whens.len(), 3);
3546 assert!(otherwise.is_some());
3547 assert_eq!(otherwise.as_ref().unwrap().len(), 1);
3548 } else {
3549 panic!("Expected Choice step");
3550 }
3551 }
3552
3553 #[test]
3556 fn test_multicast_builder_parallel_only() {
3557 let route = RouteBuilder::from("direct:start")
3558 .route_id("multicast-parallel")
3559 .multicast()
3560 .parallel(true)
3561 .to("mock:a")
3562 .end_multicast()
3563 .build()
3564 .unwrap();
3565
3566 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3567 assert!(config.parallel);
3568 assert_eq!(config.parallel_limit, None);
3569 } else {
3570 panic!("Expected Multicast step");
3571 }
3572 }
3573
3574 #[test]
3575 fn test_multicast_builder_timeout_only() {
3576 let route = RouteBuilder::from("direct:start")
3577 .route_id("multicast-timeout")
3578 .multicast()
3579 .timeout(Duration::from_secs(5))
3580 .to("mock:a")
3581 .end_multicast()
3582 .build()
3583 .unwrap();
3584
3585 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3586 assert_eq!(config.timeout, Some(Duration::from_secs(5)));
3587 } else {
3588 panic!("Expected Multicast step");
3589 }
3590 }
3591
3592 #[test]
3593 fn test_multicast_builder_aggregation_collect_all() {
3594 let route = RouteBuilder::from("direct:start")
3595 .route_id("multicast-collect")
3596 .multicast()
3597 .aggregation(MulticastStrategy::CollectAll)
3598 .to("mock:a")
3599 .end_multicast()
3600 .build()
3601 .unwrap();
3602
3603 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3604 assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
3605 } else {
3606 panic!("Expected Multicast step");
3607 }
3608 }
3609
3610 #[test]
3613 fn test_build_canonical_aggregate_any_completion_mode() {
3614 let spec = RouteBuilder::from("direct:start")
3615 .route_id("canonical-any-completion")
3616 .aggregate(
3617 AggregatorConfig::correlate_by("key")
3618 .complete_on_size_or_timeout(10, Duration::from_secs(30))
3619 .build()
3620 .unwrap(),
3621 )
3622 .build_canonical()
3623 .unwrap();
3624
3625 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3626 assert_eq!(agg.completion_size, Some(10));
3627 assert_eq!(agg.completion_timeout_ms, Some(30_000));
3628 } else {
3629 panic!("Expected Aggregate step");
3630 }
3631 }
3632
3633 #[test]
3634 fn test_build_canonical_aggregate_timeout_completion() {
3635 let spec = RouteBuilder::from("direct:start")
3636 .route_id("canonical-timeout-completion")
3637 .aggregate(
3638 AggregatorConfig::correlate_by("key")
3639 .complete_on_timeout(Duration::from_millis(500))
3640 .build()
3641 .unwrap(),
3642 )
3643 .build_canonical()
3644 .unwrap();
3645
3646 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3647 assert_eq!(agg.completion_size, None);
3648 assert_eq!(agg.completion_timeout_ms, Some(500));
3649 } else {
3650 panic!("Expected Aggregate step");
3651 }
3652 }
3653
3654 #[test]
3657 fn test_build_canonical_aggregate_discard_on_timeout() {
3658 use camel_api::aggregator::AggregatorConfig;
3659
3660 let spec = RouteBuilder::from("direct:start")
3661 .route_id("canonical-discard-timeout")
3662 .aggregate(
3663 AggregatorConfig::correlate_by("key")
3664 .complete_when_size(1)
3665 .discard_on_timeout(true)
3666 .build()
3667 .unwrap(),
3668 )
3669 .build_canonical()
3670 .unwrap();
3671
3672 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3673 assert_eq!(agg.discard_on_timeout, Some(true));
3674 } else {
3675 panic!("Expected Aggregate step");
3676 }
3677 }
3678
/// `force_completion_on_stop(true)` on the aggregator config must surface as
/// `Some(true)` on the canonical aggregate step.
#[test]
fn test_build_canonical_aggregate_force_completion_on_stop() {
    use camel_api::aggregator::AggregatorConfig;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .force_completion_on_stop(true)
        .build()
        .unwrap();

    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-force-stop")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    match &canonical.steps[0] {
        CanonicalStepSpec::Aggregate(aggregate) => {
            assert_eq!(aggregate.force_completion_on_stop, Some(true));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3701
/// Bucket limits set on the aggregator config must be carried into the
/// canonical aggregate step: `max_buckets` unchanged and the TTL converted to
/// milliseconds in `bucket_ttl_ms`.
#[test]
fn test_build_canonical_aggregate_max_buckets_and_ttl() {
    use camel_api::aggregator::AggregatorConfig;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .max_buckets(100)
        .bucket_ttl(Duration::from_secs(60))
        .build()
        .unwrap();

    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-buckets-ttl")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    let CanonicalStepSpec::Aggregate(aggregate) = &canonical.steps[0] else {
        panic!("Expected Aggregate step");
    };
    assert_eq!(aggregate.max_buckets, Some(100));
    // 60 seconds expressed in milliseconds.
    assert_eq!(aggregate.bucket_ttl_ms, Some(60_000));
}
3728
/// A filter opened inside a split scope must end up as the single nested
/// `Filter` step of the resulting `Split` step.
#[test]
fn test_split_builder_with_filter_inside() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let route = RouteBuilder::from("timer:test")
        .route_id("split-with-filter")
        .split(SplitterConfig::new(split_body_lines()))
        .filter(|_ex| true)
        .to("mock:filtered-frag")
        .end_filter()
        .end_split()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Split { steps, .. } => {
            assert_eq!(steps.len(), 1);
            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
        }
        _ => panic!("Expected Split step"),
    }
}
3752
/// Two consecutive wire taps followed by a `to` must produce three top-level
/// steps in declaration order: both `WireTap` steps with their own URIs, then
/// the final `To` step.
#[test]
fn test_wire_tap_multiple_taps() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("multi-wire-tap")
        .wire_tap("mock:tap1")
        .wire_tap("mock:tap2")
        .to("mock:result")
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 3);
    assert!(matches!(&steps[0], BuilderStep::WireTap { uri } if uri == "mock:tap1"));
    assert!(matches!(&steps[1], BuilderStep::WireTap { uri } if uri == "mock:tap2"));
    // The length check alone does not prove the trailing step is the
    // expected endpoint, so pin its variant and URI as well.
    assert!(matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:result"));
}
3773
/// Configuring `dead_letter_channel` and then an explicit `error_handler` on
/// the same route must make `build()` fail with a "mixed error handler modes"
/// message.
#[test]
fn test_builder_shorthand_then_explicit_mixed_mode() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("mixed-mode-2")
        .dead_letter_channel("log:dlc")
        .error_handler(ErrorHandlerConfig::log_only())
        .to("mock:out")
        .build();

    let Err(err) = outcome else {
        panic!("mixed mode should fail");
    };
    assert!(err.to_string().contains("mixed error handler modes"));
}
3788
/// `build_canonical()` must return an error when the route's `from` URI is
/// the empty string.
#[test]
fn test_build_canonical_empty_from_uri_errors() {
    let builder = RouteBuilder::from("").route_id("test");
    let outcome = builder.build_canonical();
    assert!(outcome.is_err());
}
3796
/// `build_canonical()` without a prior `route_id(..)` call must fail, and the
/// error text must mention `route_id`.
#[test]
fn test_build_canonical_missing_route_id_errors() {
    let outcome = RouteBuilder::from("direct:start").build_canonical();
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    let mentions_route_id = message.contains("route_id");
    assert!(mentions_route_id);
}
3804
/// An aggregate added inside a split scope must end up as the single nested
/// `Aggregate` step of the resulting `Split` step.
#[test]
fn test_split_builder_with_aggregate_inside() {
    use camel_api::aggregator::AggregatorConfig;
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let aggregator = AggregatorConfig::correlate_by("frag-key")
        .complete_when_size(3)
        .build()
        .unwrap();

    let route = RouteBuilder::from("timer:test")
        .route_id("split-agg")
        .split(SplitterConfig::new(split_body_lines()))
        .aggregate(aggregator)
        .end_split()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Split { steps, .. } => {
            assert_eq!(steps.len(), 1);
            assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
        }
        _ => panic!("Expected Split step"),
    }
}
3832
/// Steps added between `throttle(..)` and `end_throttle()` must be nested
/// under the `Throttle` step; here two steps are expected.
#[test]
fn test_throttle_builder_with_steps_inside() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("throttle-steps")
        .throttle(10, Duration::from_secs(1))
        .set_header("throttled", Value::Bool(true))
        .to("mock:throttled")
        .end_throttle()
        .build()
        .unwrap();

    let nested_count = match &route.steps()[0] {
        BuilderStep::Throttle { steps, .. } => steps.len(),
        _ => panic!("Expected Throttle step"),
    };
    assert_eq!(nested_count, 2);
}
3852
/// Steps added between `load_balance()` and `end_load_balance()` must be
/// nested under the `LoadBalance` step; here two steps are expected.
#[test]
fn test_load_balance_builder_with_steps_inside() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("lb-steps")
        .load_balance()
        .round_robin()
        .set_header("lb", Value::Bool(true))
        .to("mock:lb")
        .end_load_balance()
        .build()
        .unwrap();

    let nested_count = match &route.steps()[0] {
        BuilderStep::LoadBalance { steps, .. } => steps.len(),
        _ => panic!("Expected LoadBalance step"),
    };
    assert_eq!(nested_count, 2);
}
3873
/// Steps added between `multicast()` and `end_multicast()` must be nested
/// under the `Multicast` step; here two steps are expected.
#[test]
fn test_multicast_builder_with_steps_inside() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("multicast-steps")
        .multicast()
        .set_header("mc", Value::Bool(true))
        .to("mock:multicast")
        .end_multicast()
        .build()
        .unwrap();

    let nested_count = match &route.steps()[0] {
        BuilderStep::Multicast { steps, .. } => steps.len(),
        _ => panic!("Expected Multicast step"),
    };
    assert_eq!(nested_count, 2);
}
3893
/// Steps added between `loop_count(..)` and `end_loop()` must be nested under
/// the `Loop` step; here two steps are expected.
#[test]
fn test_loop_builder_with_steps_inside() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("loop-steps")
        .loop_count(3)
        .set_header("loop", Value::Bool(true))
        .to("mock:loop")
        .end_loop()
        .build()
        .unwrap();

    let nested_count = match &route.steps()[0] {
        BuilderStep::Loop { steps, .. } => steps.len(),
        _ => panic!("Expected Loop step"),
    };
    assert_eq!(nested_count, 2);
}
3913
/// `build_canonical()` must fail for a route containing a loop, naming the
/// unsupported `loop` step in the error.
#[test]
fn test_build_canonical_rejects_loop_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-loop")
        .loop_count(3)
        .to("mock:loop")
        .end_loop()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `loop`"));
}
3928
/// `build_canonical()` must fail for a route containing a multicast, naming
/// the unsupported `multicast` step in the error.
#[test]
fn test_build_canonical_rejects_multicast_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-multicast")
        .multicast()
        .to("mock:a")
        .end_multicast()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `multicast`"));
}
3941
/// `build_canonical()` must fail for a route containing a throttle, naming
/// the unsupported `throttle` step in the error.
#[test]
fn test_build_canonical_rejects_throttle_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-throttle")
        .throttle(10, Duration::from_secs(1))
        .to("mock:result")
        .end_throttle()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `throttle`"));
}
3954
/// `build_canonical()` must fail for a route containing a load balancer,
/// naming the unsupported `load_balancer` step in the error.
#[test]
fn test_build_canonical_rejects_load_balancer_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-lb")
        .load_balance()
        .round_robin()
        .to("mock:result")
        .end_load_balance()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `load_balancer`"));
}
3968
/// `build_canonical()` must fail for a route containing a bean call, naming
/// the unsupported `bean` step in the error.
#[test]
fn test_build_canonical_rejects_bean_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-bean")
        .bean("myBean", "process")
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `bean`"));
}
3979
/// `build_canonical()` must fail for a route containing a script, naming the
/// unsupported `script` step in the error.
#[test]
fn test_build_canonical_rejects_script_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-script")
        .script("rhai", "x = 1")
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `script`"));
}
3990
/// A `delay(..)` step is supported by `build_canonical()` and appears in the
/// canonical steps as a `Delay` with the duration in milliseconds.
#[test]
fn test_build_canonical_accepts_delay_step() {
    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-delay")
        .delay(Duration::from_millis(100))
        .build_canonical()
        .unwrap();

    let has_delay = canonical.steps.iter().any(|step| {
        matches!(step, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100)
    });
    assert!(has_delay);
}
4005
/// A `wire_tap(..)` step is supported by `build_canonical()` and appears in
/// the canonical steps as a `WireTap` carrying the same URI.
#[test]
fn test_build_canonical_accepts_wire_tap_step() {
    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-wiretap")
        .wire_tap("mock:tap")
        .build_canonical()
        .unwrap();

    let has_wire_tap = canonical
        .steps
        .iter()
        .any(|step| matches!(step, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"));
    assert!(has_wire_tap);
}
4020
/// `build_canonical()` must fail for a route containing a dynamic router,
/// naming the unsupported `dynamic_router` step in the error.
#[test]
fn test_build_canonical_rejects_dynamic_router_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-dyn-router")
        .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `dynamic_router`"));
}
4031
/// `build_canonical()` must fail for a route containing a routing slip,
/// naming the unsupported `routing_slip` step in the error.
#[test]
fn test_build_canonical_rejects_routing_slip_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-routing-slip")
        .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `routing_slip`"));
}
4042
/// `build_canonical()` must fail for a route containing a recipient list,
/// naming the unsupported `recipient_list` step in the error.
#[test]
fn test_build_canonical_rejects_recipient_list_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-recipient")
        .recipient_list(Arc::new(|_| "mock:a".to_string()))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `recipient_list`"));
}
4053
/// An aggregator whose `CompletionMode::Any` list includes a predicate
/// condition must be rejected by `build_canonical()` with an error that
/// mentions "predicate completion".
#[test]
fn test_build_canonical_rejects_any_mode_with_predicate() {
    // Built as a struct literal so a closure-based condition can sit next to
    // a plain size condition.
    let config = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Any(vec![
            CompletionCondition::Size(5),
            CompletionCondition::Predicate(Arc::new(|_| false)),
        ]),
        correlation: CorrelationStrategy::HeaderName("key".to_string()),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-any-pred")
        .aggregate(config)
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("predicate completion"));
}
4078
/// `build()` must reject an empty `from` URI, and the error text must mention
/// either `'from'` or `URI`.
#[test]
fn test_builder_validation_missing_from_uri() {
    let outcome = RouteBuilder::from("")
        .route_id("missing-uri-route")
        .to("log:info")
        .build();
    assert!(outcome.is_err(), "empty from URI should fail validation");

    let err = outcome.err().unwrap().to_string();
    let mentions_source = ["'from'", "URI"]
        .iter()
        .any(|needle| err.contains(*needle));
    assert!(mentions_source, "error should mention from/URI, got: {err}");
}
4094
/// `build()` must succeed even when a step URI is the opaque string
/// `"not-a-valid-uri"`; per the assertion message, URI resolution is expected
/// to happen later.
#[test]
fn test_builder_validation_invalid_step_uri_scheme() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("bad-step-route")
        .to("not-a-valid-uri");
    let outcome = builder.build();

    assert!(
        outcome.is_ok(),
        "builder should accept opaque step URIs; resolution happens later"
    );
}
4108
/// Two routes built from different `from` URIs but with the same `route_id`
/// must both build successfully and report equal route ids.
#[test]
fn test_builder_duplicate_route_ids_produce_identical_definitions() {
    let first = RouteBuilder::from("direct:a")
        .route_id("dup-route")
        .to("mock:out")
        .build();
    let second = RouteBuilder::from("direct:b")
        .route_id("dup-route")
        .to("mock:out")
        .build();

    assert!(first.is_ok());
    assert!(second.is_ok());

    let (first, second) = (first.unwrap(), second.unwrap());
    assert_eq!(first.route_id(), second.route_id());
}
4128}