1use camel_api::DelayConfig;
7use camel_api::aggregator::{
8 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24 ProcessorFn, Value,
25 runtime::{
26 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28 CanonicalWhenSpec,
29 },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35 StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38pub trait StepAccumulator: Sized {
44 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
45
46 fn to(mut self, endpoint: impl Into<String>) -> Self {
47 self.steps_mut().push(BuilderStep::To(endpoint.into()));
48 self
49 }
50
51 fn process<F, Fut>(mut self, f: F) -> Self
52 where
53 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
54 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
55 {
56 let svc = ProcessorFn::new(f);
57 self.steps_mut()
58 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
59 self
60 }
61
62 fn process_fn(mut self, processor: BoxProcessor) -> Self {
63 self.steps_mut().push(BuilderStep::Processor(processor));
64 self
65 }
66
67 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
68 let svc = SetHeader::new(IdentityProcessor, key, value);
69 self.steps_mut()
70 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71 self
72 }
73
74 fn map_body<F>(mut self, mapper: F) -> Self
75 where
76 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
77 {
78 let svc = MapBody::new(IdentityProcessor, mapper);
79 self.steps_mut()
80 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
81 self
82 }
83
84 fn set_body<B>(mut self, body: B) -> Self
85 where
86 B: Into<Body> + Clone + Send + Sync + 'static,
87 {
88 let body: Body = body.into();
89 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
90 self.steps_mut()
91 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
92 self
93 }
94
95 fn transform<B>(self, body: B) -> Self
100 where
101 B: Into<Body> + Clone + Send + Sync + 'static,
102 {
103 self.set_body(body)
104 }
105
106 fn set_body_fn<F>(mut self, expr: F) -> Self
107 where
108 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
109 {
110 let svc = SetBody::new(IdentityProcessor, expr);
111 self.steps_mut()
112 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
113 self
114 }
115
116 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
117 where
118 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
119 {
120 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
121 self.steps_mut()
122 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
123 self
124 }
125
126 fn aggregate(mut self, config: AggregatorConfig) -> Self {
127 self.steps_mut().push(BuilderStep::Aggregate { config });
128 self
129 }
130
131 fn stop(mut self) -> Self {
137 self.steps_mut().push(BuilderStep::Stop);
138 self
139 }
140
141 fn delay(mut self, duration: std::time::Duration) -> Self {
142 self.steps_mut().push(BuilderStep::Delay {
143 config: DelayConfig::from_duration(duration),
144 });
145 self
146 }
147
148 fn delay_with_header(
149 mut self,
150 duration: std::time::Duration,
151 header: impl Into<String>,
152 ) -> Self {
153 self.steps_mut().push(BuilderStep::Delay {
154 config: DelayConfig::from_duration_with_header(duration, header),
155 });
156 self
157 }
158
159 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
163 self.steps_mut().push(BuilderStep::Log {
164 level,
165 message: message.into(),
166 });
167 self
168 }
169
170 fn convert_body_to(mut self, target: BodyType) -> Self {
182 let svc = ConvertBodyTo::new(IdentityProcessor, target);
183 self.steps_mut()
184 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
185 self
186 }
187
188 fn stream_cache(mut self, threshold: usize) -> Self {
189 let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
190 let svc = StreamCacheService::new(IdentityProcessor, config);
191 self.steps_mut()
192 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
193 self
194 }
195
196 fn stream_cache_default(self) -> Self {
200 self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
201 }
202
203 fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
214 let name = format.into();
215 let df = builtin_data_format(&name)
216 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
217 let svc = MarshalService::new(IdentityProcessor, df);
218 self.steps_mut()
219 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
220 Ok(self)
221 }
222
223 fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
234 let name = format.into();
235 let df = builtin_data_format(&name)
236 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
237 let svc = UnmarshalService::new(IdentityProcessor, df);
238 self.steps_mut()
239 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
240 Ok(self)
241 }
242
243 fn validate(mut self, schema_path: impl Into<String>) -> Self {
252 let path = schema_path.into();
253 let uri = if path.starts_with("validator:") {
254 path
255 } else {
256 format!("validator:{path}")
257 };
258 self.steps_mut().push(BuilderStep::To(uri));
259 self
260 }
261
262 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
273 self.steps_mut().push(BuilderStep::Script {
274 language: language.into(),
275 script: script.into(),
276 });
277 self
278 }
279
280 fn enrich(mut self, uri: impl Into<String>) -> Self {
286 self.steps_mut().push(BuilderStep::Enrich {
287 uri: uri.into(),
288 strategy: None,
289 timeout_ms: None,
290 });
291 self
292 }
293
294 fn poll_enrich(mut self, uri: impl Into<String>, timeout_ms: u64) -> Self {
300 self.steps_mut().push(BuilderStep::PollEnrich {
301 uri: uri.into(),
302 strategy: None,
303 timeout_ms: Some(timeout_ms),
304 });
305 self
306 }
307
308 fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
309 self.steps_mut().push(BuilderStep::Bean {
310 name: name.into(),
311 method: method.into(),
312 });
313 self
314 }
315}
316
317pub struct RouteBuilder {
332 from_uri: String,
333 steps: Vec<BuilderStep>,
334 error_handler: Option<ErrorHandlerConfig>,
335 error_handler_mode: ErrorHandlerMode,
336 circuit_breaker_config: Option<CircuitBreakerConfig>,
337 security_policy_config: Option<camel_api::security_policy::SecurityPolicyConfig>,
338 security_authenticator: Option<std::sync::Arc<dyn camel_auth::TokenAuthenticator>>,
339 concurrency: Option<ConcurrencyModel>,
340 route_id: Option<String>,
341 auto_startup: Option<bool>,
342 startup_order: Option<i32>,
343}
344
345#[derive(Default)]
346enum ErrorHandlerMode {
347 #[default]
348 None,
349 ExplicitConfig,
350 Shorthand {
351 dlc_uri: Option<String>,
352 specs: Vec<OnExceptionSpec>,
353 },
354 Mixed,
355}
356
357#[derive(Clone)]
358struct OnExceptionSpec {
359 matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
360 retry: Option<RedeliveryPolicy>,
361 handled_by: Option<String>,
362}
363
364impl RouteBuilder {
365 pub fn from(endpoint: &str) -> Self {
367 Self {
368 from_uri: endpoint.to_string(),
369 steps: Vec::new(),
370 error_handler: None,
371 error_handler_mode: ErrorHandlerMode::None,
372 circuit_breaker_config: None,
373 security_policy_config: None,
374 security_authenticator: None,
375 concurrency: None,
376 route_id: None,
377 auto_startup: None,
378 startup_order: None,
379 }
380 }
381
382 pub fn filter<F>(self, predicate: F) -> FilterBuilder
386 where
387 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
388 {
389 FilterBuilder {
390 parent: self,
391 predicate: std::sync::Arc::new(predicate),
392 steps: vec![],
393 }
394 }
395
396 pub fn choice(self) -> ChoiceBuilder {
402 ChoiceBuilder {
403 parent: self,
404 whens: vec![],
405 _otherwise: None,
406 }
407 }
408
409 pub fn wire_tap(mut self, endpoint: &str) -> Self {
413 self.steps.push(BuilderStep::WireTap {
414 uri: endpoint.to_string(),
415 });
416 self
417 }
418
419 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
421 self.error_handler_mode = match self.error_handler_mode {
422 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
423 ErrorHandlerMode::ExplicitConfig
424 }
425 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
426 };
427 self.error_handler = Some(config);
428 self
429 }
430
431 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
433 let uri = uri.into();
434 self.error_handler_mode = match self.error_handler_mode {
435 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
436 dlc_uri: Some(uri),
437 specs: Vec::new(),
438 },
439 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
440 dlc_uri: Some(uri),
441 specs,
442 },
443 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
444 };
445 self
446 }
447
448 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
450 where
451 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
452 {
453 self.error_handler_mode = match self.error_handler_mode {
454 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
455 dlc_uri: None,
456 specs: Vec::new(),
457 },
458 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
459 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
460 };
461
462 OnExceptionBuilder {
463 parent: self,
464 policy: OnExceptionSpec {
465 matches: std::sync::Arc::new(matches),
466 retry: None,
467 handled_by: None,
468 },
469 }
470 }
471
472 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
474 self.circuit_breaker_config = Some(config);
475 self
476 }
477
478 pub fn security_policy(
479 mut self,
480 config: camel_api::security_policy::SecurityPolicyConfig,
481 ) -> Self {
482 self.security_policy_config = Some(config);
483 self
484 }
485
486 pub fn security_authenticator(
487 mut self,
488 auth: std::sync::Arc<dyn camel_auth::TokenAuthenticator>,
489 ) -> Self {
490 self.security_authenticator = Some(auth);
491 self
492 }
493
494 pub fn concurrent(mut self, max: usize) -> Self {
508 let max = if max == 0 { None } else { Some(max) };
509 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
510 self
511 }
512
513 pub fn sequential(mut self) -> Self {
518 self.concurrency = Some(ConcurrencyModel::Sequential);
519 self
520 }
521
522 pub fn route_id(mut self, id: impl Into<String>) -> Self {
526 self.route_id = Some(id.into());
527 self
528 }
529
530 pub fn auto_startup(mut self, auto: bool) -> Self {
534 self.auto_startup = Some(auto);
535 self
536 }
537
538 pub fn startup_order(mut self, order: i32) -> Self {
542 self.startup_order = Some(order);
543 self
544 }
545
546 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
552 SplitBuilder {
553 parent: self,
554 config,
555 steps: Vec::new(),
556 }
557 }
558
559 pub fn multicast(self) -> MulticastBuilder {
565 MulticastBuilder {
566 parent: self,
567 steps: Vec::new(),
568 config: MulticastConfig::new(),
569 }
570 }
571
572 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
579 ThrottleBuilder {
580 parent: self,
581 config: ThrottlerConfig::new(max_requests, period),
582 steps: Vec::new(),
583 }
584 }
585
586 pub fn loop_count(self, count: usize) -> LoopBuilder {
588 LoopBuilder {
589 parent: self,
590 config: LoopConfig {
591 mode: LoopMode::Count(count),
592 },
593 steps: vec![],
594 }
595 }
596
597 pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
599 where
600 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
601 {
602 LoopBuilder {
603 parent: self,
604 config: LoopConfig {
605 mode: LoopMode::While(std::sync::Arc::new(predicate)),
606 },
607 steps: vec![],
608 }
609 }
610
611 pub fn load_balance(self) -> LoadBalancerBuilder {
617 LoadBalancerBuilder {
618 parent: self,
619 config: LoadBalancerConfig::round_robin(),
620 steps: Vec::new(),
621 }
622 }
623
624 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
640 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
641 }
642
643 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
647 self.steps.push(BuilderStep::DynamicRouter { config });
648 self
649 }
650
651 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
652 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
653 }
654
655 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
656 self.steps.push(BuilderStep::RoutingSlip { config });
657 self
658 }
659
660 pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
661 self.recipient_list_with_config(RecipientListConfig::new(expression))
662 }
663
664 pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
665 self.steps.push(BuilderStep::RecipientList { config });
666 self
667 }
668
669 pub fn build(self) -> Result<RouteDefinition, CamelError> {
675 validate_uri(&self.from_uri)?;
676 let route_id = self
677 .route_id
678 .filter(|s| !s.trim().is_empty())
679 .ok_or_else(|| {
680 CamelError::RouteError(
681 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
682 .to_string(),
683 )
684 })?;
685 let resolved_error_handler = match self.error_handler_mode {
686 ErrorHandlerMode::None => self.error_handler,
687 ErrorHandlerMode::ExplicitConfig => self.error_handler,
688 ErrorHandlerMode::Mixed => {
689 return Err(CamelError::RouteError(
690 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
691 ));
692 }
693 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
694 let mut config = if let Some(uri) = dlc_uri {
695 ErrorHandlerConfig::dead_letter_channel(uri)
696 } else {
697 ErrorHandlerConfig::log_only()
698 };
699
700 for spec in specs {
701 let matcher = spec.matches.clone();
702 let mut builder = config.on_exception(move |e| matcher(e));
703
704 if let Some(retry) = spec.retry {
705 builder = builder.retry(retry.max_attempts).with_backoff(
706 retry.initial_delay,
707 retry.multiplier,
708 retry.max_delay,
709 );
710 if retry.jitter_factor > 0.0 {
711 builder = builder.with_jitter(retry.jitter_factor);
712 }
713 }
714
715 if let Some(uri) = spec.handled_by {
716 builder = builder.handled_by(uri);
717 }
718
719 config = builder.build();
720 }
721
722 Some(config)
723 }
724 };
725
726 let definition = RouteDefinition::new(self.from_uri, self.steps);
727 let definition = if let Some(eh) = resolved_error_handler {
728 definition.with_error_handler(eh)
729 } else {
730 definition
731 };
732 let definition = if let Some(cb) = self.circuit_breaker_config {
733 definition.with_circuit_breaker(cb)
734 } else {
735 definition
736 };
737 let definition = if let Some(sp) = self.security_policy_config {
738 definition.with_security_policy(sp)
739 } else {
740 definition
741 };
742 let definition = if let Some(auth) = self.security_authenticator {
743 definition.with_security_authenticator(auth)
744 } else {
745 definition
746 };
747 let definition = if let Some(concurrency) = self.concurrency {
748 definition.with_concurrency(concurrency)
749 } else {
750 definition
751 };
752 let definition = definition.with_route_id(route_id);
753 let definition = if let Some(auto) = self.auto_startup {
754 definition.with_auto_startup(auto)
755 } else {
756 definition
757 };
758 let definition = if let Some(order) = self.startup_order {
759 definition.with_startup_order(order)
760 } else {
761 definition
762 };
763 Ok(definition)
764 }
765
766 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
768 validate_uri(&self.from_uri)?;
769 let route_id = self
770 .route_id
771 .filter(|s| !s.trim().is_empty())
772 .ok_or_else(|| {
773 CamelError::RouteError(
774 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
775 .to_string(),
776 )
777 })?;
778
779 let steps = canonicalize_steps(self.steps)?;
780 let circuit_breaker = self
781 .circuit_breaker_config
782 .map(canonicalize_circuit_breaker);
783
784 if self.security_policy_config.is_some() {
785 return Err(CamelError::RouteError(
786 "routes with security_policy cannot use the canonical/hot-reload path (not yet supported)"
787 .into(),
788 ));
789 }
790
791 let spec = CanonicalRouteSpec {
792 route_id,
793 from: self.from_uri,
794 steps,
795 circuit_breaker,
796 auto_startup: None,
797 startup_order: None,
798 concurrency: None,
799 version: camel_api::CANONICAL_CONTRACT_VERSION,
800 };
801 spec.validate_contract()?;
802 Ok(spec)
803 }
804}
805
806pub struct OnExceptionBuilder {
807 parent: RouteBuilder,
808 policy: OnExceptionSpec,
809}
810
811impl OnExceptionBuilder {
812 pub fn retry(mut self, max_attempts: u32) -> Self {
813 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
814 self
815 }
816
817 pub fn with_backoff(
818 mut self,
819 initial: std::time::Duration,
820 multiplier: f64,
821 max: std::time::Duration,
822 ) -> Self {
823 if let Some(ref mut retry) = self.policy.retry {
824 retry.initial_delay = initial;
825 retry.multiplier = multiplier;
826 retry.max_delay = max;
827 } else {
828 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
829 }
830 self
831 }
832
833 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
834 if let Some(ref mut retry) = self.policy.retry {
835 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
836 } else {
837 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
838 }
839 self
840 }
841
842 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
843 self.policy.handled_by = Some(uri.into());
844 self
845 }
846
847 pub fn end_on_exception(mut self) -> RouteBuilder {
848 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
849 specs.push(self.policy);
850 }
851 self.parent
852 }
853}
854
855fn validate_uri(uri: &str) -> Result<(), CamelError> {
857 let trimmed = uri.trim();
858 if trimmed.is_empty() {
859 return Err(CamelError::RouteError(
860 "route must have a 'from' URI".to_string(),
861 ));
862 }
863 if !trimmed.contains(':') {
864 return Err(CamelError::RouteError(
865 "URI must have a scheme (e.g. 'timer:tick')".to_string(),
866 ));
867 }
868 let scheme = trimmed.split(':').next().unwrap_or("");
869 if scheme.trim().is_empty() {
870 return Err(CamelError::RouteError(
871 "URI scheme must not be empty".to_string(),
872 ));
873 }
874 Ok(())
875}
876
877fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
878 let mut canonical = Vec::with_capacity(steps.len());
879 for step in steps {
880 canonical.push(canonicalize_step(step)?);
881 }
882 Ok(canonical)
883}
884
885fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
886 match step {
887 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
888 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
889 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
890 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
891 BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
892 delay_ms: config.delay_ms,
893 dynamic_header: config.dynamic_header,
894 }),
895 BuilderStep::DeclarativeScript { expression } => {
896 Ok(CanonicalStepSpec::Script { expression })
897 }
898 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
899 predicate,
900 steps: canonicalize_steps(steps)?,
901 }),
902 BuilderStep::DeclarativeChoice { whens, otherwise } => {
903 let mut canonical_whens = Vec::with_capacity(whens.len());
904 for DeclarativeWhenStep { predicate, steps } in whens {
905 canonical_whens.push(CanonicalWhenSpec {
906 predicate,
907 steps: canonicalize_steps(steps)?,
908 });
909 }
910 let otherwise = match otherwise {
911 Some(steps) => Some(canonicalize_steps(steps)?),
912 None => None,
913 };
914 Ok(CanonicalStepSpec::Choice {
915 whens: canonical_whens,
916 otherwise,
917 })
918 }
919 BuilderStep::DeclarativeSplit {
920 expression,
921 aggregation,
922 parallel,
923 parallel_limit,
924 stop_on_exception,
925 steps,
926 } => Ok(CanonicalStepSpec::Split {
927 expression: CanonicalSplitExpressionSpec::Language(expression),
928 aggregation: canonicalize_split_aggregation(aggregation)?,
929 parallel,
930 parallel_limit,
931 stop_on_exception,
932 steps: canonicalize_steps(steps)?,
933 }),
934 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
935 canonicalize_aggregate(config)?,
936 )),
937 other => {
938 let step_name = canonical_step_name(&other);
939 let detail = camel_api::canonical_contract_rejection_reason(step_name)
940 .unwrap_or("not included in canonical v1");
941 Err(CamelError::RouteError(format!(
942 "canonical v1 does not support step `{step_name}`: {detail}"
943 )))
944 }
945 }
946}
947
948fn canonicalize_split_aggregation(
949 strategy: camel_api::splitter::AggregationStrategy,
950) -> Result<CanonicalSplitAggregationSpec, CamelError> {
951 match strategy {
952 camel_api::splitter::AggregationStrategy::LastWins => {
953 Ok(CanonicalSplitAggregationSpec::LastWins)
954 }
955 camel_api::splitter::AggregationStrategy::CollectAll => {
956 Ok(CanonicalSplitAggregationSpec::CollectAll)
957 }
958 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
959 "canonical v1 does not support custom split aggregation".to_string(),
960 )),
961 camel_api::splitter::AggregationStrategy::Original => {
962 Ok(CanonicalSplitAggregationSpec::Original)
963 }
964 }
965}
966
967fn extract_completion_fields(
968 mode: &CompletionMode,
969) -> Result<(Option<usize>, Option<u64>), CamelError> {
970 match mode {
971 CompletionMode::Single(cond) => match cond {
972 CompletionCondition::Size(n) => Ok((Some(*n), None)),
973 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
974 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
975 "canonical v1 does not support aggregate predicate completion".to_string(),
976 )),
977 },
978 CompletionMode::Any(conds) => {
979 let mut size = None;
980 let mut timeout_ms = None;
981 for cond in conds {
982 match cond {
983 CompletionCondition::Size(n) => size = Some(*n),
984 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
985 CompletionCondition::Predicate(_) => {
986 return Err(CamelError::RouteError(
987 "canonical v1 does not support aggregate predicate completion"
988 .to_string(),
989 ));
990 }
991 }
992 }
993 Ok((size, timeout_ms))
994 }
995 }
996}
997
998fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
999 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
1000
1001 let header = match &config.correlation {
1002 CorrelationStrategy::HeaderName(h) => h.clone(),
1003 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
1004 CorrelationStrategy::Fn(_) => {
1005 return Err(CamelError::RouteError(
1006 "canonical v1 does not support Fn correlation strategy".to_string(),
1007 ));
1008 }
1009 };
1010
1011 let correlation_key = match &config.correlation {
1012 CorrelationStrategy::HeaderName(_) => None,
1013 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
1014 CorrelationStrategy::Fn(_) => unreachable!(),
1015 };
1016
1017 let strategy = match config.strategy {
1018 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
1019 AggregationStrategy::Custom(_) => {
1020 return Err(CamelError::RouteError(
1021 "canonical v1 does not support custom aggregate strategy".to_string(),
1022 ));
1023 }
1024 };
1025 let bucket_ttl_ms = config
1026 .bucket_ttl
1027 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
1028
1029 Ok(CanonicalAggregateSpec {
1030 header,
1031 completion_size,
1032 completion_timeout_ms,
1033 correlation_key,
1034 force_completion_on_stop: if config.force_completion_on_stop {
1035 Some(true)
1036 } else {
1037 None
1038 },
1039 discard_on_timeout: if config.discard_on_timeout {
1040 Some(true)
1041 } else {
1042 None
1043 },
1044 strategy,
1045 max_buckets: config.max_buckets,
1046 bucket_ttl_ms,
1047 })
1048}
1049
1050fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
1051 CanonicalCircuitBreakerSpec {
1052 failure_threshold: config.failure_threshold,
1053 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
1054 }
1055}
1056
1057fn canonical_step_name(step: &BuilderStep) -> &'static str {
1058 match step {
1059 BuilderStep::Processor(_) => "processor",
1060 BuilderStep::To(_) => "to",
1061 BuilderStep::Stop => "stop",
1062 BuilderStep::Log { .. } => "log",
1063 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
1064 BuilderStep::DeclarativeSetBody { .. } => "set_body",
1065 BuilderStep::DeclarativeFilter { .. } => "filter",
1066 BuilderStep::DeclarativeChoice { .. } => "choice",
1067 BuilderStep::DeclarativeScript { .. } => "script",
1068 BuilderStep::DeclarativeFunction { .. } => "function",
1069 BuilderStep::DeclarativeSplit { .. } => "split",
1070 BuilderStep::Split { .. } => "split",
1071 BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1072 BuilderStep::Aggregate { .. } => "aggregate",
1073 BuilderStep::Filter { .. } => "filter",
1074 BuilderStep::Choice { .. } => "choice",
1075 BuilderStep::WireTap { .. } => "wire_tap",
1076 BuilderStep::Delay { .. } => "delay",
1077 BuilderStep::Multicast { .. } => "multicast",
1078 BuilderStep::DeclarativeLog { .. } => "log",
1079 BuilderStep::Bean { .. } => "bean",
1080 BuilderStep::Script { .. } => "script",
1081 BuilderStep::Throttle { .. } => "throttle",
1082 BuilderStep::LoadBalance { .. } => "load_balancer",
1083 BuilderStep::DynamicRouter { .. } => "dynamic_router",
1084 BuilderStep::RoutingSlip { .. } => "routing_slip",
1085 BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1086 BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1087 BuilderStep::RecipientList { .. } => "recipient_list",
1088 BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1089 BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1090 BuilderStep::DeclarativeStreamSplit { .. } => "stream_split",
1091 BuilderStep::Enrich { .. } => "enrich",
1092 BuilderStep::PollEnrich { .. } => "poll_enrich",
1093 }
1094}
1095
1096impl StepAccumulator for RouteBuilder {
1097 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1098 &mut self.steps
1099 }
1100}
1101
1102pub struct SplitBuilder {
1110 parent: RouteBuilder,
1111 config: SplitterConfig,
1112 steps: Vec<BuilderStep>,
1113}
1114
1115impl SplitBuilder {
1116 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1118 where
1119 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1120 {
1121 FilterInSplitBuilder {
1122 parent: self,
1123 predicate: std::sync::Arc::new(predicate),
1124 steps: vec![],
1125 }
1126 }
1127
1128 pub fn end_split(mut self) -> RouteBuilder {
1131 let split_step = BuilderStep::Split {
1132 config: self.config,
1133 steps: self.steps,
1134 };
1135 self.parent.steps.push(split_step);
1136 self.parent
1137 }
1138}
1139
1140impl StepAccumulator for SplitBuilder {
1141 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1142 &mut self.steps
1143 }
1144}
1145
1146pub struct FilterBuilder {
1148 parent: RouteBuilder,
1149 predicate: FilterPredicate,
1150 steps: Vec<BuilderStep>,
1151}
1152
1153impl FilterBuilder {
1154 pub fn end_filter(mut self) -> RouteBuilder {
1157 let step = BuilderStep::Filter {
1158 predicate: self.predicate,
1159 steps: self.steps,
1160 };
1161 self.parent.steps.push(step);
1162 self.parent
1163 }
1164}
1165
1166impl StepAccumulator for FilterBuilder {
1167 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1168 &mut self.steps
1169 }
1170}
1171
1172pub struct FilterInSplitBuilder {
1174 parent: SplitBuilder,
1175 predicate: FilterPredicate,
1176 steps: Vec<BuilderStep>,
1177}
1178
1179impl FilterInSplitBuilder {
1180 pub fn end_filter(mut self) -> SplitBuilder {
1182 let step = BuilderStep::Filter {
1183 predicate: self.predicate,
1184 steps: self.steps,
1185 };
1186 self.parent.steps.push(step);
1187 self.parent
1188 }
1189}
1190
1191impl StepAccumulator for FilterInSplitBuilder {
1192 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1193 &mut self.steps
1194 }
1195}
1196
1197pub struct ChoiceBuilder {
1204 parent: RouteBuilder,
1205 whens: Vec<WhenStep>,
1206 _otherwise: Option<Vec<BuilderStep>>,
1207}
1208
1209impl ChoiceBuilder {
1210 pub fn when<F>(self, predicate: F) -> WhenBuilder
1213 where
1214 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1215 {
1216 WhenBuilder {
1217 parent: self,
1218 predicate: std::sync::Arc::new(predicate),
1219 steps: vec![],
1220 }
1221 }
1222
1223 pub fn otherwise(self) -> OtherwiseBuilder {
1227 OtherwiseBuilder {
1228 parent: self,
1229 steps: vec![],
1230 }
1231 }
1232
1233 pub fn end_choice(mut self) -> RouteBuilder {
1237 let step = BuilderStep::Choice {
1238 whens: self.whens,
1239 otherwise: self._otherwise,
1240 };
1241 self.parent.steps.push(step);
1242 self.parent
1243 }
1244}
1245
1246pub struct WhenBuilder {
1248 parent: ChoiceBuilder,
1249 predicate: camel_api::FilterPredicate,
1250 steps: Vec<BuilderStep>,
1251}
1252
1253impl WhenBuilder {
1254 pub fn end_when(mut self) -> ChoiceBuilder {
1257 self.parent.whens.push(WhenStep {
1258 predicate: self.predicate,
1259 steps: self.steps,
1260 });
1261 self.parent
1262 }
1263}
1264
1265impl StepAccumulator for WhenBuilder {
1266 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1267 &mut self.steps
1268 }
1269}
1270
1271pub struct OtherwiseBuilder {
1273 parent: ChoiceBuilder,
1274 steps: Vec<BuilderStep>,
1275}
1276
1277impl OtherwiseBuilder {
1278 pub fn end_otherwise(self) -> ChoiceBuilder {
1280 let OtherwiseBuilder { mut parent, steps } = self;
1281 parent._otherwise = Some(steps);
1282 parent
1283 }
1284}
1285
1286impl StepAccumulator for OtherwiseBuilder {
1287 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1288 &mut self.steps
1289 }
1290}
1291
1292pub struct MulticastBuilder {
1300 parent: RouteBuilder,
1301 steps: Vec<BuilderStep>,
1302 config: MulticastConfig,
1303}
1304
1305impl MulticastBuilder {
1306 pub fn parallel(mut self, parallel: bool) -> Self {
1307 self.config = self.config.parallel(parallel);
1308 self
1309 }
1310
1311 pub fn parallel_limit(mut self, limit: usize) -> Self {
1312 self.config = self.config.parallel_limit(limit);
1313 self
1314 }
1315
1316 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1317 self.config = self.config.stop_on_exception(stop);
1318 self
1319 }
1320
1321 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1322 self.config = self.config.timeout(duration);
1323 self
1324 }
1325
1326 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1327 self.config = self.config.aggregation(strategy);
1328 self
1329 }
1330
1331 pub fn end_multicast(mut self) -> RouteBuilder {
1332 let step = BuilderStep::Multicast {
1333 steps: self.steps,
1334 config: self.config,
1335 };
1336 self.parent.steps.push(step);
1337 self.parent
1338 }
1339}
1340
1341impl StepAccumulator for MulticastBuilder {
1342 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1343 &mut self.steps
1344 }
1345}
1346
1347pub struct ThrottleBuilder {
1355 parent: RouteBuilder,
1356 config: ThrottlerConfig,
1357 steps: Vec<BuilderStep>,
1358}
1359
1360impl ThrottleBuilder {
1361 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1367 self.config = self.config.strategy(strategy);
1368 self
1369 }
1370
1371 pub fn end_throttle(mut self) -> RouteBuilder {
1374 let step = BuilderStep::Throttle {
1375 config: self.config,
1376 steps: self.steps,
1377 };
1378 self.parent.steps.push(step);
1379 self.parent
1380 }
1381}
1382
1383impl StepAccumulator for ThrottleBuilder {
1384 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1385 &mut self.steps
1386 }
1387}
1388
1389pub struct LoopBuilder {
1391 parent: RouteBuilder,
1392 config: LoopConfig,
1393 steps: Vec<BuilderStep>,
1394}
1395
1396impl LoopBuilder {
1397 pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1398 LoopInLoopBuilder {
1399 parent: self,
1400 config: LoopConfig {
1401 mode: LoopMode::Count(count),
1402 },
1403 steps: vec![],
1404 }
1405 }
1406
1407 pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1408 where
1409 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1410 {
1411 LoopInLoopBuilder {
1412 parent: self,
1413 config: LoopConfig {
1414 mode: LoopMode::While(std::sync::Arc::new(predicate)),
1415 },
1416 steps: vec![],
1417 }
1418 }
1419
1420 pub fn end_loop(mut self) -> RouteBuilder {
1421 let step = BuilderStep::Loop {
1422 config: self.config,
1423 steps: self.steps,
1424 };
1425 self.parent.steps.push(step);
1426 self.parent
1427 }
1428}
1429
1430impl StepAccumulator for LoopBuilder {
1431 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1432 &mut self.steps
1433 }
1434}
1435
1436pub struct LoopInLoopBuilder {
1437 parent: LoopBuilder,
1438 config: LoopConfig,
1439 steps: Vec<BuilderStep>,
1440}
1441
1442impl LoopInLoopBuilder {
1443 pub fn end_loop(mut self) -> LoopBuilder {
1444 let step = BuilderStep::Loop {
1445 config: self.config,
1446 steps: self.steps,
1447 };
1448 self.parent.steps.push(step);
1449 self.parent
1450 }
1451}
1452
1453impl StepAccumulator for LoopInLoopBuilder {
1454 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1455 &mut self.steps
1456 }
1457}
1458
1459pub struct LoadBalancerBuilder {
1467 parent: RouteBuilder,
1468 config: LoadBalancerConfig,
1469 steps: Vec<BuilderStep>,
1470}
1471
1472impl LoadBalancerBuilder {
1473 pub fn round_robin(mut self) -> Self {
1475 self.config = LoadBalancerConfig::round_robin();
1476 self
1477 }
1478
1479 pub fn random(mut self) -> Self {
1481 self.config = LoadBalancerConfig::random();
1482 self
1483 }
1484
1485 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1490 self.config = LoadBalancerConfig::weighted(weights);
1491 self
1492 }
1493
1494 pub fn failover(mut self) -> Self {
1499 self.config = LoadBalancerConfig::failover();
1500 self
1501 }
1502
1503 pub fn parallel(mut self, parallel: bool) -> Self {
1508 self.config = self.config.parallel(parallel);
1509 self
1510 }
1511
1512 pub fn end_load_balance(mut self) -> RouteBuilder {
1515 let step = BuilderStep::LoadBalance {
1516 config: self.config,
1517 steps: self.steps,
1518 };
1519 self.parent.steps.push(step);
1520 self.parent
1521 }
1522}
1523
1524impl StepAccumulator for LoadBalancerBuilder {
1525 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1526 &mut self.steps
1527 }
1528}
1529
1530#[cfg(test)]
1535mod tests {
1536 use super::*;
1537 use camel_api::error_handler::ErrorHandlerConfig;
1538 use camel_api::load_balancer::LoadBalanceStrategy;
1539 use camel_api::{Exchange, Message};
1540 use camel_core::route::BuilderStep;
1541 use std::sync::Arc;
1542 use std::time::Duration;
1543 use tower::{Service, ServiceExt};
1544
1545 #[test]
1546 fn test_builder_from_creates_definition() {
1547 let definition = RouteBuilder::from("timer:tick")
1548 .route_id("test-route")
1549 .build()
1550 .unwrap();
1551 assert_eq!(definition.from_uri(), "timer:tick");
1552 }
1553
1554 #[test]
1555 fn test_builder_empty_from_uri_errors() {
1556 let result = RouteBuilder::from("").route_id("test-route").build();
1557 assert!(result.is_err());
1558 }
1559
1560 #[test]
1561 fn test_build_rejects_schemeless_uri() {
1562 let result = RouteBuilder::from("no-scheme-here")
1563 .route_id("test-route")
1564 .build();
1565 match result {
1566 Err(err) => {
1567 let err_msg = format!("{err}");
1568 assert!(
1569 err_msg.contains("scheme"),
1570 "expected scheme-related error, got: {err_msg}"
1571 );
1572 }
1573 Ok(_) => panic!("schemeless URI should fail"),
1574 }
1575 }
1576
1577 #[test]
1578 fn test_build_rejects_empty_scheme_uri() {
1579 let result = RouteBuilder::from(":missing-scheme")
1580 .route_id("test-route")
1581 .build();
1582 match result {
1583 Err(err) => {
1584 let err_msg = format!("{err}");
1585 assert!(
1586 err_msg.contains("scheme"),
1587 "expected scheme-related error, got: {err_msg}"
1588 );
1589 }
1590 Ok(_) => panic!("empty-scheme URI should fail"),
1591 }
1592 }
1593
1594 #[test]
1595 fn test_build_accepts_valid_uri() {
1596 let result = RouteBuilder::from("timer:tick")
1597 .route_id("test-route")
1598 .build();
1599 assert!(result.is_ok());
1600 }
1601
1602 #[test]
1603 fn test_build_canonical_rejects_schemeless_uri() {
1604 let result = RouteBuilder::from("no-scheme-here")
1605 .route_id("test-route")
1606 .build_canonical();
1607 assert!(result.is_err());
1608 }
1609
1610 #[test]
1611 fn test_builder_to_adds_step() {
1612 let definition = RouteBuilder::from("timer:tick")
1613 .route_id("test-route")
1614 .to("log:info")
1615 .build()
1616 .unwrap();
1617
1618 assert_eq!(definition.from_uri(), "timer:tick");
1619 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1621 }
1622
1623 #[test]
1624 fn test_builder_filter_adds_filter_step() {
1625 let definition = RouteBuilder::from("timer:tick")
1626 .route_id("test-route")
1627 .filter(|_ex| true)
1628 .to("mock:result")
1629 .end_filter()
1630 .build()
1631 .unwrap();
1632
1633 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1634 }
1635
1636 #[test]
1637 fn test_builder_set_header_adds_processor_step() {
1638 let definition = RouteBuilder::from("timer:tick")
1639 .route_id("test-route")
1640 .set_header("key", Value::String("value".into()))
1641 .build()
1642 .unwrap();
1643
1644 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1645 }
1646
1647 #[test]
1648 fn test_builder_map_body_adds_processor_step() {
1649 let definition = RouteBuilder::from("timer:tick")
1650 .route_id("test-route")
1651 .map_body(|body| body)
1652 .build()
1653 .unwrap();
1654
1655 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1656 }
1657
1658 #[test]
1659 fn test_builder_process_adds_processor_step() {
1660 let definition = RouteBuilder::from("timer:tick")
1661 .route_id("test-route")
1662 .process(|ex| async move { Ok(ex) })
1663 .build()
1664 .unwrap();
1665
1666 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1667 }
1668
1669 #[test]
1670 fn test_builder_chain_multiple_steps() {
1671 let definition = RouteBuilder::from("timer:tick")
1672 .route_id("test-route")
1673 .set_header("source", Value::String("timer".into()))
1674 .filter(|ex| ex.input.header("source").is_some())
1675 .to("log:info")
1676 .end_filter()
1677 .to("mock:result")
1678 .build()
1679 .unwrap();
1680
1681 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1685 }
1686
1687 #[test]
1688 fn test_loop_count_builder() {
1689 use camel_api::loop_eip::LoopMode;
1690
1691 let def = RouteBuilder::from("direct:start")
1692 .route_id("loop-test")
1693 .loop_count(3)
1694 .to("mock:inside")
1695 .end_loop()
1696 .to("mock:after")
1697 .build()
1698 .unwrap();
1699
1700 assert_eq!(def.steps().len(), 2);
1701 match &def.steps()[0] {
1702 BuilderStep::Loop { config, steps } => {
1703 assert!(matches!(config.mode, LoopMode::Count(3)));
1704 assert_eq!(steps.len(), 1);
1705 }
1706 other => panic!("Expected Loop, got {:?}", other),
1707 }
1708 assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1709 }
1710
1711 #[test]
1712 fn test_loop_while_builder() {
1713 use camel_api::loop_eip::LoopMode;
1714
1715 let def = RouteBuilder::from("direct:start")
1716 .route_id("loop-while-test")
1717 .loop_while(|_ex| true)
1718 .to("mock:retry")
1719 .end_loop()
1720 .build()
1721 .unwrap();
1722
1723 assert_eq!(def.steps().len(), 1);
1724 match &def.steps()[0] {
1725 BuilderStep::Loop { config, steps } => {
1726 assert!(matches!(config.mode, LoopMode::While(_)));
1727 assert_eq!(steps.len(), 1);
1728 }
1729 other => panic!("Expected Loop, got {:?}", other),
1730 }
1731 }
1732
1733 #[test]
1734 fn test_nested_loop_builder() {
1735 use camel_api::loop_eip::LoopMode;
1736
1737 let def = RouteBuilder::from("direct:start")
1738 .route_id("nested-loop-test")
1739 .loop_count(2)
1740 .to("mock:outer")
1741 .loop_count(3)
1742 .to("mock:inner")
1743 .end_loop()
1744 .end_loop()
1745 .to("mock:after")
1746 .build()
1747 .unwrap();
1748
1749 assert_eq!(def.steps().len(), 2);
1750 match &def.steps()[0] {
1751 BuilderStep::Loop { steps, .. } => {
1752 assert_eq!(steps.len(), 2);
1753 match &steps[1] {
1754 BuilderStep::Loop {
1755 config,
1756 steps: inner_steps,
1757 } => {
1758 assert!(matches!(config.mode, LoopMode::Count(3)));
1759 assert_eq!(inner_steps.len(), 1);
1760 }
1761 other => panic!("Expected nested Loop, got {:?}", other),
1762 }
1763 }
1764 other => panic!("Expected outer Loop, got {:?}", other),
1765 }
1766 }
1767
1768 #[tokio::test]
1773 async fn test_set_header_processor_works() {
1774 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1775 let exchange = Exchange::new(Message::new("test"));
1776 let result = svc.call(exchange).await.unwrap();
1777 assert_eq!(
1778 result.input.header("greeting"),
1779 Some(&Value::String("hello".into()))
1780 );
1781 }
1782
1783 #[tokio::test]
1784 async fn test_filter_processor_passes() {
1785 use camel_api::BoxProcessorExt;
1786 use camel_processor::FilterService;
1787
1788 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1789 let mut svc =
1790 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1791 let exchange = Exchange::new(Message::new("pass"));
1792 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1793 assert_eq!(result.input.body.as_text(), Some("pass"));
1794 }
1795
1796 #[tokio::test]
1797 async fn test_filter_processor_blocks() {
1798 use camel_api::BoxProcessorExt;
1799 use camel_processor::FilterService;
1800
1801 let sub = BoxProcessor::from_fn(|_ex| {
1802 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1803 });
1804 let mut svc =
1805 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1806 let exchange = Exchange::new(Message::new("reject"));
1807 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1808 assert_eq!(result.input.body.as_text(), Some("reject"));
1809 }
1810
1811 #[tokio::test]
1812 async fn test_map_body_processor_works() {
1813 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1814 if let Some(text) = body.as_text() {
1815 Body::Text(text.to_uppercase())
1816 } else {
1817 body
1818 }
1819 });
1820 let exchange = Exchange::new(Message::new("hello"));
1821 let result = mapper.oneshot(exchange).await.unwrap();
1822 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1823 }
1824
1825 #[tokio::test]
1826 async fn test_process_custom_processor_works() {
1827 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1828 ex.set_property("custom", Value::Bool(true));
1829 Ok(ex)
1830 });
1831 let exchange = Exchange::new(Message::default());
1832 let result = processor.oneshot(exchange).await.unwrap();
1833 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1834 }
1835
1836 #[tokio::test]
1841 async fn test_compose_pipeline_runs_steps_in_order() {
1842 use camel_core::route::compose_pipeline;
1843
1844 let processors = vec![
1845 BoxProcessor::new(SetHeader::new(
1846 IdentityProcessor,
1847 "step",
1848 Value::String("one".into()),
1849 )),
1850 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1851 if let Some(text) = body.as_text() {
1852 Body::Text(format!("{}-processed", text))
1853 } else {
1854 body
1855 }
1856 })),
1857 ];
1858
1859 let pipeline = compose_pipeline(processors);
1860 let exchange = Exchange::new(Message::new("hello"));
1861 let result = pipeline.oneshot(exchange).await.unwrap();
1862
1863 assert_eq!(
1864 result.input.header("step"),
1865 Some(&Value::String("one".into()))
1866 );
1867 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1868 }
1869
1870 #[tokio::test]
1871 async fn test_compose_pipeline_empty_is_identity() {
1872 use camel_core::route::compose_pipeline;
1873
1874 let pipeline = compose_pipeline(vec![]);
1875 let exchange = Exchange::new(Message::new("unchanged"));
1876 let result = pipeline.oneshot(exchange).await.unwrap();
1877 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1878 }
1879
1880 #[test]
1885 fn test_builder_circuit_breaker_sets_config() {
1886 use camel_api::circuit_breaker::CircuitBreakerConfig;
1887
1888 let config = CircuitBreakerConfig::new().failure_threshold(5);
1889 let definition = RouteBuilder::from("timer:tick")
1890 .route_id("test-route")
1891 .circuit_breaker(config)
1892 .build()
1893 .unwrap();
1894
1895 let cb = definition
1896 .circuit_breaker_config()
1897 .expect("circuit breaker should be set");
1898 assert_eq!(cb.failure_threshold, 5);
1899 }
1900
1901 #[test]
1902 fn test_builder_circuit_breaker_with_error_handler() {
1903 use camel_api::circuit_breaker::CircuitBreakerConfig;
1904 use camel_api::error_handler::ErrorHandlerConfig;
1905
1906 let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1907 let eh_config = ErrorHandlerConfig::log_only();
1908
1909 let definition = RouteBuilder::from("timer:tick")
1910 .route_id("test-route")
1911 .to("log:info")
1912 .circuit_breaker(cb_config)
1913 .error_handler(eh_config)
1914 .build()
1915 .unwrap();
1916
1917 assert!(
1918 definition.circuit_breaker_config().is_some(),
1919 "circuit breaker config should be set"
1920 );
1921 }
1923
1924 #[test]
1925 fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1926 let definition = RouteBuilder::from("direct:start")
1927 .route_id("test-route")
1928 .dead_letter_channel("log:dlc")
1929 .on_exception(|e| matches!(e, CamelError::Io(_)))
1930 .retry(3)
1931 .handled_by("log:io")
1932 .end_on_exception()
1933 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1934 .retry(1)
1935 .end_on_exception()
1936 .to("mock:out")
1937 .build()
1938 .expect("route should build");
1939
1940 let cfg = definition
1941 .error_handler_config()
1942 .expect("error handler should be set");
1943 assert_eq!(cfg.policies.len(), 2);
1944 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1945 assert_eq!(
1946 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1947 Some(3)
1948 );
1949 assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1950 assert_eq!(
1951 cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1952 Some(1)
1953 );
1954 }
1955
1956 #[test]
1957 fn test_builder_on_exception_mixed_mode_rejected() {
1958 let result = RouteBuilder::from("direct:start")
1959 .route_id("test-route")
1960 .error_handler(ErrorHandlerConfig::log_only())
1961 .on_exception(|_e| true)
1962 .end_on_exception()
1963 .to("mock:out")
1964 .build();
1965
1966 let err = result.err().expect("mixed mode should fail with an error");
1967
1968 assert!(
1969 format!("{err}").contains("mixed error handler modes"),
1970 "unexpected error: {err}"
1971 );
1972 }
1973
1974 #[test]
1975 fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1976 let definition = RouteBuilder::from("direct:start")
1977 .route_id("test-route")
1978 .on_exception(|_e| true)
1979 .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1980 .with_jitter(0.5)
1981 .end_on_exception()
1982 .to("mock:out")
1983 .build()
1984 .expect("route should build");
1985
1986 let cfg = definition
1987 .error_handler_config()
1988 .expect("error handler should be set");
1989 assert_eq!(cfg.policies.len(), 1);
1990 assert!(cfg.policies[0].retry.is_none());
1991 }
1992
1993 #[test]
1994 fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1995 let definition = RouteBuilder::from("direct:start")
1996 .route_id("test-route")
1997 .dead_letter_channel("log:dlc")
1998 .to("mock:out")
1999 .build()
2000 .expect("route should build");
2001
2002 let cfg = definition
2003 .error_handler_config()
2004 .expect("error handler should be set");
2005 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
2006 assert!(cfg.policies.is_empty());
2007 }
2008
2009 #[test]
2010 fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
2011 let definition = RouteBuilder::from("direct:start")
2012 .route_id("test-route")
2013 .dead_letter_channel("log:first")
2014 .on_exception(|e| matches!(e, CamelError::Io(_)))
2015 .retry(2)
2016 .end_on_exception()
2017 .dead_letter_channel("log:second")
2018 .to("mock:out")
2019 .build()
2020 .expect("route should build");
2021
2022 let cfg = definition
2023 .error_handler_config()
2024 .expect("error handler should be set");
2025 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2026 assert_eq!(cfg.policies.len(), 1);
2027 assert_eq!(
2028 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
2029 Some(2)
2030 );
2031 }
2032
2033 #[test]
2034 fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
2035 let definition = RouteBuilder::from("direct:start")
2036 .route_id("test-route")
2037 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
2038 .retry(1)
2039 .end_on_exception()
2040 .to("mock:out")
2041 .build()
2042 .expect("route should build");
2043
2044 let cfg = definition
2045 .error_handler_config()
2046 .expect("error handler should be set");
2047 assert!(cfg.dlc_uri.is_none());
2048 assert_eq!(cfg.policies.len(), 1);
2049 }
2050
2051 #[test]
2052 fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
2053 let first = ErrorHandlerConfig::dead_letter_channel("log:first");
2054 let second = ErrorHandlerConfig::dead_letter_channel("log:second");
2055
2056 let definition = RouteBuilder::from("direct:start")
2057 .route_id("test-route")
2058 .error_handler(first)
2059 .error_handler(second)
2060 .to("mock:out")
2061 .build()
2062 .expect("route should build");
2063
2064 let cfg = definition
2065 .error_handler_config()
2066 .expect("error handler should be set");
2067 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2068 }
2069
2070 #[test]
2073 fn test_split_builder_typestate() {
2074 use camel_api::splitter::{SplitterConfig, split_body_lines};
2075
2076 let definition = RouteBuilder::from("timer:test?period=1000")
2078 .route_id("test-route")
2079 .split(SplitterConfig::new(split_body_lines()))
2080 .to("mock:per-fragment")
2081 .end_split()
2082 .to("mock:final")
2083 .build()
2084 .unwrap();
2085
2086 assert_eq!(definition.steps().len(), 2);
2088 }
2089
2090 #[test]
2091 fn test_split_builder_steps_collected() {
2092 use camel_api::splitter::{SplitterConfig, split_body_lines};
2093
2094 let definition = RouteBuilder::from("timer:test?period=1000")
2095 .route_id("test-route")
2096 .split(SplitterConfig::new(split_body_lines()))
2097 .set_header("fragment", Value::String("yes".into()))
2098 .to("mock:per-fragment")
2099 .end_split()
2100 .build()
2101 .unwrap();
2102
2103 assert_eq!(definition.steps().len(), 1);
2105 match &definition.steps()[0] {
2106 BuilderStep::Split { steps, .. } => {
2107 assert_eq!(steps.len(), 2); }
2109 other => panic!("Expected Split, got {:?}", other),
2110 }
2111 }
2112
2113 #[test]
2114 fn test_split_builder_config_propagated() {
2115 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2116
2117 let definition = RouteBuilder::from("timer:test?period=1000")
2118 .route_id("test-route")
2119 .split(
2120 SplitterConfig::new(split_body_lines())
2121 .parallel(true)
2122 .parallel_limit(4)
2123 .aggregation(AggregationStrategy::CollectAll),
2124 )
2125 .to("mock:per-fragment")
2126 .end_split()
2127 .build()
2128 .unwrap();
2129
2130 match &definition.steps()[0] {
2131 BuilderStep::Split { config, .. } => {
2132 assert!(config.parallel);
2133 assert_eq!(config.parallel_limit, Some(4));
2134 assert!(matches!(
2135 config.aggregation,
2136 AggregationStrategy::CollectAll
2137 ));
2138 }
2139 other => panic!("Expected Split, got {:?}", other),
2140 }
2141 }
2142
2143 #[test]
2144 fn test_aggregate_builder_adds_step() {
2145 use camel_api::aggregator::AggregatorConfig;
2146 use camel_core::route::BuilderStep;
2147
2148 let definition = RouteBuilder::from("timer:tick")
2149 .route_id("test-route")
2150 .aggregate(
2151 AggregatorConfig::correlate_by("key")
2152 .complete_when_size(2)
2153 .build()
2154 .unwrap(),
2155 )
2156 .build()
2157 .unwrap();
2158
2159 assert_eq!(definition.steps().len(), 1);
2160 assert!(matches!(
2161 definition.steps()[0],
2162 BuilderStep::Aggregate { .. }
2163 ));
2164 }
2165
2166 #[test]
2167 fn test_aggregate_in_split_builder() {
2168 use camel_api::aggregator::AggregatorConfig;
2169 use camel_api::splitter::{SplitterConfig, split_body_lines};
2170 use camel_core::route::BuilderStep;
2171
2172 let definition = RouteBuilder::from("timer:tick")
2173 .route_id("test-route")
2174 .split(SplitterConfig::new(split_body_lines()))
2175 .aggregate(
2176 AggregatorConfig::correlate_by("key")
2177 .complete_when_size(1)
2178 .build()
2179 .unwrap(),
2180 )
2181 .end_split()
2182 .build()
2183 .unwrap();
2184
2185 assert_eq!(definition.steps().len(), 1);
2186 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2187 assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2188 } else {
2189 panic!("expected Split step");
2190 }
2191 }
2192
2193 #[test]
2196 fn test_builder_set_body_static_adds_processor() {
2197 let definition = RouteBuilder::from("timer:tick")
2198 .route_id("test-route")
2199 .set_body("fixed")
2200 .build()
2201 .unwrap();
2202 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2203 }
2204
2205 #[test]
2206 fn test_builder_set_body_fn_adds_processor() {
2207 let definition = RouteBuilder::from("timer:tick")
2208 .route_id("test-route")
2209 .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2210 .build()
2211 .unwrap();
2212 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2213 }
2214
2215 #[test]
2216 fn transform_alias_produces_same_as_set_body() {
2217 let route_transform = RouteBuilder::from("timer:tick")
2218 .route_id("test-route")
2219 .transform("hello")
2220 .build()
2221 .unwrap();
2222
2223 let route_set_body = RouteBuilder::from("timer:tick")
2224 .route_id("test-route")
2225 .set_body("hello")
2226 .build()
2227 .unwrap();
2228
2229 assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2230 }
2231
2232 #[test]
2233 fn test_builder_set_header_fn_adds_processor() {
2234 let definition = RouteBuilder::from("timer:tick")
2235 .route_id("test-route")
2236 .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2237 .build()
2238 .unwrap();
2239 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2240 }
2241
2242 #[tokio::test]
2243 async fn test_set_body_static_processor_works() {
2244 use camel_core::route::compose_pipeline;
2245 let def = RouteBuilder::from("t:t")
2246 .route_id("test-route")
2247 .set_body("replaced")
2248 .build()
2249 .unwrap();
2250 let pipeline = compose_pipeline(
2251 def.steps()
2252 .iter()
2253 .filter_map(|s| {
2254 if let BuilderStep::Processor(p) = s {
2255 Some(p.clone())
2256 } else {
2257 None
2258 }
2259 })
2260 .collect(),
2261 );
2262 let exchange = Exchange::new(Message::new("original"));
2263 let result = pipeline.oneshot(exchange).await.unwrap();
2264 assert_eq!(result.input.body.as_text(), Some("replaced"));
2265 }
2266
2267 #[tokio::test]
2268 async fn test_set_body_fn_processor_works() {
2269 use camel_core::route::compose_pipeline;
2270 let def = RouteBuilder::from("t:t")
2271 .route_id("test-route")
2272 .set_body_fn(|ex: &Exchange| {
2273 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2274 })
2275 .build()
2276 .unwrap();
2277 let pipeline = compose_pipeline(
2278 def.steps()
2279 .iter()
2280 .filter_map(|s| {
2281 if let BuilderStep::Processor(p) = s {
2282 Some(p.clone())
2283 } else {
2284 None
2285 }
2286 })
2287 .collect(),
2288 );
2289 let exchange = Exchange::new(Message::new("hello"));
2290 let result = pipeline.oneshot(exchange).await.unwrap();
2291 assert_eq!(result.input.body.as_text(), Some("HELLO"));
2292 }
2293
2294 #[tokio::test]
2295 async fn test_set_header_fn_processor_works() {
2296 use camel_core::route::compose_pipeline;
2297 let def = RouteBuilder::from("t:t")
2298 .route_id("test-route")
2299 .set_header_fn("echo", |ex: &Exchange| {
2300 ex.input
2301 .body
2302 .as_text()
2303 .map(|t| Value::String(t.into()))
2304 .unwrap_or(Value::Null)
2305 })
2306 .build()
2307 .unwrap();
2308 let pipeline = compose_pipeline(
2309 def.steps()
2310 .iter()
2311 .filter_map(|s| {
2312 if let BuilderStep::Processor(p) = s {
2313 Some(p.clone())
2314 } else {
2315 None
2316 }
2317 })
2318 .collect(),
2319 );
2320 let exchange = Exchange::new(Message::new("ping"));
2321 let result = pipeline.oneshot(exchange).await.unwrap();
2322 assert_eq!(
2323 result.input.header("echo"),
2324 Some(&Value::String("ping".into()))
2325 );
2326 }
2327
2328 #[test]
2331 fn test_filter_builder_typestate() {
2332 let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2333 .route_id("test-route")
2334 .filter(|_ex| true)
2335 .to("mock:inner")
2336 .end_filter()
2337 .to("mock:outer")
2338 .build();
2339 assert!(result.is_ok());
2340 }
2341
2342 #[test]
2343 fn test_filter_builder_steps_collected() {
2344 let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2345 .route_id("test-route")
2346 .filter(|_ex| true)
2347 .to("mock:inner")
2348 .end_filter()
2349 .build()
2350 .unwrap();
2351
2352 assert_eq!(definition.steps().len(), 1);
2353 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2354 }
2355
2356 #[test]
2357 fn test_wire_tap_builder_adds_step() {
2358 let definition = RouteBuilder::from("timer:tick")
2359 .route_id("test-route")
2360 .wire_tap("mock:tap")
2361 .to("mock:result")
2362 .build()
2363 .unwrap();
2364
2365 assert_eq!(definition.steps().len(), 2);
2366 assert!(
2367 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2368 );
2369 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2370 }
2371
2372 #[test]
2375 fn test_multicast_builder_typestate() {
2376 let definition = RouteBuilder::from("timer:tick")
2377 .route_id("test-route")
2378 .multicast()
2379 .to("direct:a")
2380 .to("direct:b")
2381 .end_multicast()
2382 .to("mock:result")
2383 .build()
2384 .unwrap();
2385
2386 assert_eq!(definition.steps().len(), 2); }
2388
2389 #[test]
2390 fn test_multicast_builder_steps_collected() {
2391 let definition = RouteBuilder::from("timer:tick")
2392 .route_id("test-route")
2393 .multicast()
2394 .to("direct:a")
2395 .to("direct:b")
2396 .end_multicast()
2397 .build()
2398 .unwrap();
2399
2400 match &definition.steps()[0] {
2401 BuilderStep::Multicast { steps, .. } => {
2402 assert_eq!(steps.len(), 2);
2403 }
2404 other => panic!("Expected Multicast, got {:?}", other),
2405 }
2406 }
2407
2408 #[test]
2411 fn test_builder_concurrent_sets_concurrency() {
2412 use camel_component_api::ConcurrencyModel;
2413
2414 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2415 .route_id("test-route")
2416 .concurrent(16)
2417 .to("log:info")
2418 .build()
2419 .unwrap();
2420
2421 assert_eq!(
2422 definition.concurrency_override(),
2423 Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2424 );
2425 }
2426
2427 #[test]
2428 fn test_builder_concurrent_zero_means_unbounded() {
2429 use camel_component_api::ConcurrencyModel;
2430
2431 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2432 .route_id("test-route")
2433 .concurrent(0)
2434 .to("log:info")
2435 .build()
2436 .unwrap();
2437
2438 assert_eq!(
2439 definition.concurrency_override(),
2440 Some(&ConcurrencyModel::Concurrent { max: None })
2441 );
2442 }
2443
2444 #[test]
2445 fn test_builder_sequential_sets_concurrency() {
2446 use camel_component_api::ConcurrencyModel;
2447
2448 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2449 .route_id("test-route")
2450 .sequential()
2451 .to("log:info")
2452 .build()
2453 .unwrap();
2454
2455 assert_eq!(
2456 definition.concurrency_override(),
2457 Some(&ConcurrencyModel::Sequential)
2458 );
2459 }
2460
2461 #[test]
2462 fn test_builder_default_concurrency_is_none() {
2463 let definition = RouteBuilder::from("timer:tick")
2464 .route_id("test-route")
2465 .to("log:info")
2466 .build()
2467 .unwrap();
2468
2469 assert_eq!(definition.concurrency_override(), None);
2470 }
2471
2472 #[test]
2475 fn test_builder_route_id_sets_id() {
2476 let definition = RouteBuilder::from("timer:tick")
2477 .route_id("my-route")
2478 .build()
2479 .unwrap();
2480
2481 assert_eq!(definition.route_id(), "my-route");
2482 }
2483
2484 #[test]
2485 fn test_build_without_route_id_fails() {
2486 let result = RouteBuilder::from("timer:tick?period=1000")
2487 .to("log:info")
2488 .build();
2489 let err = match result {
2490 Err(e) => e.to_string(),
2491 Ok(_) => panic!("build() should fail without route_id"),
2492 };
2493 assert!(
2494 err.contains("route_id"),
2495 "error should mention route_id, got: {}",
2496 err
2497 );
2498 }
2499
2500 #[test]
2501 fn test_builder_empty_route_id_rejected() {
2502 let result = RouteBuilder::from("timer:tick").route_id("").build();
2503 let err = result.err().expect("empty route_id should be rejected");
2504 assert!(matches!(err, CamelError::RouteError(_)));
2505 }
2506
2507 #[test]
2508 fn test_builder_whitespace_route_id_rejected() {
2509 let result = RouteBuilder::from("timer:tick").route_id(" ").build();
2510 assert!(result.is_err());
2511 }
2512
2513 #[test]
2514 fn test_builder_auto_startup_false() {
2515 let definition = RouteBuilder::from("timer:tick")
2516 .route_id("test-route")
2517 .auto_startup(false)
2518 .build()
2519 .unwrap();
2520
2521 assert!(!definition.auto_startup());
2522 }
2523
2524 #[test]
2525 fn test_builder_startup_order_custom() {
2526 let definition = RouteBuilder::from("timer:tick")
2527 .route_id("test-route")
2528 .startup_order(50)
2529 .build()
2530 .unwrap();
2531
2532 assert_eq!(definition.startup_order(), 50);
2533 }
2534
2535 #[test]
2536 fn test_builder_defaults() {
2537 let definition = RouteBuilder::from("timer:tick")
2538 .route_id("test-route")
2539 .build()
2540 .unwrap();
2541
2542 assert_eq!(definition.route_id(), "test-route");
2543 assert!(definition.auto_startup());
2544 assert_eq!(definition.startup_order(), 1000);
2545 }
2546
2547 #[test]
2550 fn test_choice_builder_single_when() {
2551 let definition = RouteBuilder::from("timer:tick")
2552 .route_id("test-route")
2553 .choice()
2554 .when(|ex: &Exchange| ex.input.header("type").is_some())
2555 .to("mock:typed")
2556 .end_when()
2557 .end_choice()
2558 .build()
2559 .unwrap();
2560 assert_eq!(definition.steps().len(), 1);
2561 assert!(
2562 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2563 if whens.len() == 1 && otherwise.is_none())
2564 );
2565 }
2566
2567 #[test]
2568 fn test_choice_builder_when_otherwise() {
2569 let definition = RouteBuilder::from("timer:tick")
2570 .route_id("test-route")
2571 .choice()
2572 .when(|ex: &Exchange| ex.input.header("a").is_some())
2573 .to("mock:a")
2574 .end_when()
2575 .otherwise()
2576 .to("mock:fallback")
2577 .end_otherwise()
2578 .end_choice()
2579 .build()
2580 .unwrap();
2581 assert!(
2582 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2583 if whens.len() == 1 && otherwise.is_some())
2584 );
2585 }
2586
2587 #[test]
2588 fn test_choice_builder_multiple_whens() {
2589 let definition = RouteBuilder::from("timer:tick")
2590 .route_id("test-route")
2591 .choice()
2592 .when(|ex: &Exchange| ex.input.header("a").is_some())
2593 .to("mock:a")
2594 .end_when()
2595 .when(|ex: &Exchange| ex.input.header("b").is_some())
2596 .to("mock:b")
2597 .end_when()
2598 .end_choice()
2599 .build()
2600 .unwrap();
2601 assert!(
2602 matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2603 if whens.len() == 2)
2604 );
2605 }
2606
2607 #[test]
2608 fn test_choice_step_after_choice() {
2609 let definition = RouteBuilder::from("timer:tick")
2611 .route_id("test-route")
2612 .choice()
2613 .when(|_ex: &Exchange| true)
2614 .to("mock:inner")
2615 .end_when()
2616 .end_choice()
2617 .to("mock:outer") .build()
2619 .unwrap();
2620 assert_eq!(definition.steps().len(), 2);
2621 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2622 }
2623
2624 #[test]
2627 fn test_throttle_builder_typestate() {
2628 let definition = RouteBuilder::from("timer:tick")
2629 .route_id("test-route")
2630 .throttle(10, std::time::Duration::from_secs(1))
2631 .to("mock:result")
2632 .end_throttle()
2633 .build()
2634 .unwrap();
2635
2636 assert_eq!(definition.steps().len(), 1);
2637 assert!(matches!(
2638 &definition.steps()[0],
2639 BuilderStep::Throttle { .. }
2640 ));
2641 }
2642
2643 #[test]
2644 fn test_throttle_builder_with_strategy() {
2645 let definition = RouteBuilder::from("timer:tick")
2646 .route_id("test-route")
2647 .throttle(10, std::time::Duration::from_secs(1))
2648 .strategy(ThrottleStrategy::Reject)
2649 .to("mock:result")
2650 .end_throttle()
2651 .build()
2652 .unwrap();
2653
2654 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2655 assert_eq!(config.strategy, ThrottleStrategy::Reject);
2656 } else {
2657 panic!("Expected Throttle step");
2658 }
2659 }
2660
2661 #[test]
2662 fn test_throttle_builder_steps_collected() {
2663 let definition = RouteBuilder::from("timer:tick")
2664 .route_id("test-route")
2665 .throttle(5, std::time::Duration::from_secs(1))
2666 .set_header("throttled", Value::Bool(true))
2667 .to("mock:throttled")
2668 .end_throttle()
2669 .build()
2670 .unwrap();
2671
2672 match &definition.steps()[0] {
2673 BuilderStep::Throttle { steps, .. } => {
2674 assert_eq!(steps.len(), 2); }
2676 other => panic!("Expected Throttle, got {:?}", other),
2677 }
2678 }
2679
2680 #[test]
2681 fn test_throttle_step_after_throttle() {
2682 let definition = RouteBuilder::from("timer:tick")
2684 .route_id("test-route")
2685 .throttle(10, std::time::Duration::from_secs(1))
2686 .to("mock:inner")
2687 .end_throttle()
2688 .to("mock:outer")
2689 .build()
2690 .unwrap();
2691
2692 assert_eq!(definition.steps().len(), 2);
2693 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2694 }
2695
2696 #[test]
2699 fn test_load_balance_builder_typestate() {
2700 let definition = RouteBuilder::from("timer:tick")
2701 .route_id("test-route")
2702 .load_balance()
2703 .round_robin()
2704 .to("mock:a")
2705 .to("mock:b")
2706 .end_load_balance()
2707 .build()
2708 .unwrap();
2709
2710 assert_eq!(definition.steps().len(), 1);
2711 assert!(matches!(
2712 &definition.steps()[0],
2713 BuilderStep::LoadBalance { .. }
2714 ));
2715 }
2716
2717 #[test]
2718 fn test_load_balance_builder_with_strategy() {
2719 let definition = RouteBuilder::from("timer:tick")
2720 .route_id("test-route")
2721 .load_balance()
2722 .random()
2723 .to("mock:result")
2724 .end_load_balance()
2725 .build()
2726 .unwrap();
2727
2728 if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2729 assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2730 } else {
2731 panic!("Expected LoadBalance step");
2732 }
2733 }
2734
2735 #[test]
2736 fn test_load_balance_builder_steps_collected() {
2737 let definition = RouteBuilder::from("timer:tick")
2738 .route_id("test-route")
2739 .load_balance()
2740 .set_header("lb", Value::Bool(true))
2741 .to("mock:a")
2742 .end_load_balance()
2743 .build()
2744 .unwrap();
2745
2746 match &definition.steps()[0] {
2747 BuilderStep::LoadBalance { steps, .. } => {
2748 assert_eq!(steps.len(), 2); }
2750 other => panic!("Expected LoadBalance, got {:?}", other),
2751 }
2752 }
2753
2754 #[test]
2755 fn test_load_balance_step_after_load_balance() {
2756 let definition = RouteBuilder::from("timer:tick")
2758 .route_id("test-route")
2759 .load_balance()
2760 .to("mock:inner")
2761 .end_load_balance()
2762 .to("mock:outer")
2763 .build()
2764 .unwrap();
2765
2766 assert_eq!(definition.steps().len(), 2);
2767 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2768 }
2769
2770 #[test]
2773 fn test_dynamic_router_builder() {
2774 let definition = RouteBuilder::from("timer:tick")
2775 .route_id("test-route")
2776 .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2777 .build()
2778 .unwrap();
2779
2780 assert_eq!(definition.steps().len(), 1);
2781 assert!(matches!(
2782 &definition.steps()[0],
2783 BuilderStep::DynamicRouter { .. }
2784 ));
2785 }
2786
2787 #[test]
2788 fn test_dynamic_router_builder_with_config() {
2789 let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2790 .max_iterations(100)
2791 .cache_size(500);
2792
2793 let definition = RouteBuilder::from("timer:tick")
2794 .route_id("test-route")
2795 .dynamic_router_with_config(config)
2796 .build()
2797 .unwrap();
2798
2799 assert_eq!(definition.steps().len(), 1);
2800 if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2801 assert_eq!(config.max_iterations, 100);
2802 assert_eq!(config.cache_size, 500);
2803 } else {
2804 panic!("Expected DynamicRouter step");
2805 }
2806 }
2807
2808 #[test]
2809 fn test_dynamic_router_step_after_router() {
2810 let definition = RouteBuilder::from("timer:tick")
2812 .route_id("test-route")
2813 .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2814 .to("mock:outer")
2815 .build()
2816 .unwrap();
2817
2818 assert_eq!(definition.steps().len(), 2);
2819 assert!(matches!(
2820 &definition.steps()[0],
2821 BuilderStep::DynamicRouter { .. }
2822 ));
2823 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2824 }
2825
2826 #[test]
2827 fn routing_slip_builder_creates_step() {
2828 use camel_api::RoutingSlipExpression;
2829
2830 let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2831
2832 let route = RouteBuilder::from("direct:start")
2833 .route_id("routing-slip-test")
2834 .routing_slip(expression)
2835 .build()
2836 .unwrap();
2837
2838 assert!(
2839 matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2840 "Expected RoutingSlip step"
2841 );
2842 }
2843
2844 #[test]
2845 fn routing_slip_with_config_builder_creates_step() {
2846 use camel_api::RoutingSlipConfig;
2847
2848 let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2849 .uri_delimiter("|")
2850 .cache_size(50)
2851 .ignore_invalid_endpoints(true);
2852
2853 let route = RouteBuilder::from("direct:start")
2854 .route_id("routing-slip-config-test")
2855 .routing_slip_with_config(config)
2856 .build()
2857 .unwrap();
2858
2859 if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2860 assert_eq!(config.uri_delimiter, "|");
2861 assert_eq!(config.cache_size, 50);
2862 assert!(config.ignore_invalid_endpoints);
2863 } else {
2864 panic!("Expected RoutingSlip step");
2865 }
2866 }
2867
2868 #[test]
2869 fn test_builder_marshal_adds_processor_step() {
2870 let definition = RouteBuilder::from("timer:tick")
2871 .route_id("test-route")
2872 .marshal("json")
2873 .unwrap()
2874 .build()
2875 .unwrap();
2876 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2877 }
2878
2879 #[test]
2880 fn test_builder_unmarshal_adds_processor_step() {
2881 let definition = RouteBuilder::from("timer:tick")
2882 .route_id("test-route")
2883 .unmarshal("json")
2884 .unwrap()
2885 .build()
2886 .unwrap();
2887 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2888 }
2889
2890 #[test]
2891 fn test_builder_stream_cache_adds_processor_step() {
2892 let definition = RouteBuilder::from("timer:tick")
2893 .route_id("test-route")
2894 .stream_cache(1024)
2895 .build()
2896 .unwrap();
2897 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2898 }
2899
2900 #[test]
2901 fn validate_adds_to_step_with_validator_uri() {
2902 let def = RouteBuilder::from("direct:in")
2903 .route_id("test")
2904 .validate("schemas/order.xsd")
2905 .build()
2906 .unwrap();
2907 let steps = def.steps();
2908 assert_eq!(steps.len(), 1);
2909 assert!(
2910 matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2911 "got: {:?}",
2912 steps[0]
2913 );
2914 }
2915
2916 #[test]
2917 fn test_builder_marshal_returns_err_for_unknown_format() {
2918 let result = RouteBuilder::from("timer:tick")
2919 .route_id("test-route")
2920 .marshal("csv");
2921 let err = match result {
2922 Err(e) => e,
2923 Ok(_) => panic!("marshal with unknown format should return Err"),
2924 };
2925 let msg = err.to_string();
2926 assert!(
2927 msg.contains("unknown data format"),
2928 "error should mention unknown format, got: {msg}"
2929 );
2930 assert!(
2931 msg.contains("csv"),
2932 "error should mention format name, got: {msg}"
2933 );
2934 }
2935
2936 #[test]
2937 fn test_builder_unmarshal_returns_err_for_unknown_format() {
2938 let result = RouteBuilder::from("timer:tick")
2939 .route_id("test-route")
2940 .unmarshal("csv");
2941 let err = match result {
2942 Err(e) => e,
2943 Ok(_) => panic!("unmarshal with unknown format should return Err"),
2944 };
2945 let msg = err.to_string();
2946 assert!(
2947 msg.contains("unknown data format"),
2948 "error should mention unknown format, got: {msg}"
2949 );
2950 assert!(
2951 msg.contains("csv"),
2952 "error should mention format name, got: {msg}"
2953 );
2954 }
2955
2956 #[test]
2957 fn test_builder_recipient_list_creates_step() {
2958 let route = RouteBuilder::from("direct:start")
2959 .route_id("recipient-list-test")
2960 .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
2961 .build()
2962 .unwrap();
2963
2964 assert!(matches!(
2965 &route.steps()[0],
2966 BuilderStep::RecipientList { .. }
2967 ));
2968 }
2969
2970 #[test]
2971 fn test_builder_recipient_list_with_config_creates_step() {
2972 let config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));
2973
2974 let route = RouteBuilder::from("direct:start")
2975 .route_id("recipient-list-config-test")
2976 .recipient_list_with_config(config)
2977 .build()
2978 .unwrap();
2979
2980 assert!(matches!(
2981 &route.steps()[0],
2982 BuilderStep::RecipientList { .. }
2983 ));
2984 }
2985
2986 #[test]
2987 fn test_builder_script_adds_script_step() {
2988 let route = RouteBuilder::from("direct:start")
2989 .route_id("script-test")
2990 .script("rhai", "headers[\"x\"] = \"y\"")
2991 .build()
2992 .unwrap();
2993
2994 assert!(matches!(
2995 &route.steps()[0],
2996 BuilderStep::Script { language, script }
2997 if language == "rhai" && script == "headers[\"x\"] = \"y\""
2998 ));
2999 }
3000
3001 #[test]
3002 fn test_builder_delay_and_delay_with_header_add_steps() {
3003 let route = RouteBuilder::from("direct:start")
3004 .route_id("delay-test")
3005 .delay(Duration::from_millis(250))
3006 .delay_with_header(Duration::from_millis(500), "x-delay")
3007 .build()
3008 .unwrap();
3009
3010 assert_eq!(route.steps().len(), 2);
3011 assert!(matches!(&route.steps()[0], BuilderStep::Delay { .. }));
3012 assert!(matches!(&route.steps()[1], BuilderStep::Delay { .. }));
3013 }
3014
3015 #[test]
3016 fn test_builder_log_and_stop_add_steps_in_order() {
3017 let route = RouteBuilder::from("direct:start")
3018 .route_id("log-stop-test")
3019 .log("hello", LogLevel::Info)
3020 .stop()
3021 .to("mock:after")
3022 .build()
3023 .unwrap();
3024
3025 assert_eq!(route.steps().len(), 3);
3026 assert!(matches!(
3027 &route.steps()[0],
3028 BuilderStep::Log { message, .. } if message == "hello"
3029 ));
3030 assert!(matches!(&route.steps()[1], BuilderStep::Stop));
3031 assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
3032 }
3033
3034 #[test]
3035 fn test_builder_stream_cache_default_adds_processor_step() {
3036 let route = RouteBuilder::from("direct:start")
3037 .route_id("stream-cache-default-test")
3038 .stream_cache_default()
3039 .build()
3040 .unwrap();
3041
3042 assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
3043 }
3044
3045 #[test]
3046 fn test_validate_preserves_existing_validator_prefix() {
3047 let route = RouteBuilder::from("direct:in")
3048 .route_id("validate-prefix-test")
3049 .validate("validator:schemas/order.xsd")
3050 .build()
3051 .unwrap();
3052
3053 assert!(matches!(
3054 &route.steps()[0],
3055 BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
3056 ));
3057 }
3058
3059 #[test]
3060 fn test_load_balance_builder_weighted_failover_parallel_config() {
3061 let route = RouteBuilder::from("direct:start")
3062 .route_id("lb-weighted-failover-parallel")
3063 .load_balance()
3064 .weighted(vec![
3065 ("direct:a".to_string(), 3),
3066 ("direct:b".to_string(), 1),
3067 ])
3068 .failover()
3069 .parallel(true)
3070 .to("mock:result")
3071 .end_load_balance()
3072 .build()
3073 .unwrap();
3074
3075 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3076 assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3077 assert!(config.parallel);
3078 } else {
3079 panic!("Expected LoadBalance step");
3080 }
3081 }
3082
3083 #[test]
3084 fn test_multicast_builder_all_config_setters() {
3085 let route = RouteBuilder::from("direct:start")
3086 .route_id("multicast-config-test")
3087 .multicast()
3088 .parallel(true)
3089 .parallel_limit(4)
3090 .stop_on_exception(true)
3091 .timeout(Duration::from_millis(300))
3092 .aggregation(MulticastStrategy::Original)
3093 .to("mock:a")
3094 .end_multicast()
3095 .build()
3096 .unwrap();
3097
3098 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3099 assert!(config.parallel);
3100 assert_eq!(config.parallel_limit, Some(4));
3101 assert!(config.stop_on_exception);
3102 assert_eq!(config.timeout, Some(Duration::from_millis(300)));
3103 assert!(matches!(config.aggregation, MulticastStrategy::Original));
3104 } else {
3105 panic!("Expected Multicast step");
3106 }
3107 }
3108
3109 #[test]
3110 fn test_build_canonical_rejects_unsupported_processor_step() {
3111 let err = RouteBuilder::from("direct:start")
3112 .route_id("canonical-reject")
3113 .set_header("k", Value::String("v".into()))
3114 .build_canonical()
3115 .unwrap_err();
3116
3117 assert!(format!("{err}").contains("does not support step `processor`"));
3118 }
3119
3120 #[test]
3123 fn test_load_balance_builder_weighted_strategy() {
3124 let route = RouteBuilder::from("direct:start")
3125 .route_id("lb-weighted")
3126 .load_balance()
3127 .weighted(vec![
3128 ("direct:a".to_string(), 5),
3129 ("direct:b".to_string(), 2),
3130 ("direct:c".to_string(), 1),
3131 ])
3132 .to("mock:result")
3133 .end_load_balance()
3134 .build()
3135 .unwrap();
3136
3137 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3138 assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
3139 } else {
3140 panic!("Expected LoadBalance step");
3141 }
3142 }
3143
3144 #[test]
3145 fn test_load_balance_builder_failover_strategy() {
3146 let route = RouteBuilder::from("direct:start")
3147 .route_id("lb-failover")
3148 .load_balance()
3149 .failover()
3150 .to("mock:primary")
3151 .end_load_balance()
3152 .build()
3153 .unwrap();
3154
3155 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3156 assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3157 assert!(!config.parallel);
3158 } else {
3159 panic!("Expected LoadBalance step");
3160 }
3161 }
3162
3163 #[test]
3164 fn test_load_balance_builder_parallel_false_explicit() {
3165 let route = RouteBuilder::from("direct:start")
3166 .route_id("lb-parallel-false")
3167 .load_balance()
3168 .round_robin()
3169 .parallel(false)
3170 .to("mock:result")
3171 .end_load_balance()
3172 .build()
3173 .unwrap();
3174
3175 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3176 assert!(!config.parallel);
3177 } else {
3178 panic!("Expected LoadBalance step");
3179 }
3180 }
3181
3182 #[test]
3185 fn test_filter_in_split_builder_typestate() {
3186 use camel_api::splitter::{SplitterConfig, split_body_lines};
3187
3188 let definition = RouteBuilder::from("timer:test")
3189 .route_id("filter-in-split")
3190 .split(SplitterConfig::new(split_body_lines()))
3191 .filter(|_ex| true)
3192 .to("mock:filtered")
3193 .end_filter()
3194 .end_split()
3195 .build()
3196 .unwrap();
3197
3198 assert_eq!(definition.steps().len(), 1);
3199 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3200 assert_eq!(steps.len(), 1);
3201 assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3202 } else {
3203 panic!("Expected Split step");
3204 }
3205 }
3206
3207 #[test]
3208 fn test_filter_in_split_builder_multiple_steps() {
3209 use camel_api::splitter::{SplitterConfig, split_body_lines};
3210
3211 let definition = RouteBuilder::from("timer:test")
3212 .route_id("filter-in-split-multi")
3213 .split(SplitterConfig::new(split_body_lines()))
3214 .to("mock:before-filter")
3215 .filter(|_ex| true)
3216 .to("mock:inside-filter")
3217 .end_filter()
3218 .to("mock:after-filter")
3219 .end_split()
3220 .build()
3221 .unwrap();
3222
3223 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3224 assert_eq!(steps.len(), 3);
3226 } else {
3227 panic!("Expected Split step");
3228 }
3229 }
3230
3231 #[test]
3234 fn test_build_canonical_with_circuit_breaker() {
3235 use camel_api::circuit_breaker::CircuitBreakerConfig;
3236
3237 let spec = RouteBuilder::from("direct:start")
3238 .route_id("canonical-cb")
3239 .circuit_breaker(CircuitBreakerConfig::new().failure_threshold(10))
3240 .to("mock:result")
3241 .build_canonical()
3242 .unwrap();
3243
3244 let cb = spec.circuit_breaker.expect("circuit breaker should be set");
3245 assert_eq!(cb.failure_threshold, 10);
3246 }
3247
3248 #[test]
3249 fn test_build_canonical_rejects_custom_split_aggregation() {
3250 use camel_api::splitter::{SplitterConfig, split_body_lines};
3251
3252 let err = RouteBuilder::from("direct:start")
3253 .route_id("canonical-custom-split")
3254 .split(SplitterConfig::new(split_body_lines()).aggregation(
3255 camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
3256 ))
3257 .to("mock:frag")
3258 .end_split()
3259 .build_canonical()
3260 .unwrap_err();
3261
3262 assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3264 }
3265
3266 #[test]
3267 fn test_build_canonical_rejects_custom_aggregate_strategy() {
3268 let err = RouteBuilder::from("direct:start")
3269 .route_id("canonical-custom-agg")
3270 .aggregate(
3271 AggregatorConfig::correlate_by("key")
3272 .complete_when_size(2)
3273 .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
3274 .build()
3275 .unwrap(),
3276 )
3277 .build_canonical()
3278 .unwrap_err();
3279
3280 assert!(format!("{err}").contains("custom aggregate strategy"));
3281 }
3282
3283 #[test]
3284 fn test_build_canonical_rejects_fn_correlation_strategy() {
3285 let err = RouteBuilder::from("direct:start")
3286 .route_id("canonical-fn-corr")
3287 .aggregate(AggregatorConfig {
3288 header_name: "key".to_string(),
3289 completion: CompletionMode::Single(CompletionCondition::Size(1)),
3290 correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
3291 strategy: AggregationStrategy::CollectAll,
3292 max_buckets: None,
3293 bucket_ttl: None,
3294 force_completion_on_stop: false,
3295 discard_on_timeout: false,
3296 })
3297 .build_canonical()
3298 .unwrap_err();
3299
3300 assert!(format!("{err}").contains("Fn correlation strategy"));
3301 }
3302
3303 #[test]
3304 fn test_build_canonical_rejects_predicate_completion() {
3305 let err = RouteBuilder::from("direct:start")
3306 .route_id("canonical-pred-completion")
3307 .aggregate(AggregatorConfig {
3308 header_name: "key".to_string(),
3309 completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
3310 |_| false,
3311 ))),
3312 correlation: CorrelationStrategy::HeaderName("key".to_string()),
3313 strategy: AggregationStrategy::CollectAll,
3314 max_buckets: None,
3315 bucket_ttl: None,
3316 force_completion_on_stop: false,
3317 discard_on_timeout: false,
3318 })
3319 .build_canonical()
3320 .unwrap_err();
3321
3322 assert!(format!("{err}").contains("predicate completion"));
3323 }
3324
3325 #[test]
3326 fn test_build_canonical_with_expression_correlation() {
3327 let spec = RouteBuilder::from("direct:start")
3328 .route_id("canonical-expr-corr")
3329 .aggregate(AggregatorConfig {
3330 header_name: "key".to_string(),
3331 completion: CompletionMode::Single(CompletionCondition::Size(1)),
3332 correlation: CorrelationStrategy::Expression {
3333 expr: "header.key".to_string(),
3334 language: "simple".to_string(),
3335 },
3336 strategy: AggregationStrategy::CollectAll,
3337 max_buckets: None,
3338 bucket_ttl: None,
3339 force_completion_on_stop: false,
3340 discard_on_timeout: false,
3341 })
3342 .build_canonical()
3343 .unwrap();
3344
3345 assert!(spec.steps.iter().any(|s| matches!(s, CanonicalStepSpec::Aggregate(a) if a.correlation_key == Some("header.key".to_string()))));
3346 }
3347
3348 #[test]
3349 fn test_build_canonical_split_rejected_with_closure_expression() {
3350 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
3351
3352 let err = RouteBuilder::from("direct:start")
3354 .route_id("canonical-split-last")
3355 .split(
3356 SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins),
3357 )
3358 .to("mock:frag")
3359 .end_split()
3360 .build_canonical()
3361 .unwrap_err();
3362
3363 assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3364 }
3365
3366 #[test]
3369 fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
3370 let definition = RouteBuilder::from("direct:start")
3371 .route_id("on-exception-full")
3372 .dead_letter_channel("log:dlc")
3373 .on_exception(|e| matches!(e, CamelError::Io(_)))
3374 .retry(5)
3375 .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
3376 .with_jitter(0.3)
3377 .handled_by("log:io-handler")
3378 .end_on_exception()
3379 .to("mock:out")
3380 .build()
3381 .unwrap();
3382
3383 let cfg = definition
3384 .error_handler_config()
3385 .expect("error handler should be set");
3386 assert_eq!(cfg.policies.len(), 1);
3387 let policy = &cfg.policies[0];
3388 let retry = policy.retry.as_ref().expect("retry should be set");
3389 assert_eq!(retry.max_attempts, 5);
3390 assert_eq!(retry.initial_delay, Duration::from_millis(10));
3391 assert_eq!(retry.multiplier, 2.0);
3392 assert_eq!(retry.max_delay, Duration::from_millis(500));
3393 assert!((retry.jitter_factor - 0.3).abs() < f64::EPSILON);
3394 assert_eq!(policy.handled_by.as_deref(), Some("log:io-handler"));
3395 }
3396
3397 #[test]
3398 fn test_on_exception_jitter_clamped_to_valid_range() {
3399 let definition = RouteBuilder::from("direct:start")
3400 .route_id("jitter-clamp")
3401 .on_exception(|_e| true)
3402 .retry(1)
3403 .with_jitter(5.0)
3404 .end_on_exception()
3405 .to("mock:out")
3406 .build()
3407 .unwrap();
3408
3409 let cfg = definition.error_handler_config().unwrap();
3410 let retry = cfg.policies[0].retry.as_ref().unwrap();
3411 assert!((retry.jitter_factor - 1.0).abs() < f64::EPSILON);
3412 }
3413
3414 #[test]
3417 fn test_builder_process_fn_adds_processor_step() {
3418 use camel_api::BoxProcessorExt;
3419 let processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
3420 let definition = RouteBuilder::from("timer:tick")
3421 .route_id("process-fn-test")
3422 .process_fn(processor)
3423 .build()
3424 .unwrap();
3425
3426 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3427 }
3428
3429 #[test]
3430 fn test_builder_convert_body_to_adds_processor_step() {
3431 let definition = RouteBuilder::from("timer:tick")
3432 .route_id("convert-body-test")
3433 .convert_body_to(BodyType::Json)
3434 .build()
3435 .unwrap();
3436
3437 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3438 }
3439
3440 #[test]
3441 fn test_builder_bean_adds_bean_step() {
3442 let definition = RouteBuilder::from("timer:tick")
3443 .route_id("bean-test")
3444 .bean("myBean", "process")
3445 .build()
3446 .unwrap();
3447
3448 assert!(
3449 matches!(&definition.steps()[0], BuilderStep::Bean { name, method }
3450 if name == "myBean" && method == "process")
3451 );
3452 }
3453
3454 #[test]
3457 fn test_throttle_builder_delay_strategy() {
3458 let definition = RouteBuilder::from("timer:tick")
3459 .route_id("throttle-delay")
3460 .throttle(10, Duration::from_secs(1))
3461 .strategy(ThrottleStrategy::Delay)
3462 .to("mock:result")
3463 .end_throttle()
3464 .build()
3465 .unwrap();
3466
3467 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3468 assert_eq!(config.strategy, ThrottleStrategy::Delay);
3469 } else {
3470 panic!("Expected Throttle step");
3471 }
3472 }
3473
3474 #[test]
3475 fn test_throttle_builder_drop_strategy() {
3476 let definition = RouteBuilder::from("timer:tick")
3477 .route_id("throttle-drop")
3478 .throttle(10, Duration::from_secs(1))
3479 .strategy(ThrottleStrategy::Drop)
3480 .to("mock:result")
3481 .end_throttle()
3482 .build()
3483 .unwrap();
3484
3485 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3486 assert_eq!(config.strategy, ThrottleStrategy::Drop);
3487 } else {
3488 panic!("Expected Throttle step");
3489 }
3490 }
3491
3492 #[test]
3495 fn test_nested_loop_while_builder() {
3496 use camel_api::loop_eip::LoopMode;
3497
3498 let def = RouteBuilder::from("direct:start")
3499 .route_id("nested-loop-while")
3500 .loop_count(2)
3501 .to("mock:outer")
3502 .loop_while(|_ex| true)
3503 .to("mock:inner")
3504 .end_loop()
3505 .end_loop()
3506 .build()
3507 .unwrap();
3508
3509 assert_eq!(def.steps().len(), 1);
3510 if let BuilderStep::Loop { steps, .. } = &def.steps()[0] {
3511 assert_eq!(steps.len(), 2);
3512 if let BuilderStep::Loop { config, .. } = &steps[1] {
3513 assert!(matches!(config.mode, LoopMode::While(_)));
3514 } else {
3515 panic!("Expected inner Loop step");
3516 }
3517 } else {
3518 panic!("Expected outer Loop step");
3519 }
3520 }
3521
3522 #[test]
3525 fn test_choice_builder_multiple_whens_with_otherwise() {
3526 let definition = RouteBuilder::from("timer:tick")
3527 .route_id("choice-multi-otherwise")
3528 .choice()
3529 .when(|ex: &Exchange| ex.input.header("a").is_some())
3530 .to("mock:a")
3531 .end_when()
3532 .when(|ex: &Exchange| ex.input.header("b").is_some())
3533 .to("mock:b")
3534 .end_when()
3535 .when(|ex: &Exchange| ex.input.header("c").is_some())
3536 .to("mock:c")
3537 .end_when()
3538 .otherwise()
3539 .to("mock:fallback")
3540 .end_otherwise()
3541 .end_choice()
3542 .build()
3543 .unwrap();
3544
3545 if let BuilderStep::Choice { whens, otherwise } = &definition.steps()[0] {
3546 assert_eq!(whens.len(), 3);
3547 assert!(otherwise.is_some());
3548 assert_eq!(otherwise.as_ref().unwrap().len(), 1);
3549 } else {
3550 panic!("Expected Choice step");
3551 }
3552 }
3553
3554 #[test]
3557 fn test_multicast_builder_parallel_only() {
3558 let route = RouteBuilder::from("direct:start")
3559 .route_id("multicast-parallel")
3560 .multicast()
3561 .parallel(true)
3562 .to("mock:a")
3563 .end_multicast()
3564 .build()
3565 .unwrap();
3566
3567 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3568 assert!(config.parallel);
3569 assert_eq!(config.parallel_limit, None);
3570 } else {
3571 panic!("Expected Multicast step");
3572 }
3573 }
3574
3575 #[test]
3576 fn test_multicast_builder_timeout_only() {
3577 let route = RouteBuilder::from("direct:start")
3578 .route_id("multicast-timeout")
3579 .multicast()
3580 .timeout(Duration::from_secs(5))
3581 .to("mock:a")
3582 .end_multicast()
3583 .build()
3584 .unwrap();
3585
3586 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3587 assert_eq!(config.timeout, Some(Duration::from_secs(5)));
3588 } else {
3589 panic!("Expected Multicast step");
3590 }
3591 }
3592
3593 #[test]
3594 fn test_multicast_builder_aggregation_collect_all() {
3595 let route = RouteBuilder::from("direct:start")
3596 .route_id("multicast-collect")
3597 .multicast()
3598 .aggregation(MulticastStrategy::CollectAll)
3599 .to("mock:a")
3600 .end_multicast()
3601 .build()
3602 .unwrap();
3603
3604 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3605 assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
3606 } else {
3607 panic!("Expected Multicast step");
3608 }
3609 }
3610
3611 #[test]
3614 fn test_build_canonical_aggregate_any_completion_mode() {
3615 let spec = RouteBuilder::from("direct:start")
3616 .route_id("canonical-any-completion")
3617 .aggregate(
3618 AggregatorConfig::correlate_by("key")
3619 .complete_on_size_or_timeout(10, Duration::from_secs(30))
3620 .build()
3621 .unwrap(),
3622 )
3623 .build_canonical()
3624 .unwrap();
3625
3626 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3627 assert_eq!(agg.completion_size, Some(10));
3628 assert_eq!(agg.completion_timeout_ms, Some(30_000));
3629 } else {
3630 panic!("Expected Aggregate step");
3631 }
3632 }
3633
3634 #[test]
3635 fn test_build_canonical_aggregate_timeout_completion() {
3636 let spec = RouteBuilder::from("direct:start")
3637 .route_id("canonical-timeout-completion")
3638 .aggregate(
3639 AggregatorConfig::correlate_by("key")
3640 .complete_on_timeout(Duration::from_millis(500))
3641 .build()
3642 .unwrap(),
3643 )
3644 .build_canonical()
3645 .unwrap();
3646
3647 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3648 assert_eq!(agg.completion_size, None);
3649 assert_eq!(agg.completion_timeout_ms, Some(500));
3650 } else {
3651 panic!("Expected Aggregate step");
3652 }
3653 }
3654
3655 #[test]
3658 fn test_build_canonical_aggregate_discard_on_timeout() {
3659 use camel_api::aggregator::AggregatorConfig;
3660
3661 let spec = RouteBuilder::from("direct:start")
3662 .route_id("canonical-discard-timeout")
3663 .aggregate(
3664 AggregatorConfig::correlate_by("key")
3665 .complete_when_size(1)
3666 .discard_on_timeout(true)
3667 .build()
3668 .unwrap(),
3669 )
3670 .build_canonical()
3671 .unwrap();
3672
3673 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3674 assert_eq!(agg.discard_on_timeout, Some(true));
3675 } else {
3676 panic!("Expected Aggregate step");
3677 }
3678 }
3679
#[test]
/// Setting `force_completion_on_stop(true)` on the aggregator must be carried
/// into the canonical aggregate spec as `Some(true)`.
fn test_build_canonical_aggregate_force_completion_on_stop() {
    use camel_api::aggregator::AggregatorConfig;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .force_completion_on_stop(true)
        .build()
        .unwrap();

    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-force-stop")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    match &spec.steps[0] {
        CanonicalStepSpec::Aggregate(aggregate) => {
            assert_eq!(aggregate.force_completion_on_stop, Some(true));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3702
#[test]
/// `max_buckets(100)` and a 60 s `bucket_ttl` must appear in the canonical
/// aggregate spec as `max_buckets = 100` and `bucket_ttl_ms = 60_000`.
fn test_build_canonical_aggregate_max_buckets_and_ttl() {
    use camel_api::aggregator::AggregatorConfig;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .max_buckets(100)
        .bucket_ttl(Duration::from_secs(60))
        .build()
        .unwrap();

    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-buckets-ttl")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    let CanonicalStepSpec::Aggregate(aggregate) = &spec.steps[0] else {
        panic!("Expected Aggregate step");
    };
    assert_eq!(aggregate.max_buckets, Some(100));
    assert_eq!(aggregate.bucket_ttl_ms, Some(60_000));
}
3729
#[test]
/// A filter scope opened inside a split must end up as the single nested
/// step of the resulting `Split` step.
fn test_split_builder_with_filter_inside() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines());
    let definition = RouteBuilder::from("timer:test")
        .route_id("split-with-filter")
        .split(splitter)
        .filter(|_ex| true)
        .to("mock:filtered-frag")
        .end_filter()
        .end_split()
        .build()
        .unwrap();

    let BuilderStep::Split { steps: nested, .. } = &definition.steps()[0] else {
        panic!("Expected Split step");
    };
    assert_eq!(nested.len(), 1);
    assert!(matches!(&nested[0], BuilderStep::Filter { .. }));
}
3753
#[test]
/// Two consecutive wire taps followed by a `to` must yield three steps, the
/// first two being `WireTap` steps with their URIs in declaration order.
fn test_wire_tap_multiple_taps() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("multi-wire-tap")
        .wire_tap("mock:tap1")
        .wire_tap("mock:tap2")
        .to("mock:result")
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 3);

    // Only the two leading steps are checked; zip stops after the second URI.
    for (step, expected_uri) in steps.iter().zip(["mock:tap1", "mock:tap2"]) {
        assert!(matches!(step, BuilderStep::WireTap { uri } if uri == expected_uri));
    }
}
3774
#[test]
/// Combining the `dead_letter_channel` shorthand with an explicit
/// `error_handler` must make `build` fail with a "mixed error handler modes"
/// error.
fn test_builder_shorthand_then_explicit_mixed_mode() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("mixed-mode-2")
        .dead_letter_channel("log:dlc")
        .error_handler(ErrorHandlerConfig::log_only())
        .to("mock:out");

    let err = builder.build().err().expect("mixed mode should fail");
    let message = err.to_string();
    assert!(message.contains("mixed error handler modes"));
}
3789
#[test]
/// `build_canonical` must return an error when the `from` URI is empty.
fn test_build_canonical_empty_from_uri_errors() {
    let builder = RouteBuilder::from("").route_id("test");
    let outcome = builder.build_canonical();
    assert!(outcome.is_err());
}
3797
#[test]
/// `build_canonical` without a route id must fail, and the error text must
/// mention `route_id`.
fn test_build_canonical_missing_route_id_errors() {
    let outcome = RouteBuilder::from("direct:start").build_canonical();
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    let mentions_route_id = message.contains("route_id");
    assert!(mentions_route_id);
}
3805
#[test]
/// An aggregate added inside a split scope must become the single nested
/// step of the resulting `Split` step.
fn test_split_builder_with_aggregate_inside() {
    use camel_api::aggregator::AggregatorConfig;
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let aggregator = AggregatorConfig::correlate_by("frag-key")
        .complete_when_size(3)
        .build()
        .unwrap();

    let definition = RouteBuilder::from("timer:test")
        .route_id("split-agg")
        .split(SplitterConfig::new(split_body_lines()))
        .aggregate(aggregator)
        .end_split()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Split { steps: nested, .. } => {
            assert_eq!(nested.len(), 1);
            assert!(matches!(&nested[0], BuilderStep::Aggregate { .. }));
        }
        _ => panic!("Expected Split step"),
    }
}
3833
#[test]
/// Steps added between `throttle` and `end_throttle` must be nested inside
/// the `Throttle` step; here that is a header setter plus a `to`.
fn test_throttle_builder_with_steps_inside() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("throttle-steps")
        .throttle(10, Duration::from_secs(1))
        .set_header("throttled", Value::Bool(true))
        .to("mock:throttled")
        .end_throttle()
        .build()
        .unwrap();

    let BuilderStep::Throttle { steps: nested, .. } = &definition.steps()[0] else {
        panic!("Expected Throttle step");
    };
    assert_eq!(nested.len(), 2);
}
3853
#[test]
/// Steps added between `load_balance` and `end_load_balance` must be nested
/// inside the `LoadBalance` step; here that is a header setter plus a `to`.
fn test_load_balance_builder_with_steps_inside() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("lb-steps")
        .load_balance()
        .round_robin()
        .set_header("lb", Value::Bool(true))
        .to("mock:lb")
        .end_load_balance()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::LoadBalance { steps: nested, .. } => assert_eq!(nested.len(), 2),
        _ => panic!("Expected LoadBalance step"),
    }
}
3874
#[test]
/// Steps added between `multicast` and `end_multicast` must be nested inside
/// the `Multicast` step; here that is a header setter plus a `to`.
fn test_multicast_builder_with_steps_inside() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("multicast-steps")
        .multicast()
        .set_header("mc", Value::Bool(true))
        .to("mock:multicast")
        .end_multicast()
        .build()
        .unwrap();

    let BuilderStep::Multicast { steps: nested, .. } = &definition.steps()[0] else {
        panic!("Expected Multicast step");
    };
    assert_eq!(nested.len(), 2);
}
3894
#[test]
/// Steps added between `loop_count` and `end_loop` must be nested inside the
/// `Loop` step; here that is a header setter plus a `to`.
fn test_loop_builder_with_steps_inside() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("loop-steps")
        .loop_count(3)
        .set_header("loop", Value::Bool(true))
        .to("mock:loop")
        .end_loop()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Loop { steps: nested, .. } => assert_eq!(nested.len(), 2),
        _ => panic!("Expected Loop step"),
    }
}
3914
#[test]
/// `build_canonical` must fail for a route containing a loop, reporting that
/// step `loop` is not supported.
fn test_build_canonical_rejects_loop_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-loop")
        .loop_count(3)
        .to("mock:loop")
        .end_loop()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `loop`"));
}
3929
#[test]
/// `build_canonical` must fail for a route containing a multicast, reporting
/// that step `multicast` is not supported.
fn test_build_canonical_rejects_multicast_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-multicast")
        .multicast()
        .to("mock:a")
        .end_multicast()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `multicast`"));
}
3942
#[test]
/// `build_canonical` must fail for a route containing a throttle, reporting
/// that step `throttle` is not supported.
fn test_build_canonical_rejects_throttle_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-throttle")
        .throttle(10, Duration::from_secs(1))
        .to("mock:result")
        .end_throttle()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `throttle`"));
}
3955
#[test]
/// `build_canonical` must fail for a route containing a load balancer,
/// reporting that step `load_balancer` is not supported.
fn test_build_canonical_rejects_load_balancer_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-lb")
        .load_balance()
        .round_robin()
        .to("mock:result")
        .end_load_balance()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `load_balancer`"));
}
3969
#[test]
/// `build_canonical` must fail for a route containing a bean call, reporting
/// that step `bean` is not supported.
fn test_build_canonical_rejects_bean_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-bean")
        .bean("myBean", "process")
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `bean`"));
}
3980
#[test]
/// `build_canonical` must fail for a route containing a script, reporting
/// that step `script` is not supported.
fn test_build_canonical_rejects_script_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-script")
        .script("rhai", "x = 1")
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `script`"));
}
3991
#[test]
/// A 100 ms delay must be accepted by `build_canonical` and show up as a
/// `Delay` step with `delay_ms == 100`.
fn test_build_canonical_accepts_delay_step() {
    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-delay")
        .delay(Duration::from_millis(100))
        .build_canonical()
        .unwrap();

    let has_expected_delay = spec
        .steps
        .iter()
        .any(|step| matches!(step, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100));
    assert!(has_expected_delay);
}
4006
#[test]
/// A wire tap must be accepted by `build_canonical` and show up as a
/// `WireTap` step carrying the `mock:tap` URI.
fn test_build_canonical_accepts_wire_tap_step() {
    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-wiretap")
        .wire_tap("mock:tap")
        .build_canonical()
        .unwrap();

    let has_expected_tap = spec
        .steps
        .iter()
        .any(|step| matches!(step, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"));
    assert!(has_expected_tap);
}
4021
#[test]
/// `build_canonical` must fail for a route containing a dynamic router,
/// reporting that step `dynamic_router` is not supported.
fn test_build_canonical_rejects_dynamic_router_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-dyn-router")
        .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `dynamic_router`"));
}
4032
#[test]
/// `build_canonical` must fail for a route containing a routing slip,
/// reporting that step `routing_slip` is not supported.
fn test_build_canonical_rejects_routing_slip_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-routing-slip")
        .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `routing_slip`"));
}
4043
#[test]
/// `build_canonical` must fail for a route containing a recipient list,
/// reporting that step `recipient_list` is not supported.
fn test_build_canonical_rejects_recipient_list_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-recipient")
        .recipient_list(Arc::new(|_| "mock:a".to_string()))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `recipient_list`"));
}
4054
#[test]
/// An `Any` completion mode that includes a predicate condition must be
/// rejected by `build_canonical` with an error mentioning
/// "predicate completion".
fn test_build_canonical_rejects_any_mode_with_predicate() {
    // Built as a struct literal so the `Any` mode can mix a size condition
    // with a predicate closure.
    let config = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Any(vec![
            CompletionCondition::Size(5),
            CompletionCondition::Predicate(Arc::new(|_| false)),
        ]),
        correlation: CorrelationStrategy::HeaderName("key".to_string()),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-any-pred")
        .aggregate(config)
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("predicate completion"));
}
4079
#[test]
/// `build` must fail when the `from` URI is empty, and the error text must
/// mention either `'from'` or `URI`.
fn test_builder_validation_missing_from_uri() {
    let builder = RouteBuilder::from("")
        .route_id("missing-uri-route")
        .to("log:info");
    let result = builder.build();
    assert!(result.is_err(), "empty from URI should fail validation");

    let err = result.err().unwrap().to_string();
    let mentions_source = ["'from'", "URI"].iter().any(|needle| err.contains(*needle));
    assert!(
        mentions_source,
        "error should mention from/URI, got: {err}"
    );
}
4095
#[test]
/// `build` must succeed even when a `to` target is `not-a-valid-uri`; per
/// the assertion message, step URIs are opaque here and resolved later.
fn test_builder_validation_invalid_step_uri_scheme() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("bad-step-route")
        .to("not-a-valid-uri");

    let outcome = builder.build();
    assert!(
        outcome.is_ok(),
        "builder should accept opaque step URIs; resolution happens later"
    );
}
4109
#[test]
/// Two routes built independently with the same route id must both build
/// successfully and report equal route ids.
fn test_builder_duplicate_route_ids_produce_identical_definitions() {
    let first = RouteBuilder::from("direct:a")
        .route_id("dup-route")
        .to("mock:out")
        .build();
    let second = RouteBuilder::from("direct:b")
        .route_id("dup-route")
        .to("mock:out")
        .build();

    assert!(first.is_ok());
    assert!(second.is_ok());

    let first_route = first.unwrap();
    let second_route = second.unwrap();
    assert_eq!(first_route.route_id(), second_route.route_id());
}
4129}