1use camel_api::DelayConfig;
7use camel_api::aggregator::{
8 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24 ProcessorFn, Value,
25 runtime::{
26 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28 CanonicalWhenSpec,
29 },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35 StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38pub trait StepAccumulator: Sized {
44 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
45
46 fn to(mut self, endpoint: impl Into<String>) -> Self {
47 self.steps_mut().push(BuilderStep::To(endpoint.into()));
48 self
49 }
50
51 fn process<F, Fut>(mut self, f: F) -> Self
52 where
53 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
54 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
55 {
56 let svc = ProcessorFn::new(f);
57 self.steps_mut()
58 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
59 self
60 }
61
62 fn process_fn(mut self, processor: BoxProcessor) -> Self {
63 self.steps_mut().push(BuilderStep::Processor(processor));
64 self
65 }
66
67 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
68 let svc = SetHeader::new(IdentityProcessor, key, value);
69 self.steps_mut()
70 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71 self
72 }
73
74 fn map_body<F>(mut self, mapper: F) -> Self
75 where
76 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
77 {
78 let svc = MapBody::new(IdentityProcessor, mapper);
79 self.steps_mut()
80 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
81 self
82 }
83
84 fn set_body<B>(mut self, body: B) -> Self
85 where
86 B: Into<Body> + Clone + Send + Sync + 'static,
87 {
88 let body: Body = body.into();
89 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
90 self.steps_mut()
91 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
92 self
93 }
94
95 fn transform<B>(self, body: B) -> Self
100 where
101 B: Into<Body> + Clone + Send + Sync + 'static,
102 {
103 self.set_body(body)
104 }
105
106 fn set_body_fn<F>(mut self, expr: F) -> Self
107 where
108 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
109 {
110 let svc = SetBody::new(IdentityProcessor, expr);
111 self.steps_mut()
112 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
113 self
114 }
115
116 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
117 where
118 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
119 {
120 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
121 self.steps_mut()
122 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
123 self
124 }
125
126 fn aggregate(mut self, config: AggregatorConfig) -> Self {
127 self.steps_mut().push(BuilderStep::Aggregate { config });
128 self
129 }
130
131 fn stop(mut self) -> Self {
137 self.steps_mut().push(BuilderStep::Stop);
138 self
139 }
140
141 fn delay(mut self, duration: std::time::Duration) -> Self {
142 self.steps_mut().push(BuilderStep::Delay {
143 config: DelayConfig::from_duration(duration),
144 });
145 self
146 }
147
148 fn delay_with_header(
149 mut self,
150 duration: std::time::Duration,
151 header: impl Into<String>,
152 ) -> Self {
153 self.steps_mut().push(BuilderStep::Delay {
154 config: DelayConfig::from_duration_with_header(duration, header),
155 });
156 self
157 }
158
159 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
163 self.steps_mut().push(BuilderStep::Log {
164 level,
165 message: message.into(),
166 });
167 self
168 }
169
170 fn convert_body_to(mut self, target: BodyType) -> Self {
182 let svc = ConvertBodyTo::new(IdentityProcessor, target);
183 self.steps_mut()
184 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
185 self
186 }
187
188 fn stream_cache(mut self, threshold: usize) -> Self {
189 let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
190 let svc = StreamCacheService::new(IdentityProcessor, config);
191 self.steps_mut()
192 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
193 self
194 }
195
196 fn stream_cache_default(self) -> Self {
200 self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
201 }
202
203 fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
214 let name = format.into();
215 let df = builtin_data_format(&name)
216 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
217 let svc = MarshalService::new(IdentityProcessor, df);
218 self.steps_mut()
219 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
220 Ok(self)
221 }
222
223 fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
234 let name = format.into();
235 let df = builtin_data_format(&name)
236 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
237 let svc = UnmarshalService::new(IdentityProcessor, df);
238 self.steps_mut()
239 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
240 Ok(self)
241 }
242
243 fn validate(mut self, schema_path: impl Into<String>) -> Self {
252 let path = schema_path.into();
253 let uri = if path.starts_with("validator:") {
254 path
255 } else {
256 format!("validator:{path}")
257 };
258 self.steps_mut().push(BuilderStep::To(uri));
259 self
260 }
261
262 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
273 self.steps_mut().push(BuilderStep::Script {
274 language: language.into(),
275 script: script.into(),
276 });
277 self
278 }
279
280 fn enrich(mut self, uri: impl Into<String>) -> Self {
286 self.steps_mut().push(BuilderStep::Enrich {
287 uri: uri.into(),
288 strategy: None,
289 timeout_ms: None,
290 });
291 self
292 }
293
294 fn poll_enrich(mut self, uri: impl Into<String>, timeout_ms: u64) -> Self {
300 self.steps_mut().push(BuilderStep::PollEnrich {
301 uri: uri.into(),
302 strategy: None,
303 timeout_ms: Some(timeout_ms),
304 });
305 self
306 }
307
308 fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
309 self.steps_mut().push(BuilderStep::Bean {
310 name: name.into(),
311 method: method.into(),
312 });
313 self
314 }
315}
316
/// Fluent builder that collects everything needed to produce a
/// `RouteDefinition` (via `build`) or a `CanonicalRouteSpec`
/// (via `build_canonical`).
pub struct RouteBuilder {
    /// Source endpoint URI given to `RouteBuilder::from`.
    from_uri: String,
    /// Steps appended so far, in call order.
    steps: Vec<BuilderStep>,
    /// Explicit error handler configuration set through `error_handler`.
    error_handler: Option<ErrorHandlerConfig>,
    /// Tracks which error-handler API style(s) have been used on this builder.
    error_handler_mode: ErrorHandlerMode,
    /// Optional circuit breaker configuration.
    circuit_breaker_config: Option<CircuitBreakerConfig>,
    /// Optional security policy configuration.
    security_policy_config: Option<camel_api::security_policy::SecurityPolicyConfig>,
    /// Optional token authenticator.
    security_authenticator: Option<std::sync::Arc<dyn camel_auth::TokenAuthenticator>>,
    /// Optional concurrency model override.
    concurrency: Option<ConcurrencyModel>,
    /// Route identifier; `build` and `build_canonical` require it to be non-blank.
    route_id: Option<String>,
    /// Optional auto-startup flag.
    auto_startup: Option<bool>,
    /// Optional startup order.
    startup_order: Option<i32>,
}
344
/// Records which error-handler configuration style a `RouteBuilder` has used,
/// so that `build` can reject mixing the two.
#[derive(Default)]
enum ErrorHandlerMode {
    /// No error-handler method has been called yet.
    #[default]
    None,
    /// Only `error_handler(config)` has been used.
    ExplicitConfig,
    /// Only the shorthand methods (`dead_letter_channel`, `on_exception`)
    /// have been used.
    Shorthand {
        /// Dead-letter-channel URI, if `dead_letter_channel` was called.
        dlc_uri: Option<String>,
        /// Exception policies pushed by `OnExceptionBuilder::end_on_exception`.
        specs: Vec<OnExceptionSpec>,
    },
    /// Both styles were used; `build` returns an error for this state.
    Mixed,
}
356
/// One shorthand exception policy collected by `OnExceptionBuilder`.
#[derive(Clone)]
struct OnExceptionSpec {
    /// Predicate selecting the errors this policy applies to.
    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
    /// Redelivery policy, present only after `retry` was called.
    retry: Option<RedeliveryPolicy>,
    /// URI passed to `handled_by`, if any.
    handled_by: Option<String>,
}
363
364impl RouteBuilder {
365 pub fn from(endpoint: &str) -> Self {
367 Self {
368 from_uri: endpoint.to_string(),
369 steps: Vec::new(),
370 error_handler: None,
371 error_handler_mode: ErrorHandlerMode::None,
372 circuit_breaker_config: None,
373 security_policy_config: None,
374 security_authenticator: None,
375 concurrency: None,
376 route_id: None,
377 auto_startup: None,
378 startup_order: None,
379 }
380 }
381
382 pub fn filter<F>(self, predicate: F) -> FilterBuilder
386 where
387 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
388 {
389 FilterBuilder {
390 parent: self,
391 predicate: std::sync::Arc::new(predicate),
392 steps: vec![],
393 }
394 }
395
396 pub fn choice(self) -> ChoiceBuilder {
402 ChoiceBuilder {
403 parent: self,
404 whens: vec![],
405 _otherwise: None,
406 }
407 }
408
409 pub fn wire_tap(mut self, endpoint: &str) -> Self {
413 self.steps.push(BuilderStep::WireTap {
414 uri: endpoint.to_string(),
415 });
416 self
417 }
418
419 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
421 self.error_handler_mode = match self.error_handler_mode {
422 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
423 ErrorHandlerMode::ExplicitConfig
424 }
425 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
426 };
427 self.error_handler = Some(config);
428 self
429 }
430
431 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
433 let uri = uri.into();
434 self.error_handler_mode = match self.error_handler_mode {
435 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
436 dlc_uri: Some(uri),
437 specs: Vec::new(),
438 },
439 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
440 dlc_uri: Some(uri),
441 specs,
442 },
443 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
444 };
445 self
446 }
447
448 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
450 where
451 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
452 {
453 self.error_handler_mode = match self.error_handler_mode {
454 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
455 dlc_uri: None,
456 specs: Vec::new(),
457 },
458 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
459 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
460 };
461
462 OnExceptionBuilder {
463 parent: self,
464 policy: OnExceptionSpec {
465 matches: std::sync::Arc::new(matches),
466 retry: None,
467 handled_by: None,
468 },
469 }
470 }
471
472 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
474 self.circuit_breaker_config = Some(config);
475 self
476 }
477
478 pub fn security_policy(
479 mut self,
480 config: camel_api::security_policy::SecurityPolicyConfig,
481 ) -> Self {
482 self.security_policy_config = Some(config);
483 self
484 }
485
486 pub fn security_authenticator(
487 mut self,
488 auth: std::sync::Arc<dyn camel_auth::TokenAuthenticator>,
489 ) -> Self {
490 self.security_authenticator = Some(auth);
491 self
492 }
493
494 pub fn concurrent(mut self, max: usize) -> Self {
508 let max = if max == 0 { None } else { Some(max) };
509 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
510 self
511 }
512
513 pub fn sequential(mut self) -> Self {
518 self.concurrency = Some(ConcurrencyModel::Sequential);
519 self
520 }
521
522 pub fn route_id(mut self, id: impl Into<String>) -> Self {
526 self.route_id = Some(id.into());
527 self
528 }
529
530 pub fn auto_startup(mut self, auto: bool) -> Self {
534 self.auto_startup = Some(auto);
535 self
536 }
537
538 pub fn startup_order(mut self, order: i32) -> Self {
542 self.startup_order = Some(order);
543 self
544 }
545
546 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
552 SplitBuilder {
553 parent: self,
554 config,
555 steps: Vec::new(),
556 }
557 }
558
559 pub fn multicast(self) -> MulticastBuilder {
565 MulticastBuilder {
566 parent: self,
567 steps: Vec::new(),
568 config: MulticastConfig::new(),
569 }
570 }
571
572 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
579 ThrottleBuilder {
580 parent: self,
581 config: ThrottlerConfig::new(max_requests, period),
582 steps: Vec::new(),
583 }
584 }
585
586 pub fn loop_count(self, count: usize) -> LoopBuilder {
588 LoopBuilder {
589 parent: self,
590 config: LoopConfig {
591 mode: LoopMode::Count(count),
592 },
593 steps: vec![],
594 }
595 }
596
597 pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
599 where
600 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
601 {
602 LoopBuilder {
603 parent: self,
604 config: LoopConfig {
605 mode: LoopMode::While(std::sync::Arc::new(predicate)),
606 },
607 steps: vec![],
608 }
609 }
610
611 pub fn load_balance(self) -> LoadBalancerBuilder {
617 LoadBalancerBuilder {
618 parent: self,
619 config: LoadBalancerConfig::round_robin(),
620 steps: Vec::new(),
621 }
622 }
623
624 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
640 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
641 }
642
643 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
647 self.steps.push(BuilderStep::DynamicRouter { config });
648 self
649 }
650
651 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
652 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
653 }
654
655 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
656 self.steps.push(BuilderStep::RoutingSlip { config });
657 self
658 }
659
660 pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
661 self.recipient_list_with_config(RecipientListConfig::new(expression))
662 }
663
664 pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
665 self.steps.push(BuilderStep::RecipientList { config });
666 self
667 }
668
669 pub fn build(self) -> Result<RouteDefinition, CamelError> {
675 validate_uri(&self.from_uri)?;
676 let route_id = self
677 .route_id
678 .filter(|s| !s.trim().is_empty())
679 .ok_or_else(|| {
680 CamelError::RouteError(
681 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
682 .to_string(),
683 )
684 })?;
685 let resolved_error_handler = match self.error_handler_mode {
686 ErrorHandlerMode::None => self.error_handler,
687 ErrorHandlerMode::ExplicitConfig => self.error_handler,
688 ErrorHandlerMode::Mixed => {
689 return Err(CamelError::RouteError(
690 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
691 ));
692 }
693 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
694 let mut config = if let Some(uri) = dlc_uri {
695 ErrorHandlerConfig::dead_letter_channel(uri)
696 } else {
697 ErrorHandlerConfig::log_only()
698 };
699
700 for spec in specs {
701 let matcher = spec.matches.clone();
702 let mut builder = config.on_exception(move |e| matcher(e));
703
704 if let Some(retry) = spec.retry {
705 builder = builder.retry(retry.max_attempts).with_backoff(
706 retry.initial_delay,
707 retry.multiplier,
708 retry.max_delay,
709 );
710 if retry.jitter_factor > 0.0 {
711 builder = builder.with_jitter(retry.jitter_factor);
712 }
713 }
714
715 if let Some(uri) = spec.handled_by {
716 builder = builder.handled_by(uri);
717 }
718
719 config = builder.build();
720 }
721
722 Some(config)
723 }
724 };
725
726 let definition = RouteDefinition::new(self.from_uri, self.steps);
727 let definition = if let Some(eh) = resolved_error_handler {
728 definition.with_error_handler(eh)
729 } else {
730 definition
731 };
732 let definition = if let Some(cb) = self.circuit_breaker_config {
733 definition.with_circuit_breaker(cb)
734 } else {
735 definition
736 };
737 let definition = if let Some(sp) = self.security_policy_config {
738 definition.with_security_policy(sp)
739 } else {
740 definition
741 };
742 let definition = if let Some(auth) = self.security_authenticator {
743 definition.with_security_authenticator(auth)
744 } else {
745 definition
746 };
747 let definition = if let Some(concurrency) = self.concurrency {
748 definition.with_concurrency(concurrency)
749 } else {
750 definition
751 };
752 let definition = definition.with_route_id(route_id);
753 let definition = if let Some(auto) = self.auto_startup {
754 definition.with_auto_startup(auto)
755 } else {
756 definition
757 };
758 let definition = if let Some(order) = self.startup_order {
759 definition.with_startup_order(order)
760 } else {
761 definition
762 };
763 Ok(definition)
764 }
765
766 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
768 validate_uri(&self.from_uri)?;
769 let route_id = self
770 .route_id
771 .filter(|s| !s.trim().is_empty())
772 .ok_or_else(|| {
773 CamelError::RouteError(
774 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
775 .to_string(),
776 )
777 })?;
778
779 let steps = canonicalize_steps(self.steps)?;
780 let circuit_breaker = self
781 .circuit_breaker_config
782 .map(canonicalize_circuit_breaker);
783
784 if self.security_policy_config.is_some() {
785 return Err(CamelError::RouteError(
786 "routes with security_policy cannot use the canonical/hot-reload path (not yet supported)"
787 .into(),
788 ));
789 }
790
791 let spec = CanonicalRouteSpec {
792 route_id,
793 from: self.from_uri,
794 steps,
795 circuit_breaker,
796 version: camel_api::CANONICAL_CONTRACT_VERSION,
797 };
798 spec.validate_contract()?;
799 Ok(spec)
800 }
801}
802
/// Sub-builder returned by `RouteBuilder::on_exception` that configures one
/// exception policy before handing control back to the route builder.
pub struct OnExceptionBuilder {
    /// Route builder to return from `end_on_exception`.
    parent: RouteBuilder,
    /// Policy being configured.
    policy: OnExceptionSpec,
}
807
808impl OnExceptionBuilder {
809 pub fn retry(mut self, max_attempts: u32) -> Self {
810 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
811 self
812 }
813
814 pub fn with_backoff(
815 mut self,
816 initial: std::time::Duration,
817 multiplier: f64,
818 max: std::time::Duration,
819 ) -> Self {
820 if let Some(ref mut retry) = self.policy.retry {
821 retry.initial_delay = initial;
822 retry.multiplier = multiplier;
823 retry.max_delay = max;
824 } else {
825 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
826 }
827 self
828 }
829
830 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
831 if let Some(ref mut retry) = self.policy.retry {
832 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
833 } else {
834 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
835 }
836 self
837 }
838
839 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
840 self.policy.handled_by = Some(uri.into());
841 self
842 }
843
844 pub fn end_on_exception(mut self) -> RouteBuilder {
845 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
846 specs.push(self.policy);
847 }
848 self.parent
849 }
850}
851
852fn validate_uri(uri: &str) -> Result<(), CamelError> {
854 let trimmed = uri.trim();
855 if trimmed.is_empty() {
856 return Err(CamelError::RouteError(
857 "route must have a 'from' URI".to_string(),
858 ));
859 }
860 if !trimmed.contains(':') {
861 return Err(CamelError::RouteError(
862 "URI must have a scheme (e.g. 'timer:tick')".to_string(),
863 ));
864 }
865 let scheme = trimmed.split(':').next().unwrap_or("");
866 if scheme.trim().is_empty() {
867 return Err(CamelError::RouteError(
868 "URI scheme must not be empty".to_string(),
869 ));
870 }
871 Ok(())
872}
873
874fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
875 let mut canonical = Vec::with_capacity(steps.len());
876 for step in steps {
877 canonical.push(canonicalize_step(step)?);
878 }
879 Ok(canonical)
880}
881
882fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
883 match step {
884 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
885 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
886 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
887 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
888 BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
889 delay_ms: config.delay_ms,
890 dynamic_header: config.dynamic_header,
891 }),
892 BuilderStep::DeclarativeScript { expression } => {
893 Ok(CanonicalStepSpec::Script { expression })
894 }
895 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
896 predicate,
897 steps: canonicalize_steps(steps)?,
898 }),
899 BuilderStep::DeclarativeChoice { whens, otherwise } => {
900 let mut canonical_whens = Vec::with_capacity(whens.len());
901 for DeclarativeWhenStep { predicate, steps } in whens {
902 canonical_whens.push(CanonicalWhenSpec {
903 predicate,
904 steps: canonicalize_steps(steps)?,
905 });
906 }
907 let otherwise = match otherwise {
908 Some(steps) => Some(canonicalize_steps(steps)?),
909 None => None,
910 };
911 Ok(CanonicalStepSpec::Choice {
912 whens: canonical_whens,
913 otherwise,
914 })
915 }
916 BuilderStep::DeclarativeSplit {
917 expression,
918 aggregation,
919 parallel,
920 parallel_limit,
921 stop_on_exception,
922 steps,
923 } => Ok(CanonicalStepSpec::Split {
924 expression: CanonicalSplitExpressionSpec::Language(expression),
925 aggregation: canonicalize_split_aggregation(aggregation)?,
926 parallel,
927 parallel_limit,
928 stop_on_exception,
929 steps: canonicalize_steps(steps)?,
930 }),
931 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
932 canonicalize_aggregate(config)?,
933 )),
934 other => {
935 let step_name = canonical_step_name(&other);
936 let detail = camel_api::canonical_contract_rejection_reason(step_name)
937 .unwrap_or("not included in canonical v1");
938 Err(CamelError::RouteError(format!(
939 "canonical v1 does not support step `{step_name}`: {detail}"
940 )))
941 }
942 }
943}
944
945fn canonicalize_split_aggregation(
946 strategy: camel_api::splitter::AggregationStrategy,
947) -> Result<CanonicalSplitAggregationSpec, CamelError> {
948 match strategy {
949 camel_api::splitter::AggregationStrategy::LastWins => {
950 Ok(CanonicalSplitAggregationSpec::LastWins)
951 }
952 camel_api::splitter::AggregationStrategy::CollectAll => {
953 Ok(CanonicalSplitAggregationSpec::CollectAll)
954 }
955 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
956 "canonical v1 does not support custom split aggregation".to_string(),
957 )),
958 camel_api::splitter::AggregationStrategy::Original => {
959 Ok(CanonicalSplitAggregationSpec::Original)
960 }
961 }
962}
963
964fn extract_completion_fields(
965 mode: &CompletionMode,
966) -> Result<(Option<usize>, Option<u64>), CamelError> {
967 match mode {
968 CompletionMode::Single(cond) => match cond {
969 CompletionCondition::Size(n) => Ok((Some(*n), None)),
970 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
971 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
972 "canonical v1 does not support aggregate predicate completion".to_string(),
973 )),
974 },
975 CompletionMode::Any(conds) => {
976 let mut size = None;
977 let mut timeout_ms = None;
978 for cond in conds {
979 match cond {
980 CompletionCondition::Size(n) => size = Some(*n),
981 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
982 CompletionCondition::Predicate(_) => {
983 return Err(CamelError::RouteError(
984 "canonical v1 does not support aggregate predicate completion"
985 .to_string(),
986 ));
987 }
988 }
989 }
990 Ok((size, timeout_ms))
991 }
992 }
993}
994
995fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
996 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
997
998 let header = match &config.correlation {
999 CorrelationStrategy::HeaderName(h) => h.clone(),
1000 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
1001 CorrelationStrategy::Fn(_) => {
1002 return Err(CamelError::RouteError(
1003 "canonical v1 does not support Fn correlation strategy".to_string(),
1004 ));
1005 }
1006 };
1007
1008 let correlation_key = match &config.correlation {
1009 CorrelationStrategy::HeaderName(_) => None,
1010 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
1011 CorrelationStrategy::Fn(_) => unreachable!(),
1012 };
1013
1014 let strategy = match config.strategy {
1015 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
1016 AggregationStrategy::Custom(_) => {
1017 return Err(CamelError::RouteError(
1018 "canonical v1 does not support custom aggregate strategy".to_string(),
1019 ));
1020 }
1021 };
1022 let bucket_ttl_ms = config
1023 .bucket_ttl
1024 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
1025
1026 Ok(CanonicalAggregateSpec {
1027 header,
1028 completion_size,
1029 completion_timeout_ms,
1030 correlation_key,
1031 force_completion_on_stop: if config.force_completion_on_stop {
1032 Some(true)
1033 } else {
1034 None
1035 },
1036 discard_on_timeout: if config.discard_on_timeout {
1037 Some(true)
1038 } else {
1039 None
1040 },
1041 strategy,
1042 max_buckets: config.max_buckets,
1043 bucket_ttl_ms,
1044 })
1045}
1046
1047fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
1048 CanonicalCircuitBreakerSpec {
1049 failure_threshold: config.failure_threshold,
1050 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
1051 }
1052}
1053
1054fn canonical_step_name(step: &BuilderStep) -> &'static str {
1055 match step {
1056 BuilderStep::Processor(_) => "processor",
1057 BuilderStep::To(_) => "to",
1058 BuilderStep::Stop => "stop",
1059 BuilderStep::Log { .. } => "log",
1060 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
1061 BuilderStep::DeclarativeSetBody { .. } => "set_body",
1062 BuilderStep::DeclarativeFilter { .. } => "filter",
1063 BuilderStep::DeclarativeChoice { .. } => "choice",
1064 BuilderStep::DeclarativeScript { .. } => "script",
1065 BuilderStep::DeclarativeFunction { .. } => "function",
1066 BuilderStep::DeclarativeSplit { .. } => "split",
1067 BuilderStep::Split { .. } => "split",
1068 BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1069 BuilderStep::Aggregate { .. } => "aggregate",
1070 BuilderStep::Filter { .. } => "filter",
1071 BuilderStep::Choice { .. } => "choice",
1072 BuilderStep::WireTap { .. } => "wire_tap",
1073 BuilderStep::Delay { .. } => "delay",
1074 BuilderStep::Multicast { .. } => "multicast",
1075 BuilderStep::DeclarativeLog { .. } => "log",
1076 BuilderStep::Bean { .. } => "bean",
1077 BuilderStep::Script { .. } => "script",
1078 BuilderStep::Throttle { .. } => "throttle",
1079 BuilderStep::LoadBalance { .. } => "load_balancer",
1080 BuilderStep::DynamicRouter { .. } => "dynamic_router",
1081 BuilderStep::RoutingSlip { .. } => "routing_slip",
1082 BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1083 BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1084 BuilderStep::RecipientList { .. } => "recipient_list",
1085 BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1086 BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1087 BuilderStep::Enrich { .. } => "enrich",
1088 BuilderStep::PollEnrich { .. } => "poll_enrich",
1089 }
1090}
1091
/// Steps added through the shared fluent API go straight onto the route's
/// own step list.
impl StepAccumulator for RouteBuilder {
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1097
/// Sub-builder returned by `RouteBuilder::split` that collects the steps of
/// a split scope.
pub struct SplitBuilder {
    /// Route builder to return from `end_split`.
    parent: RouteBuilder,
    /// Splitter configuration for the resulting `Split` step.
    config: SplitterConfig,
    /// Steps collected inside the split scope.
    steps: Vec<BuilderStep>,
}
1110
1111impl SplitBuilder {
1112 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1114 where
1115 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1116 {
1117 FilterInSplitBuilder {
1118 parent: self,
1119 predicate: std::sync::Arc::new(predicate),
1120 steps: vec![],
1121 }
1122 }
1123
1124 pub fn end_split(mut self) -> RouteBuilder {
1127 let split_step = BuilderStep::Split {
1128 config: self.config,
1129 steps: self.steps,
1130 };
1131 self.parent.steps.push(split_step);
1132 self.parent
1133 }
1134}
1135
/// Steps added through the shared fluent API are collected inside the split
/// scope.
impl StepAccumulator for SplitBuilder {
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1141
/// Sub-builder returned by `RouteBuilder::filter` that collects the steps of
/// a filter scope.
pub struct FilterBuilder {
    /// Route builder to return from `end_filter`.
    parent: RouteBuilder,
    /// Predicate stored in the resulting `Filter` step.
    predicate: FilterPredicate,
    /// Steps collected inside the filter scope.
    steps: Vec<BuilderStep>,
}
1148
1149impl FilterBuilder {
1150 pub fn end_filter(mut self) -> RouteBuilder {
1153 let step = BuilderStep::Filter {
1154 predicate: self.predicate,
1155 steps: self.steps,
1156 };
1157 self.parent.steps.push(step);
1158 self.parent
1159 }
1160}
1161
/// Steps added through the shared fluent API are collected inside the filter
/// scope.
impl StepAccumulator for FilterBuilder {
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1167
/// Sub-builder returned by `SplitBuilder::filter` that collects the steps of
/// a filter scope nested inside a split.
pub struct FilterInSplitBuilder {
    /// Split builder to return from `end_filter`.
    parent: SplitBuilder,
    /// Predicate stored in the resulting `Filter` step.
    predicate: FilterPredicate,
    /// Steps collected inside the filter scope.
    steps: Vec<BuilderStep>,
}
1174
1175impl FilterInSplitBuilder {
1176 pub fn end_filter(mut self) -> SplitBuilder {
1178 let step = BuilderStep::Filter {
1179 predicate: self.predicate,
1180 steps: self.steps,
1181 };
1182 self.parent.steps.push(step);
1183 self.parent
1184 }
1185}
1186
/// Steps added through the shared fluent API are collected inside the nested
/// filter scope.
impl StepAccumulator for FilterInSplitBuilder {
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1192
/// Sub-builder returned by `RouteBuilder::choice` that collects `when`
/// branches and an optional `otherwise` branch.
pub struct ChoiceBuilder {
    /// Route builder to return from `end_choice`.
    parent: RouteBuilder,
    /// Branches added by `WhenBuilder::end_when`, in call order.
    whens: Vec<WhenStep>,
    /// Fallback steps set by `OtherwiseBuilder::end_otherwise`, if any.
    _otherwise: Option<Vec<BuilderStep>>,
}
1204
1205impl ChoiceBuilder {
1206 pub fn when<F>(self, predicate: F) -> WhenBuilder
1209 where
1210 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1211 {
1212 WhenBuilder {
1213 parent: self,
1214 predicate: std::sync::Arc::new(predicate),
1215 steps: vec![],
1216 }
1217 }
1218
1219 pub fn otherwise(self) -> OtherwiseBuilder {
1223 OtherwiseBuilder {
1224 parent: self,
1225 steps: vec![],
1226 }
1227 }
1228
1229 pub fn end_choice(mut self) -> RouteBuilder {
1233 let step = BuilderStep::Choice {
1234 whens: self.whens,
1235 otherwise: self._otherwise,
1236 };
1237 self.parent.steps.push(step);
1238 self.parent
1239 }
1240}
1241
/// Sub-builder returned by `ChoiceBuilder::when` that collects the steps of
/// one `when` branch.
pub struct WhenBuilder {
    /// Choice builder to return from `end_when`.
    parent: ChoiceBuilder,
    /// Predicate stored in the resulting `WhenStep`.
    predicate: camel_api::FilterPredicate,
    /// Steps collected inside the branch.
    steps: Vec<BuilderStep>,
}
1248
1249impl WhenBuilder {
1250 pub fn end_when(mut self) -> ChoiceBuilder {
1253 self.parent.whens.push(WhenStep {
1254 predicate: self.predicate,
1255 steps: self.steps,
1256 });
1257 self.parent
1258 }
1259}
1260
/// Steps added through the shared fluent API are collected inside the `when`
/// branch.
impl StepAccumulator for WhenBuilder {
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1266
/// Sub-builder returned by `ChoiceBuilder::otherwise` that collects the
/// steps of the fallback branch.
pub struct OtherwiseBuilder {
    /// Choice builder to return from `end_otherwise`.
    parent: ChoiceBuilder,
    /// Steps collected inside the fallback branch.
    steps: Vec<BuilderStep>,
}
1272
1273impl OtherwiseBuilder {
1274 pub fn end_otherwise(self) -> ChoiceBuilder {
1276 let OtherwiseBuilder { mut parent, steps } = self;
1277 parent._otherwise = Some(steps);
1278 parent
1279 }
1280}
1281
/// Steps added through the shared fluent API are collected inside the
/// `otherwise` branch.
impl StepAccumulator for OtherwiseBuilder {
    fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
        &mut self.steps
    }
}
1287
/// Sub-builder returned by `RouteBuilder::multicast` that collects the steps
/// and settings of a multicast scope.
pub struct MulticastBuilder {
    /// Route builder to return from `end_multicast`.
    parent: RouteBuilder,
    /// Steps collected inside the multicast scope.
    steps: Vec<BuilderStep>,
    /// Multicast configuration for the resulting `Multicast` step.
    config: MulticastConfig,
}
1300
1301impl MulticastBuilder {
1302 pub fn parallel(mut self, parallel: bool) -> Self {
1303 self.config = self.config.parallel(parallel);
1304 self
1305 }
1306
1307 pub fn parallel_limit(mut self, limit: usize) -> Self {
1308 self.config = self.config.parallel_limit(limit);
1309 self
1310 }
1311
1312 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1313 self.config = self.config.stop_on_exception(stop);
1314 self
1315 }
1316
1317 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1318 self.config = self.config.timeout(duration);
1319 self
1320 }
1321
1322 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1323 self.config = self.config.aggregation(strategy);
1324 self
1325 }
1326
1327 pub fn end_multicast(mut self) -> RouteBuilder {
1328 let step = BuilderStep::Multicast {
1329 steps: self.steps,
1330 config: self.config,
1331 };
1332 self.parent.steps.push(step);
1333 self.parent
1334 }
1335}
1336
1337impl StepAccumulator for MulticastBuilder {
1338 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1339 &mut self.steps
1340 }
1341}
1342
1343pub struct ThrottleBuilder {
1351 parent: RouteBuilder,
1352 config: ThrottlerConfig,
1353 steps: Vec<BuilderStep>,
1354}
1355
1356impl ThrottleBuilder {
1357 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1363 self.config = self.config.strategy(strategy);
1364 self
1365 }
1366
1367 pub fn end_throttle(mut self) -> RouteBuilder {
1370 let step = BuilderStep::Throttle {
1371 config: self.config,
1372 steps: self.steps,
1373 };
1374 self.parent.steps.push(step);
1375 self.parent
1376 }
1377}
1378
1379impl StepAccumulator for ThrottleBuilder {
1380 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1381 &mut self.steps
1382 }
1383}
1384
1385pub struct LoopBuilder {
1387 parent: RouteBuilder,
1388 config: LoopConfig,
1389 steps: Vec<BuilderStep>,
1390}
1391
1392impl LoopBuilder {
1393 pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1394 LoopInLoopBuilder {
1395 parent: self,
1396 config: LoopConfig {
1397 mode: LoopMode::Count(count),
1398 },
1399 steps: vec![],
1400 }
1401 }
1402
1403 pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1404 where
1405 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1406 {
1407 LoopInLoopBuilder {
1408 parent: self,
1409 config: LoopConfig {
1410 mode: LoopMode::While(std::sync::Arc::new(predicate)),
1411 },
1412 steps: vec![],
1413 }
1414 }
1415
1416 pub fn end_loop(mut self) -> RouteBuilder {
1417 let step = BuilderStep::Loop {
1418 config: self.config,
1419 steps: self.steps,
1420 };
1421 self.parent.steps.push(step);
1422 self.parent
1423 }
1424}
1425
1426impl StepAccumulator for LoopBuilder {
1427 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1428 &mut self.steps
1429 }
1430}
1431
1432pub struct LoopInLoopBuilder {
1433 parent: LoopBuilder,
1434 config: LoopConfig,
1435 steps: Vec<BuilderStep>,
1436}
1437
1438impl LoopInLoopBuilder {
1439 pub fn end_loop(mut self) -> LoopBuilder {
1440 let step = BuilderStep::Loop {
1441 config: self.config,
1442 steps: self.steps,
1443 };
1444 self.parent.steps.push(step);
1445 self.parent
1446 }
1447}
1448
1449impl StepAccumulator for LoopInLoopBuilder {
1450 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1451 &mut self.steps
1452 }
1453}
1454
1455pub struct LoadBalancerBuilder {
1463 parent: RouteBuilder,
1464 config: LoadBalancerConfig,
1465 steps: Vec<BuilderStep>,
1466}
1467
1468impl LoadBalancerBuilder {
1469 pub fn round_robin(mut self) -> Self {
1471 self.config = LoadBalancerConfig::round_robin();
1472 self
1473 }
1474
1475 pub fn random(mut self) -> Self {
1477 self.config = LoadBalancerConfig::random();
1478 self
1479 }
1480
1481 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1486 self.config = LoadBalancerConfig::weighted(weights);
1487 self
1488 }
1489
1490 pub fn failover(mut self) -> Self {
1495 self.config = LoadBalancerConfig::failover();
1496 self
1497 }
1498
1499 pub fn parallel(mut self, parallel: bool) -> Self {
1504 self.config = self.config.parallel(parallel);
1505 self
1506 }
1507
1508 pub fn end_load_balance(mut self) -> RouteBuilder {
1511 let step = BuilderStep::LoadBalance {
1512 config: self.config,
1513 steps: self.steps,
1514 };
1515 self.parent.steps.push(step);
1516 self.parent
1517 }
1518}
1519
1520impl StepAccumulator for LoadBalancerBuilder {
1521 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1522 &mut self.steps
1523 }
1524}
1525
1526#[cfg(test)]
1531mod tests {
1532 use super::*;
1533 use camel_api::error_handler::ErrorHandlerConfig;
1534 use camel_api::load_balancer::LoadBalanceStrategy;
1535 use camel_api::{Exchange, Message};
1536 use camel_core::route::BuilderStep;
1537 use std::sync::Arc;
1538 use std::time::Duration;
1539 use tower::{Service, ServiceExt};
1540
/// A builder seeded with a source URI yields a definition exposing that URI.
#[test]
fn test_builder_from_creates_definition() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let route = builder.build().unwrap();
    assert_eq!(route.from_uri(), "timer:tick");
}
1549
/// An empty source URI is rejected at build time.
#[test]
fn test_builder_empty_from_uri_errors() {
    let builder = RouteBuilder::from("").route_id("test-route");
    assert!(builder.build().is_err());
}
1555
/// A source URI without a scheme fails with an error mentioning "scheme".
#[test]
fn test_build_rejects_schemeless_uri() {
    let outcome = RouteBuilder::from("no-scheme-here")
        .route_id("test-route")
        .build();

    let err_msg = outcome
        .err()
        .expect("schemeless URI should fail")
        .to_string();
    assert!(
        err_msg.contains("scheme"),
        "expected scheme-related error, got: {err_msg}"
    );
}
1572
/// A source URI whose scheme part is empty fails with an error mentioning
/// "scheme".
#[test]
fn test_build_rejects_empty_scheme_uri() {
    let outcome = RouteBuilder::from(":missing-scheme")
        .route_id("test-route")
        .build();

    let err_msg = outcome
        .err()
        .expect("empty-scheme URI should fail")
        .to_string();
    assert!(
        err_msg.contains("scheme"),
        "expected scheme-related error, got: {err_msg}"
    );
}
1589
/// A well-formed `scheme:path` source URI builds successfully.
#[test]
fn test_build_accepts_valid_uri() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    assert!(builder.build().is_ok());
}
1597
/// `build_canonical` rejects a schemeless source URI.
#[test]
fn test_build_canonical_rejects_schemeless_uri() {
    let builder = RouteBuilder::from("no-scheme-here").route_id("test-route");
    assert!(builder.build_canonical().is_err());
}
1605
/// `.to(..)` records a `To` step carrying the endpoint URI.
#[test]
fn test_builder_to_adds_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .to("log:info")
        .build()
        .unwrap();

    assert_eq!(route.from_uri(), "timer:tick");
    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::To(uri) if uri == "log:info"));
}
1618
/// A closed filter block shows up as a single `Filter` step.
#[test]
fn test_builder_filter_adds_filter_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .filter(|_ex| true)
        .to("mock:result")
        .end_filter()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Filter { .. }));
}
1631
/// `set_header` is recorded as a `Processor` step.
#[test]
fn test_builder_set_header_adds_processor_step() {
    let header_value = Value::String("value".into());
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_header("key", header_value)
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
1642
/// `map_body` is recorded as a `Processor` step.
#[test]
fn test_builder_map_body_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .map_body(|body| body)
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
1653
/// `process` with an async closure is recorded as a `Processor` step.
#[test]
fn test_builder_process_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .process(|ex| async move { Ok(ex) })
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
1664
/// Chained calls are recorded in call order; the filter block collapses into
/// one step, so three top-level steps are expected.
#[test]
fn test_builder_chain_multiple_steps() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_header("source", Value::String("timer".into()))
        .filter(|ex| ex.input.header("source").is_some())
        .to("log:info")
        .end_filter()
        .to("mock:result")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 3);
    assert!(matches!(&steps[0], BuilderStep::Processor(_)));
    assert!(matches!(&steps[1], BuilderStep::Filter { .. }));
    assert!(matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:result"));
}
1682
/// A counted loop becomes one `Loop` step holding its body, followed by the
/// step added after `end_loop`.
#[test]
fn test_loop_count_builder() {
    use camel_api::loop_eip::LoopMode;

    let route = RouteBuilder::from("direct:start")
        .route_id("loop-test")
        .loop_count(3)
        .to("mock:inside")
        .end_loop()
        .to("mock:after")
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 2);
    match &top_level[0] {
        BuilderStep::Loop {
            config,
            steps: body,
        } => {
            assert!(matches!(config.mode, LoopMode::Count(3)));
            assert_eq!(body.len(), 1);
        }
        other => panic!("Expected Loop, got {:?}", other),
    }
    assert!(matches!(top_level[1], BuilderStep::To(_)));
}
1706
/// A predicate-driven loop becomes one `Loop` step in `While` mode.
#[test]
fn test_loop_while_builder() {
    use camel_api::loop_eip::LoopMode;

    let route = RouteBuilder::from("direct:start")
        .route_id("loop-while-test")
        .loop_while(|_ex| true)
        .to("mock:retry")
        .end_loop()
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 1);
    match &top_level[0] {
        BuilderStep::Loop {
            config,
            steps: body,
        } => {
            assert!(matches!(config.mode, LoopMode::While(_)));
            assert_eq!(body.len(), 1);
        }
        other => panic!("Expected Loop, got {:?}", other),
    }
}
1728
/// A loop opened inside a loop is stored as a `Loop` step within the outer
/// loop's body.
#[test]
fn test_nested_loop_builder() {
    use camel_api::loop_eip::LoopMode;

    let route = RouteBuilder::from("direct:start")
        .route_id("nested-loop-test")
        .loop_count(2)
        .to("mock:outer")
        .loop_count(3)
        .to("mock:inner")
        .end_loop()
        .end_loop()
        .to("mock:after")
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 2);

    let outer_body = match &top_level[0] {
        BuilderStep::Loop { steps, .. } => steps,
        other => panic!("Expected outer Loop, got {:?}", other),
    };
    assert_eq!(outer_body.len(), 2);

    match &outer_body[1] {
        BuilderStep::Loop {
            config,
            steps: inner_steps,
        } => {
            assert!(matches!(config.mode, LoopMode::Count(3)));
            assert_eq!(inner_steps.len(), 1);
        }
        other => panic!("Expected nested Loop, got {:?}", other),
    }
}
1763
1764 #[tokio::test]
1769 async fn test_set_header_processor_works() {
1770 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1771 let exchange = Exchange::new(Message::new("test"));
1772 let result = svc.call(exchange).await.unwrap();
1773 assert_eq!(
1774 result.input.header("greeting"),
1775 Some(&Value::String("hello".into()))
1776 );
1777 }
1778
1779 #[tokio::test]
1780 async fn test_filter_processor_passes() {
1781 use camel_api::BoxProcessorExt;
1782 use camel_processor::FilterService;
1783
1784 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1785 let mut svc =
1786 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1787 let exchange = Exchange::new(Message::new("pass"));
1788 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1789 assert_eq!(result.input.body.as_text(), Some("pass"));
1790 }
1791
1792 #[tokio::test]
1793 async fn test_filter_processor_blocks() {
1794 use camel_api::BoxProcessorExt;
1795 use camel_processor::FilterService;
1796
1797 let sub = BoxProcessor::from_fn(|_ex| {
1798 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1799 });
1800 let mut svc =
1801 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1802 let exchange = Exchange::new(Message::new("reject"));
1803 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1804 assert_eq!(result.input.body.as_text(), Some("reject"));
1805 }
1806
1807 #[tokio::test]
1808 async fn test_map_body_processor_works() {
1809 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1810 if let Some(text) = body.as_text() {
1811 Body::Text(text.to_uppercase())
1812 } else {
1813 body
1814 }
1815 });
1816 let exchange = Exchange::new(Message::new("hello"));
1817 let result = mapper.oneshot(exchange).await.unwrap();
1818 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1819 }
1820
1821 #[tokio::test]
1822 async fn test_process_custom_processor_works() {
1823 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1824 ex.set_property("custom", Value::Bool(true));
1825 Ok(ex)
1826 });
1827 let exchange = Exchange::new(Message::default());
1828 let result = processor.oneshot(exchange).await.unwrap();
1829 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1830 }
1831
1832 #[tokio::test]
1837 async fn test_compose_pipeline_runs_steps_in_order() {
1838 use camel_core::route::compose_pipeline;
1839
1840 let processors = vec![
1841 BoxProcessor::new(SetHeader::new(
1842 IdentityProcessor,
1843 "step",
1844 Value::String("one".into()),
1845 )),
1846 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1847 if let Some(text) = body.as_text() {
1848 Body::Text(format!("{}-processed", text))
1849 } else {
1850 body
1851 }
1852 })),
1853 ];
1854
1855 let pipeline = compose_pipeline(processors);
1856 let exchange = Exchange::new(Message::new("hello"));
1857 let result = pipeline.oneshot(exchange).await.unwrap();
1858
1859 assert_eq!(
1860 result.input.header("step"),
1861 Some(&Value::String("one".into()))
1862 );
1863 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1864 }
1865
1866 #[tokio::test]
1867 async fn test_compose_pipeline_empty_is_identity() {
1868 use camel_core::route::compose_pipeline;
1869
1870 let pipeline = compose_pipeline(vec![]);
1871 let exchange = Exchange::new(Message::new("unchanged"));
1872 let result = pipeline.oneshot(exchange).await.unwrap();
1873 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1874 }
1875
/// The circuit breaker config passed to the builder is exposed by the
/// resulting definition.
#[test]
fn test_builder_circuit_breaker_sets_config() {
    use camel_api::circuit_breaker::CircuitBreakerConfig;

    let breaker = CircuitBreakerConfig::new().failure_threshold(5);
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .circuit_breaker(breaker)
        .build()
        .unwrap();

    let stored = route
        .circuit_breaker_config()
        .expect("circuit breaker should be set");
    assert_eq!(stored.failure_threshold, 5);
}
1896
/// A circuit breaker and an explicit error handler can be configured on the
/// same route, and both must survive `build()`.
#[test]
fn test_builder_circuit_breaker_with_error_handler() {
    use camel_api::circuit_breaker::CircuitBreakerConfig;
    use camel_api::error_handler::ErrorHandlerConfig;

    let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
    let eh_config = ErrorHandlerConfig::log_only();

    let definition = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .to("log:info")
        .circuit_breaker(cb_config)
        .error_handler(eh_config)
        .build()
        .unwrap();

    assert!(
        definition.circuit_breaker_config().is_some(),
        "circuit breaker config should be set"
    );
    // The test name promises coverage of the error handler as well; without
    // this check a builder that dropped the handler would still pass.
    assert!(
        definition.error_handler_config().is_some(),
        "error handler config should be set"
    );
}
1919
/// Two `on_exception` clauses end up as two policies in declaration order,
/// alongside the dead-letter channel URI.
#[test]
fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .dead_letter_channel("log:dlc")
        .on_exception(|e| matches!(e, CamelError::Io(_)))
        .retry(3)
        .handled_by("log:io")
        .end_on_exception()
        .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
        .retry(1)
        .end_on_exception()
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.policies.len(), 2);
    assert_eq!(handler.dlc_uri.as_deref(), Some("log:dlc"));

    let io_policy = &handler.policies[0];
    assert_eq!(io_policy.retry.as_ref().map(|p| p.max_attempts), Some(3));
    assert_eq!(io_policy.handled_by.as_deref(), Some("log:io"));

    let processor_policy = &handler.policies[1];
    assert_eq!(
        processor_policy.retry.as_ref().map(|p| p.max_attempts),
        Some(1)
    );
}
1951
/// Combining an explicit `error_handler` with the `on_exception` shorthand
/// fails the build with a "mixed error handler modes" error.
#[test]
fn test_builder_on_exception_mixed_mode_rejected() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .error_handler(ErrorHandlerConfig::log_only())
        .on_exception(|_e| true)
        .end_on_exception()
        .to("mock:out")
        .build();

    let rendered = outcome
        .err()
        .expect("mixed mode should fail with an error")
        .to_string();

    assert!(
        rendered.contains("mixed error handler modes"),
        "unexpected error: {rendered}"
    );
}
1969
/// Backoff and jitter settings without a preceding `retry` leave the policy
/// without a retry configuration.
#[test]
fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
    let initial_delay = Duration::from_millis(5);
    let max_delay = Duration::from_millis(100);

    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .on_exception(|_e| true)
        .with_backoff(initial_delay, 3.0, max_delay)
        .with_jitter(0.5)
        .end_on_exception()
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.policies.len(), 1);
    assert!(handler.policies[0].retry.is_none());
}
1988
/// A dead-letter channel alone yields an error handler with the DLC URI and
/// no policies.
#[test]
fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .dead_letter_channel("log:dlc")
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.dlc_uri.as_deref(), Some("log:dlc"));
    assert!(handler.policies.is_empty());
}
2004
/// A second `dead_letter_channel` call replaces the URI but keeps the
/// policies registered in between.
#[test]
fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .dead_letter_channel("log:first")
        .on_exception(|e| matches!(e, CamelError::Io(_)))
        .retry(2)
        .end_on_exception()
        .dead_letter_channel("log:second")
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.dlc_uri.as_deref(), Some("log:second"));
    assert_eq!(handler.policies.len(), 1);

    let attempts = handler.policies[0].retry.as_ref().map(|p| p.max_attempts);
    assert_eq!(attempts, Some(2));
}
2028
/// `on_exception` without a dead-letter channel still produces an error
/// handler: one policy, no DLC URI.
#[test]
fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
        .retry(1)
        .end_on_exception()
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert!(handler.dlc_uri.is_none());
    assert_eq!(handler.policies.len(), 1);
}
2046
/// Calling `error_handler` twice keeps the latest explicit configuration.
#[test]
fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
    let earlier = ErrorHandlerConfig::dead_letter_channel("log:first");
    let later = ErrorHandlerConfig::dead_letter_channel("log:second");

    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .error_handler(earlier)
        .error_handler(later)
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.dlc_uri.as_deref(), Some("log:second"));
}
2065
/// `end_split` returns to the route builder: the split block and the
/// following `to` give two top-level steps.
#[test]
fn test_split_builder_typestate() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines());
    let route = RouteBuilder::from("timer:test?period=1000")
        .route_id("test-route")
        .split(splitter)
        .to("mock:per-fragment")
        .end_split()
        .to("mock:final")
        .build()
        .unwrap();

    assert_eq!(route.steps().len(), 2);
}
2085
/// Steps added inside the split block are stored on the `Split` step, not on
/// the route itself.
#[test]
fn test_split_builder_steps_collected() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines());
    let route = RouteBuilder::from("timer:test?period=1000")
        .route_id("test-route")
        .split(splitter)
        .set_header("fragment", Value::String("yes".into()))
        .to("mock:per-fragment")
        .end_split()
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 1);
    match &top_level[0] {
        BuilderStep::Split {
            steps: fragment_steps,
            ..
        } => assert_eq!(fragment_steps.len(), 2),
        other => panic!("Expected Split, got {:?}", other),
    }
}
2108
/// The splitter configuration passed to `split` is carried unchanged on the
/// resulting `Split` step.
#[test]
fn test_split_builder_config_propagated() {
    use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines())
        .parallel(true)
        .parallel_limit(4)
        .aggregation(AggregationStrategy::CollectAll);

    let route = RouteBuilder::from("timer:test?period=1000")
        .route_id("test-route")
        .split(splitter)
        .to("mock:per-fragment")
        .end_split()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Split { config, .. } => {
            assert!(config.parallel);
            assert_eq!(config.parallel_limit, Some(4));
            assert!(matches!(
                config.aggregation,
                AggregationStrategy::CollectAll
            ));
        }
        other => panic!("Expected Split, got {:?}", other),
    }
}
2138
/// `aggregate` records a single `Aggregate` step on the route.
#[test]
fn test_aggregate_builder_adds_step() {
    use camel_api::aggregator::AggregatorConfig;
    use camel_core::route::BuilderStep;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(2)
        .build()
        .unwrap();

    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .aggregate(aggregator)
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 1);
    assert!(matches!(top_level[0], BuilderStep::Aggregate { .. }));
}
2161
/// An `aggregate` inside a split block is stored as the first step of the
/// `Split` step.
#[test]
fn test_aggregate_in_split_builder() {
    use camel_api::aggregator::AggregatorConfig;
    use camel_api::splitter::{SplitterConfig, split_body_lines};
    use camel_core::route::BuilderStep;

    let splitter = SplitterConfig::new(split_body_lines());
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .build()
        .unwrap();

    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .split(splitter)
        .aggregate(aggregator)
        .end_split()
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 1);
    match &top_level[0] {
        BuilderStep::Split { steps, .. } => {
            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
        }
        _ => panic!("expected Split step"),
    }
}
2188
/// `set_body` with a fixed value is recorded as a `Processor` step.
#[test]
fn test_builder_set_body_static_adds_processor() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_body("fixed")
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2200
/// `set_body_fn` with a closure is recorded as a `Processor` step.
#[test]
fn test_builder_set_body_fn_adds_processor() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2210
/// `transform` and `set_body` with the same value produce routes with the
/// same number of steps.
#[test]
fn transform_alias_produces_same_as_set_body() {
    let via_transform = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .transform("hello")
        .build()
        .unwrap();
    let via_set_body = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_body("hello")
        .build()
        .unwrap();

    let transform_count = via_transform.steps().len();
    let set_body_count = via_set_body.steps().len();
    assert_eq!(transform_count, set_body_count);
}
2227
/// `set_header_fn` with a closure is recorded as a `Processor` step.
#[test]
fn test_builder_set_header_fn_adds_processor() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2237
2238 #[tokio::test]
2239 async fn test_set_body_static_processor_works() {
2240 use camel_core::route::compose_pipeline;
2241 let def = RouteBuilder::from("t:t")
2242 .route_id("test-route")
2243 .set_body("replaced")
2244 .build()
2245 .unwrap();
2246 let pipeline = compose_pipeline(
2247 def.steps()
2248 .iter()
2249 .filter_map(|s| {
2250 if let BuilderStep::Processor(p) = s {
2251 Some(p.clone())
2252 } else {
2253 None
2254 }
2255 })
2256 .collect(),
2257 );
2258 let exchange = Exchange::new(Message::new("original"));
2259 let result = pipeline.oneshot(exchange).await.unwrap();
2260 assert_eq!(result.input.body.as_text(), Some("replaced"));
2261 }
2262
2263 #[tokio::test]
2264 async fn test_set_body_fn_processor_works() {
2265 use camel_core::route::compose_pipeline;
2266 let def = RouteBuilder::from("t:t")
2267 .route_id("test-route")
2268 .set_body_fn(|ex: &Exchange| {
2269 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2270 })
2271 .build()
2272 .unwrap();
2273 let pipeline = compose_pipeline(
2274 def.steps()
2275 .iter()
2276 .filter_map(|s| {
2277 if let BuilderStep::Processor(p) = s {
2278 Some(p.clone())
2279 } else {
2280 None
2281 }
2282 })
2283 .collect(),
2284 );
2285 let exchange = Exchange::new(Message::new("hello"));
2286 let result = pipeline.oneshot(exchange).await.unwrap();
2287 assert_eq!(result.input.body.as_text(), Some("HELLO"));
2288 }
2289
2290 #[tokio::test]
2291 async fn test_set_header_fn_processor_works() {
2292 use camel_core::route::compose_pipeline;
2293 let def = RouteBuilder::from("t:t")
2294 .route_id("test-route")
2295 .set_header_fn("echo", |ex: &Exchange| {
2296 ex.input
2297 .body
2298 .as_text()
2299 .map(|t| Value::String(t.into()))
2300 .unwrap_or(Value::Null)
2301 })
2302 .build()
2303 .unwrap();
2304 let pipeline = compose_pipeline(
2305 def.steps()
2306 .iter()
2307 .filter_map(|s| {
2308 if let BuilderStep::Processor(p) = s {
2309 Some(p.clone())
2310 } else {
2311 None
2312 }
2313 })
2314 .collect(),
2315 );
2316 let exchange = Exchange::new(Message::new("ping"));
2317 let result = pipeline.oneshot(exchange).await.unwrap();
2318 assert_eq!(
2319 result.input.header("echo"),
2320 Some(&Value::String("ping".into()))
2321 );
2322 }
2323
/// `end_filter` returns to the route builder so further steps can follow.
#[test]
fn test_filter_builder_typestate() {
    let builder = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route")
        .filter(|_ex| true)
        .to("mock:inner")
        .end_filter()
        .to("mock:outer");

    assert!(builder.build().is_ok());
}
2337
/// A filter block with one inner step contributes exactly one top-level
/// `Filter` step.
#[test]
fn test_filter_builder_steps_collected() {
    let route = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route")
        .filter(|_ex| true)
        .to("mock:inner")
        .end_filter()
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 1);
    assert!(matches!(&top_level[0], BuilderStep::Filter { .. }));
}
2351
/// `wire_tap` records a `WireTap` step with the tap URI, ahead of the
/// following `To` step.
#[test]
fn test_wire_tap_builder_adds_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .wire_tap("mock:tap")
        .to("mock:result")
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 2);
    assert!(matches!(&top_level[0], BuilderStep::WireTap { uri } if uri == "mock:tap"));
    assert!(matches!(&top_level[1], BuilderStep::To(uri) if uri == "mock:result"));
}
2367
/// `end_multicast` returns to the route builder: the multicast block and the
/// following `to` give two top-level steps.
#[test]
fn test_multicast_builder_typestate() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .multicast()
        .to("direct:a")
        .to("direct:b")
        .end_multicast()
        .to("mock:result")
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 2);
}
2384
/// Steps added inside the multicast block are stored on the `Multicast`
/// step.
#[test]
fn test_multicast_builder_steps_collected() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .multicast()
        .to("direct:a")
        .to("direct:b")
        .end_multicast()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Multicast {
            steps: branches, ..
        } => assert_eq!(branches.len(), 2),
        other => panic!("Expected Multicast, got {:?}", other),
    }
}
2403
/// `concurrent(16)` stores a `Concurrent` override with `max: Some(16)`.
#[test]
fn test_builder_concurrent_sets_concurrency() {
    use camel_component_api::ConcurrencyModel;

    let route = RouteBuilder::from("http://0.0.0.0:8080/test")
        .route_id("test-route")
        .concurrent(16)
        .to("log:info")
        .build()
        .unwrap();

    let expected = ConcurrencyModel::Concurrent { max: Some(16) };
    assert_eq!(route.concurrency_override(), Some(&expected));
}
2422
/// `concurrent(0)` stores a `Concurrent` override with `max: None`.
#[test]
fn test_builder_concurrent_zero_means_unbounded() {
    use camel_component_api::ConcurrencyModel;

    let route = RouteBuilder::from("http://0.0.0.0:8080/test")
        .route_id("test-route")
        .concurrent(0)
        .to("log:info")
        .build()
        .unwrap();

    let expected = ConcurrencyModel::Concurrent { max: None };
    assert_eq!(route.concurrency_override(), Some(&expected));
}
2439
/// `sequential()` stores a `Sequential` concurrency override.
#[test]
fn test_builder_sequential_sets_concurrency() {
    use camel_component_api::ConcurrencyModel;

    let route = RouteBuilder::from("http://0.0.0.0:8080/test")
        .route_id("test-route")
        .sequential()
        .to("log:info")
        .build()
        .unwrap();

    let expected = ConcurrencyModel::Sequential;
    assert_eq!(route.concurrency_override(), Some(&expected));
}
2456
/// Without an explicit concurrency call the definition has no override.
#[test]
fn test_builder_default_concurrency_is_none() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .to("log:info")
        .build()
        .unwrap();

    let concurrency = route.concurrency_override();
    assert_eq!(concurrency, None);
}
2467
/// The id passed to `route_id` is exposed by the built definition.
#[test]
fn test_builder_route_id_sets_id() {
    let builder = RouteBuilder::from("timer:tick").route_id("my-route");
    let route = builder.build().unwrap();

    assert_eq!(route.route_id(), "my-route");
}
2479
/// Building without calling `route_id` fails with an error that mentions
/// `route_id`.
#[test]
fn test_build_without_route_id_fails() {
    let outcome = RouteBuilder::from("timer:tick?period=1000")
        .to("log:info")
        .build();

    let err = outcome
        .err()
        .expect("build() should fail without route_id")
        .to_string();
    assert!(
        err.contains("route_id"),
        "error should mention route_id, got: {}",
        err
    );
}
2495
/// An empty route id is rejected with `CamelError::RouteError`.
#[test]
fn test_builder_empty_route_id_rejected() {
    let builder = RouteBuilder::from("timer:tick").route_id("");
    let err = builder
        .build()
        .err()
        .expect("empty route_id should be rejected");

    assert!(matches!(err, CamelError::RouteError(_)));
}
2502
/// A route id consisting only of whitespace is rejected.
#[test]
fn test_builder_whitespace_route_id_rejected() {
    let builder = RouteBuilder::from("timer:tick").route_id(" ");
    assert!(builder.build().is_err());
}
2508
/// `auto_startup(false)` is reflected on the built definition.
#[test]
fn test_builder_auto_startup_false() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .auto_startup(false);
    let route = builder.build().unwrap();

    assert!(!route.auto_startup());
}
2519
/// A custom `startup_order` is reflected on the built definition.
#[test]
fn test_builder_startup_order_custom() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .startup_order(50);
    let route = builder.build().unwrap();

    assert_eq!(route.startup_order(), 50);
}
2530
/// With only a route id set, auto-startup is on and the startup order is
/// 1000.
#[test]
fn test_builder_defaults() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let route = builder.build().unwrap();

    assert_eq!(route.route_id(), "test-route");
    assert!(route.auto_startup());
    assert_eq!(route.startup_order(), 1000);
}
2542
/// A choice with one `when` and no `otherwise` becomes a single `Choice`
/// step with one branch and no fallback.
#[test]
fn test_choice_builder_single_when() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("type").is_some())
        .to("mock:typed")
        .end_when()
        .end_choice()
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 1);
    assert!(matches!(
        &top_level[0],
        BuilderStep::Choice { whens, otherwise } if whens.len() == 1 && otherwise.is_none()
    ));
}
2562
/// A choice with one `when` branch plus an `otherwise` branch records both.
#[test]
fn test_choice_builder_when_otherwise() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .otherwise()
        .to("mock:fallback")
        .end_otherwise()
        .end_choice()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    let when_plus_otherwise = matches!(
        first,
        BuilderStep::Choice { whens, otherwise } if whens.len() == 1 && otherwise.is_some()
    );
    assert!(when_plus_otherwise);
}
2582
/// Two `when` branches are both collected into the `Choice` step.
#[test]
fn test_choice_builder_multiple_whens() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("b").is_some())
        .to("mock:b")
        .end_when()
        .end_choice()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    let has_two_whens = matches!(first, BuilderStep::Choice { whens, .. } if whens.len() == 2);
    assert!(has_two_whens);
}
2602
/// A `to` added after `end_choice()` lands on the route itself as the second step.
#[test]
fn test_choice_step_after_choice() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|_ex: &Exchange| true)
        .to("mock:inner")
        .end_when()
        .end_choice()
        .to("mock:outer")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2619
/// `throttle(..) ... end_throttle()` produces exactly one `Throttle` step on the route.
#[test]
fn test_throttle_builder_typestate() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, std::time::Duration::from_secs(1))
        .to("mock:result")
        .end_throttle()
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::Throttle { .. }));
}
2638
/// `strategy(ThrottleStrategy::Reject)` is stored in the throttle step's config.
#[test]
fn test_throttle_builder_with_strategy() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, std::time::Duration::from_secs(1))
        .strategy(ThrottleStrategy::Reject)
        .to("mock:result")
        .end_throttle()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Throttle { config, .. } => {
            assert_eq!(config.strategy, ThrottleStrategy::Reject);
        }
        _ => panic!("Expected Throttle step"),
    }
}
2656
/// Steps added between `throttle(..)` and `end_throttle()` are collected as the
/// throttle's sub-steps (two here: `set_header` and `to`).
#[test]
fn test_throttle_builder_steps_collected() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(5, std::time::Duration::from_secs(1))
        .set_header("throttled", Value::Bool(true))
        .to("mock:throttled")
        .end_throttle()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    if let BuilderStep::Throttle { steps, .. } = first {
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected Throttle, got {:?}", first);
    }
}
2675
/// A `to` added after `end_throttle()` becomes the route's second step.
#[test]
fn test_throttle_step_after_throttle() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, std::time::Duration::from_secs(1))
        .to("mock:inner")
        .end_throttle()
        .to("mock:outer")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2691
/// `load_balance() ... end_load_balance()` produces exactly one `LoadBalance` step.
#[test]
fn test_load_balance_builder_typestate() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .round_robin()
        .to("mock:a")
        .to("mock:b")
        .end_load_balance()
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::LoadBalance { .. }));
}
2712
/// `random()` sets the load-balance config strategy to `LoadBalanceStrategy::Random`.
#[test]
fn test_load_balance_builder_with_strategy() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .random()
        .to("mock:result")
        .end_load_balance()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
2730
/// Steps added inside the load-balance scope are collected as its sub-steps
/// (two here: `set_header` and `to`).
#[test]
fn test_load_balance_builder_steps_collected() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .set_header("lb", Value::Bool(true))
        .to("mock:a")
        .end_load_balance()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    if let BuilderStep::LoadBalance { steps, .. } = first {
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected LoadBalance, got {:?}", first);
    }
}
2749
/// A `to` added after `end_load_balance()` becomes the route's second step.
#[test]
fn test_load_balance_step_after_load_balance() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .to("mock:inner")
        .end_load_balance()
        .to("mock:outer")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2765
/// `dynamic_router(..)` adds a single `DynamicRouter` step.
#[test]
fn test_dynamic_router_builder() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::DynamicRouter { .. }));
}
2782
/// `dynamic_router_with_config` keeps the `max_iterations` and `cache_size`
/// values from the supplied config.
#[test]
fn test_dynamic_router_builder_with_config() {
    let router_config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
        .max_iterations(100)
        .cache_size(500);

    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router_with_config(router_config)
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    match &steps[0] {
        BuilderStep::DynamicRouter { config } => {
            assert_eq!(config.max_iterations, 100);
            assert_eq!(config.cache_size, 500);
        }
        _ => panic!("Expected DynamicRouter step"),
    }
}
2803
/// A `to` chained after `dynamic_router(..)` follows the router step on the route.
#[test]
fn test_dynamic_router_step_after_router() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
        .to("mock:outer")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[0], BuilderStep::DynamicRouter { .. }));
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2821
/// `routing_slip(expression)` adds a `RoutingSlip` step as the first route step.
#[test]
fn routing_slip_builder_creates_step() {
    use camel_api::RoutingSlipExpression;

    let slip: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));

    let built = RouteBuilder::from("direct:start")
        .route_id("routing-slip-test")
        .routing_slip(slip)
        .build()
        .unwrap();

    let steps = built.steps();
    assert!(
        matches!(steps[0], BuilderStep::RoutingSlip { .. }),
        "Expected RoutingSlip step"
    );
}
2839
/// `routing_slip_with_config` keeps the delimiter, cache size and
/// `ignore_invalid_endpoints` flag from the supplied config.
#[test]
fn routing_slip_with_config_builder_creates_step() {
    use camel_api::RoutingSlipConfig;

    let slip_config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
        .uri_delimiter("|")
        .cache_size(50)
        .ignore_invalid_endpoints(true);

    let built = RouteBuilder::from("direct:start")
        .route_id("routing-slip-config-test")
        .routing_slip_with_config(slip_config)
        .build()
        .unwrap();

    match &built.steps()[0] {
        BuilderStep::RoutingSlip { config } => {
            assert_eq!(config.uri_delimiter, "|");
            assert_eq!(config.cache_size, 50);
            assert!(config.ignore_invalid_endpoints);
        }
        _ => panic!("Expected RoutingSlip step"),
    }
}
2863
/// `marshal("json")` succeeds and contributes a `Processor` step.
#[test]
fn test_builder_marshal_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .marshal("json")
        .unwrap();
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2874
/// `unmarshal("json")` succeeds and contributes a `Processor` step.
#[test]
fn test_builder_unmarshal_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .unmarshal("json")
        .unwrap();
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2885
/// `stream_cache(1024)` contributes a `Processor` step.
#[test]
fn test_builder_stream_cache_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .stream_cache(1024);
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2895
/// `validate("schemas/order.xsd")` becomes a single `To` step whose URI is
/// `validator:schemas/order.xsd`.
#[test]
fn validate_adds_to_step_with_validator_uri() {
    let builder = RouteBuilder::from("direct:in")
        .route_id("test")
        .validate("schemas/order.xsd");
    let route = builder.build().unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);

    let targets_validator =
        matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd");
    assert!(targets_validator, "got: {:?}", steps[0]);
}
2911
/// `marshal("csv")` returns an error whose text contains both
/// "unknown data format" and the format name.
#[test]
fn test_builder_marshal_returns_err_for_unknown_format() {
    let attempt = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .marshal("csv");
    let msg = if let Err(e) = attempt {
        e.to_string()
    } else {
        panic!("marshal with unknown format should return Err")
    };

    let mentions_unknown_format = msg.contains("unknown data format");
    assert!(
        mentions_unknown_format,
        "error should mention unknown format, got: {msg}"
    );
    let mentions_format_name = msg.contains("csv");
    assert!(
        mentions_format_name,
        "error should mention format name, got: {msg}"
    );
}
2931
/// `unmarshal("csv")` returns an error whose text contains both
/// "unknown data format" and the format name.
#[test]
fn test_builder_unmarshal_returns_err_for_unknown_format() {
    let attempt = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .unmarshal("csv");
    let msg = if let Err(e) = attempt {
        e.to_string()
    } else {
        panic!("unmarshal with unknown format should return Err")
    };

    let mentions_unknown_format = msg.contains("unknown data format");
    assert!(
        mentions_unknown_format,
        "error should mention unknown format, got: {msg}"
    );
    let mentions_format_name = msg.contains("csv");
    assert!(
        mentions_format_name,
        "error should mention format name, got: {msg}"
    );
}
2951
/// `recipient_list(expression)` adds a `RecipientList` step as the first route step.
#[test]
fn test_builder_recipient_list_creates_step() {
    let built = RouteBuilder::from("direct:start")
        .route_id("recipient-list-test")
        .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
        .build()
        .unwrap();

    let first = &built.steps()[0];
    assert!(matches!(first, BuilderStep::RecipientList { .. }));
}
2965
/// `recipient_list_with_config(config)` adds a `RecipientList` step as the first route step.
#[test]
fn test_builder_recipient_list_with_config_creates_step() {
    let list_config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));

    let built = RouteBuilder::from("direct:start")
        .route_id("recipient-list-config-test")
        .recipient_list_with_config(list_config)
        .build()
        .unwrap();

    let first = &built.steps()[0];
    assert!(matches!(first, BuilderStep::RecipientList { .. }));
}
2981
/// `script(language, source)` records both arguments verbatim in a `Script` step.
#[test]
fn test_builder_script_adds_script_step() {
    let built = RouteBuilder::from("direct:start")
        .route_id("script-test")
        .script("rhai", "headers[\"x\"] = \"y\"")
        .build()
        .unwrap();

    let first = &built.steps()[0];
    let is_expected_script = matches!(
        first,
        BuilderStep::Script { language, script }
            if language == "rhai" && script == "headers[\"x\"] = \"y\""
    );
    assert!(is_expected_script);
}
2996
/// `delay` and `delay_with_header` each add one `Delay` step, in call order.
#[test]
fn test_builder_delay_and_delay_with_header_add_steps() {
    let built = RouteBuilder::from("direct:start")
        .route_id("delay-test")
        .delay(Duration::from_millis(250))
        .delay_with_header(Duration::from_millis(500), "x-delay")
        .build()
        .unwrap();

    let steps = built.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[0], BuilderStep::Delay { .. }));
    assert!(matches!(&steps[1], BuilderStep::Delay { .. }));
}
3010
/// `log`, `stop` and `to` appear on the route in the order they were chained.
#[test]
fn test_builder_log_and_stop_add_steps_in_order() {
    let built = RouteBuilder::from("direct:start")
        .route_id("log-stop-test")
        .log("hello", LogLevel::Info)
        .stop()
        .to("mock:after")
        .build()
        .unwrap();

    let steps = built.steps();
    assert_eq!(steps.len(), 3);
    assert!(matches!(&steps[0], BuilderStep::Log { message, .. } if message == "hello"));
    assert!(matches!(&steps[1], BuilderStep::Stop));
    assert!(matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:after"));
}
3029
/// `stream_cache_default()` contributes a `Processor` step.
#[test]
fn test_builder_stream_cache_default_adds_processor_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("stream-cache-default-test")
        .stream_cache_default();
    let built = builder.build().unwrap();

    let first = &built.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
3040
/// Passing a URI that already starts with `validator:` to `validate` leaves it unchanged.
#[test]
fn test_validate_preserves_existing_validator_prefix() {
    let builder = RouteBuilder::from("direct:in")
        .route_id("validate-prefix-test")
        .validate("validator:schemas/order.xsd");
    let built = builder.build().unwrap();

    let first = &built.steps()[0];
    let prefix_kept =
        matches!(first, BuilderStep::To(uri) if uri == "validator:schemas/order.xsd");
    assert!(prefix_kept);
}
3054
/// Chaining `weighted(..)`, then `failover()`, then `parallel(true)` leaves the
/// config with the `Failover` strategy and `parallel` enabled.
#[test]
fn test_load_balance_builder_weighted_failover_parallel_config() {
    let weights = vec![("direct:a".to_string(), 3), ("direct:b".to_string(), 1)];

    let built = RouteBuilder::from("direct:start")
        .route_id("lb-weighted-failover-parallel")
        .load_balance()
        .weighted(weights)
        .failover()
        .parallel(true)
        .to("mock:result")
        .end_load_balance()
        .build()
        .unwrap();

    match &built.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
            assert!(config.parallel);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
3078
/// Every multicast setter (`parallel`, `parallel_limit`, `stop_on_exception`,
/// `timeout`, `aggregation`) is reflected in the resulting step config.
#[test]
fn test_multicast_builder_all_config_setters() {
    let built = RouteBuilder::from("direct:start")
        .route_id("multicast-config-test")
        .multicast()
        .parallel(true)
        .parallel_limit(4)
        .stop_on_exception(true)
        .timeout(Duration::from_millis(300))
        .aggregation(MulticastStrategy::Original)
        .to("mock:a")
        .end_multicast()
        .build()
        .unwrap();

    match &built.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(config.parallel);
            assert_eq!(config.parallel_limit, Some(4));
            assert!(config.stop_on_exception);
            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
            assert!(matches!(config.aggregation, MulticastStrategy::Original));
        }
        _ => panic!("Expected Multicast step"),
    }
}
3104
/// `build_canonical()` fails for a route containing a `set_header` step, and the
/// error text reports that step `processor` is not supported.
#[test]
fn test_build_canonical_rejects_unsupported_processor_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-reject")
        .set_header("k", Value::String("v".into()));
    let failure = builder.build_canonical().unwrap_err();

    let rendered = failure.to_string();
    assert!(rendered.contains("does not support step `processor`"));
}
3115
/// `weighted(..)` sets the load-balance config strategy to the `Weighted` variant.
#[test]
fn test_load_balance_builder_weighted_strategy() {
    let weights = vec![
        ("direct:a".to_string(), 5),
        ("direct:b".to_string(), 2),
        ("direct:c".to_string(), 1),
    ];

    let built = RouteBuilder::from("direct:start")
        .route_id("lb-weighted")
        .load_balance()
        .weighted(weights)
        .to("mock:result")
        .end_load_balance()
        .build()
        .unwrap();

    match &built.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
3139
/// `failover()` alone selects the `Failover` strategy and leaves `parallel` off.
#[test]
fn test_load_balance_builder_failover_strategy() {
    let built = RouteBuilder::from("direct:start")
        .route_id("lb-failover")
        .load_balance()
        .failover()
        .to("mock:primary")
        .end_load_balance()
        .build()
        .unwrap();

    match &built.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
            assert!(!config.parallel);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
3158
/// An explicit `parallel(false)` is reflected in the load-balance config.
#[test]
fn test_load_balance_builder_parallel_false_explicit() {
    let built = RouteBuilder::from("direct:start")
        .route_id("lb-parallel-false")
        .load_balance()
        .round_robin()
        .parallel(false)
        .to("mock:result")
        .end_load_balance()
        .build()
        .unwrap();

    match &built.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => assert!(!config.parallel),
        _ => panic!("Expected LoadBalance step"),
    }
}
3177
/// A filter opened inside a split is recorded as the split's only sub-step,
/// and the route itself holds just the `Split` step.
#[test]
fn test_filter_in_split_builder_typestate() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let route = RouteBuilder::from("timer:test")
        .route_id("filter-in-split")
        .split(SplitterConfig::new(split_body_lines()))
        .filter(|_ex| true)
        .to("mock:filtered")
        .end_filter()
        .end_split()
        .build()
        .unwrap();

    let outer = route.steps();
    assert_eq!(outer.len(), 1);
    match &outer[0] {
        BuilderStep::Split { steps, .. } => {
            assert_eq!(steps.len(), 1);
            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
        }
        _ => panic!("Expected Split step"),
    }
}
3202
/// Steps placed before, inside and after a filter nested in a split are
/// collected by the split in chaining order: `To`, `Filter`, `To`.
///
/// Beyond the step count, the kind and position of each sub-step is checked so
/// that a builder which dropped, duplicated or reordered steps around the
/// nested filter cannot pass by coincidence.
#[test]
fn test_filter_in_split_builder_multiple_steps() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let definition = RouteBuilder::from("timer:test")
        .route_id("filter-in-split-multi")
        .split(SplitterConfig::new(split_body_lines()))
        .to("mock:before-filter")
        .filter(|_ex| true)
        .to("mock:inside-filter")
        .end_filter()
        .to("mock:after-filter")
        .end_split()
        .build()
        .unwrap();

    if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
        assert_eq!(steps.len(), 3);
        // The step inside the filter must not leak into the split's own list.
        assert!(matches!(&steps[0], BuilderStep::To(uri) if uri == "mock:before-filter"));
        assert!(matches!(&steps[1], BuilderStep::Filter { .. }));
        assert!(matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:after-filter"));
    } else {
        panic!("Expected Split step");
    }
}
3226
/// A circuit breaker configured on the builder is carried into the canonical
/// spec with its `failure_threshold`.
#[test]
fn test_build_canonical_with_circuit_breaker() {
    use camel_api::circuit_breaker::CircuitBreakerConfig;

    let breaker_config = CircuitBreakerConfig::new().failure_threshold(10);
    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-cb")
        .circuit_breaker(breaker_config)
        .to("mock:result")
        .build_canonical()
        .unwrap();

    let breaker = canonical
        .circuit_breaker
        .expect("circuit breaker should be set");
    assert_eq!(breaker.failure_threshold, 10);
}
3243
/// `build_canonical()` fails for a split using a `Custom` aggregation strategy,
/// reporting that step `split` is not supported.
#[test]
fn test_build_canonical_rejects_custom_split_aggregation() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let split_config = SplitterConfig::new(split_body_lines()).aggregation(
        camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
    );

    let failure = RouteBuilder::from("direct:start")
        .route_id("canonical-custom-split")
        .split(split_config)
        .to("mock:frag")
        .end_split()
        .build_canonical()
        .unwrap_err();

    let rendered = failure.to_string();
    assert!(rendered.contains("canonical v1 does not support step `split`"));
}
3261
/// `build_canonical()` fails for an aggregate with a `Custom` strategy, and the
/// error text mentions "custom aggregate strategy".
#[test]
fn test_build_canonical_rejects_custom_aggregate_strategy() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(2)
        .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
        .build()
        .unwrap();

    let failure = RouteBuilder::from("direct:start")
        .route_id("canonical-custom-agg")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap_err();

    let rendered = failure.to_string();
    assert!(rendered.contains("custom aggregate strategy"));
}
3278
/// `build_canonical()` fails for an aggregate correlated by a closure, and the
/// error text mentions "Fn correlation strategy".
#[test]
fn test_build_canonical_rejects_fn_correlation_strategy() {
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Single(CompletionCondition::Size(1)),
        correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let failure = RouteBuilder::from("direct:start")
        .route_id("canonical-fn-corr")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap_err();

    let rendered = failure.to_string();
    assert!(rendered.contains("Fn correlation strategy"));
}
3298
/// `build_canonical()` fails for an aggregate completed by a predicate closure,
/// and the error text mentions "predicate completion".
#[test]
fn test_build_canonical_rejects_predicate_completion() {
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
            |_| false,
        ))),
        correlation: CorrelationStrategy::HeaderName("key".to_string()),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let failure = RouteBuilder::from("direct:start")
        .route_id("canonical-pred-completion")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap_err();

    let rendered = failure.to_string();
    assert!(rendered.contains("predicate completion"));
}
3320
/// An expression-based correlation is accepted by `build_canonical()`, and the
/// expression text becomes the aggregate step's `correlation_key`.
#[test]
fn test_build_canonical_with_expression_correlation() {
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Single(CompletionCondition::Size(1)),
        correlation: CorrelationStrategy::Expression {
            expr: "header.key".to_string(),
            language: "simple".to_string(),
        },
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-expr-corr")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    let has_expression_key = canonical.steps.iter().any(|step| {
        matches!(
            step,
            CanonicalStepSpec::Aggregate(a) if a.correlation_key.as_deref() == Some("header.key")
        )
    });
    assert!(has_expression_key);
}
3343
/// `build_canonical()` fails for a split built from `split_body_lines()` with
/// `LastWins` aggregation, reporting that step `split` is not supported.
#[test]
fn test_build_canonical_split_rejected_with_closure_expression() {
    use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};

    let split_config =
        SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins);

    let failure = RouteBuilder::from("direct:start")
        .route_id("canonical-split-last")
        .split(split_config)
        .to("mock:frag")
        .end_split()
        .build_canonical()
        .unwrap_err();

    let rendered = failure.to_string();
    assert!(rendered.contains("canonical v1 does not support step `split`"));
}
3361
/// A full `on_exception` chain (retry, backoff, jitter, handler URI) ends up as
/// one policy in the route's error handler config, with every value preserved.
#[test]
fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
    let route = RouteBuilder::from("direct:start")
        .route_id("on-exception-full")
        .dead_letter_channel("log:dlc")
        .on_exception(|e| matches!(e, CamelError::Io(_)))
        .retry(5)
        .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
        .with_jitter(0.3)
        .handled_by("log:io-handler")
        .end_on_exception()
        .to("mock:out")
        .build()
        .unwrap();

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.policies.len(), 1);

    let io_policy = &handler.policies[0];
    let redelivery = io_policy.retry.as_ref().expect("retry should be set");
    assert_eq!(redelivery.max_attempts, 5);
    assert_eq!(redelivery.initial_delay, Duration::from_millis(10));
    assert_eq!(redelivery.multiplier, 2.0);
    assert_eq!(redelivery.max_delay, Duration::from_millis(500));
    // Jitter is a float that went through the builder; compare within epsilon.
    assert!((redelivery.jitter_factor - 0.3).abs() < f64::EPSILON);
    assert_eq!(io_policy.handled_by.as_deref(), Some("log:io-handler"));
}
3392
/// A jitter of `5.0` passed to `with_jitter` is stored as `1.0`.
#[test]
fn test_on_exception_jitter_clamped_to_valid_range() {
    let route = RouteBuilder::from("direct:start")
        .route_id("jitter-clamp")
        .on_exception(|_e| true)
        .retry(1)
        .with_jitter(5.0)
        .end_on_exception()
        .to("mock:out")
        .build()
        .unwrap();

    let handler = route.error_handler_config().unwrap();
    let redelivery = handler.policies[0].retry.as_ref().unwrap();
    let distance_from_one = (redelivery.jitter_factor - 1.0).abs();
    assert!(distance_from_one < f64::EPSILON);
}
3409
/// `process_fn` with a boxed processor contributes a `Processor` step.
#[test]
fn test_builder_process_fn_adds_processor_step() {
    use camel_api::BoxProcessorExt;

    let passthrough = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
    let route = RouteBuilder::from("timer:tick")
        .route_id("process-fn-test")
        .process_fn(passthrough)
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
3424
/// `convert_body_to(BodyType::Json)` contributes a `Processor` step.
#[test]
fn test_builder_convert_body_to_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("convert-body-test")
        .convert_body_to(BodyType::Json);
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
3435
/// `bean(name, method)` records both arguments in a `Bean` step.
#[test]
fn test_builder_bean_adds_bean_step() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("bean-test")
        .bean("myBean", "process");
    let route = builder.build().unwrap();

    let first = &route.steps()[0];
    let is_expected_bean = matches!(
        first,
        BuilderStep::Bean { name, method } if name == "myBean" && method == "process"
    );
    assert!(is_expected_bean);
}
3449
/// `strategy(ThrottleStrategy::Delay)` is stored in the throttle step's config.
#[test]
fn test_throttle_builder_delay_strategy() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("throttle-delay")
        .throttle(10, Duration::from_secs(1))
        .strategy(ThrottleStrategy::Delay)
        .to("mock:result")
        .end_throttle()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Throttle { config, .. } => {
            assert_eq!(config.strategy, ThrottleStrategy::Delay);
        }
        _ => panic!("Expected Throttle step"),
    }
}
3469
/// `strategy(ThrottleStrategy::Drop)` is stored in the throttle step's config.
#[test]
fn test_throttle_builder_drop_strategy() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("throttle-drop")
        .throttle(10, Duration::from_secs(1))
        .strategy(ThrottleStrategy::Drop)
        .to("mock:result")
        .end_throttle()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Throttle { config, .. } => {
            assert_eq!(config.strategy, ThrottleStrategy::Drop);
        }
        _ => panic!("Expected Throttle step"),
    }
}
3487
/// A `loop_while` nested inside a `loop_count` is stored as the outer loop's
/// second sub-step, with its mode being `LoopMode::While`.
#[test]
fn test_nested_loop_while_builder() {
    use camel_api::loop_eip::LoopMode;

    let route = RouteBuilder::from("direct:start")
        .route_id("nested-loop-while")
        .loop_count(2)
        .to("mock:outer")
        .loop_while(|_ex| true)
        .to("mock:inner")
        .end_loop()
        .end_loop()
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 1);

    let outer_body = match &top_level[0] {
        BuilderStep::Loop { steps, .. } => steps,
        _ => panic!("Expected outer Loop step"),
    };
    assert_eq!(outer_body.len(), 2);

    match &outer_body[1] {
        BuilderStep::Loop { config, .. } => {
            assert!(matches!(config.mode, LoopMode::While(_)));
        }
        _ => panic!("Expected inner Loop step"),
    }
}
3517
/// Three `when` branches plus an `otherwise` holding one step are all recorded
/// in the `Choice` step.
#[test]
fn test_choice_builder_multiple_whens_with_otherwise() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("choice-multi-otherwise")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("b").is_some())
        .to("mock:b")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("c").is_some())
        .to("mock:c")
        .end_when()
        .otherwise()
        .to("mock:fallback")
        .end_otherwise()
        .end_choice()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Choice { whens, otherwise } => {
            assert_eq!(whens.len(), 3);
            // `Some(1)` covers both "otherwise is present" and "it has one step".
            let fallback_len = otherwise.as_ref().map(|fallback| fallback.len());
            assert_eq!(fallback_len, Some(1));
        }
        _ => panic!("Expected Choice step"),
    }
}
3549
/// Setting only `parallel(true)` leaves `parallel_limit` as `None`.
#[test]
fn test_multicast_builder_parallel_only() {
    let built = RouteBuilder::from("direct:start")
        .route_id("multicast-parallel")
        .multicast()
        .parallel(true)
        .to("mock:a")
        .end_multicast()
        .build()
        .unwrap();

    match &built.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(config.parallel);
            assert_eq!(config.parallel_limit, None);
        }
        _ => panic!("Expected Multicast step"),
    }
}
3570
/// `timeout(5s)` is stored in the multicast config as `Some(5s)`.
#[test]
fn test_multicast_builder_timeout_only() {
    let built = RouteBuilder::from("direct:start")
        .route_id("multicast-timeout")
        .multicast()
        .timeout(Duration::from_secs(5))
        .to("mock:a")
        .end_multicast()
        .build()
        .unwrap();

    match &built.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert_eq!(config.timeout, Some(Duration::from_secs(5)));
        }
        _ => panic!("Expected Multicast step"),
    }
}
3588
/// `aggregation(MulticastStrategy::CollectAll)` is stored in the multicast config.
#[test]
fn test_multicast_builder_aggregation_collect_all() {
    let built = RouteBuilder::from("direct:start")
        .route_id("multicast-collect")
        .multicast()
        .aggregation(MulticastStrategy::CollectAll)
        .to("mock:a")
        .end_multicast()
        .build()
        .unwrap();

    match &built.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
        }
        _ => panic!("Expected Multicast step"),
    }
}
3606
/// A size-or-timeout completion is exported to the canonical aggregate step as
/// both `completion_size` and `completion_timeout_ms`.
#[test]
fn test_build_canonical_aggregate_any_completion_mode() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_on_size_or_timeout(10, Duration::from_secs(30))
        .build()
        .unwrap();

    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-any-completion")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    match &canonical.steps[0] {
        CanonicalStepSpec::Aggregate(agg) => {
            assert_eq!(agg.completion_size, Some(10));
            assert_eq!(agg.completion_timeout_ms, Some(30_000));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3629
/// A timeout-only completion is exported with `completion_timeout_ms` set and
/// `completion_size` left as `None`.
#[test]
fn test_build_canonical_aggregate_timeout_completion() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_on_timeout(Duration::from_millis(500))
        .build()
        .unwrap();

    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-timeout-completion")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    match &canonical.steps[0] {
        CanonicalStepSpec::Aggregate(agg) => {
            assert_eq!(agg.completion_size, None);
            assert_eq!(agg.completion_timeout_ms, Some(500));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3650
/// `discard_on_timeout(true)` is exported to the canonical aggregate step as `Some(true)`.
#[test]
fn test_build_canonical_aggregate_discard_on_timeout() {
    use camel_api::aggregator::AggregatorConfig;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .discard_on_timeout(true)
        .build()
        .unwrap();

    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-discard-timeout")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    match &canonical.steps[0] {
        CanonicalStepSpec::Aggregate(agg) => {
            assert_eq!(agg.discard_on_timeout, Some(true));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3675
/// `force_completion_on_stop(true)` is exported to the canonical aggregate step
/// as `Some(true)`.
#[test]
fn test_build_canonical_aggregate_force_completion_on_stop() {
    use camel_api::aggregator::AggregatorConfig;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .force_completion_on_stop(true)
        .build()
        .unwrap();

    let canonical = RouteBuilder::from("direct:start")
        .route_id("canonical-force-stop")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    match &canonical.steps[0] {
        CanonicalStepSpec::Aggregate(agg) => {
            assert_eq!(agg.force_completion_on_stop, Some(true));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3698
// Bucket limits set on the aggregator config must appear on the canonical spec, with the
// TTL expressed in milliseconds.
#[test]
fn test_build_canonical_aggregate_max_buckets_and_ttl() {
    use camel_api::aggregator::AggregatorConfig;

    let config = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .max_buckets(100)
        .bucket_ttl(Duration::from_secs(60))
        .build()
        .unwrap();

    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-buckets-ttl")
        .aggregate(config)
        .build_canonical()
        .unwrap();

    let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] else {
        panic!("Expected Aggregate step");
    };
    assert_eq!(agg.max_buckets, Some(100));
    assert_eq!(agg.bucket_ttl_ms, Some(60_000));
}
3725
// A filter opened inside a split scope must be recorded as a single nested step of the split.
#[test]
fn test_split_builder_with_filter_inside() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let definition = RouteBuilder::from("timer:test")
        .route_id("split-with-filter")
        .split(SplitterConfig::new(split_body_lines()))
        .filter(|_ex| true)
        .to("mock:filtered-frag")
        .end_filter()
        .end_split()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Split { steps, .. } => {
            // The `to` belongs to the filter, so the split sees exactly one child.
            assert_eq!(steps.len(), 1);
            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
        }
        _ => panic!("Expected Split step"),
    }
}
3749
// Two consecutive wire taps must both be kept, in call order, ahead of the final `to` step.
#[test]
fn test_wire_tap_multiple_taps() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("multi-wire-tap")
        .wire_tap("mock:tap1")
        .wire_tap("mock:tap2")
        .to("mock:result")
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 3);
    assert!(matches!(&steps[0], BuilderStep::WireTap { uri } if uri == "mock:tap1"));
    assert!(matches!(&steps[1], BuilderStep::WireTap { uri } if uri == "mock:tap2"));
    // Verify the third step as well: a length check alone would still pass if the trailing
    // endpoint were recorded as something other than a plain `To`.
    assert!(matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:result"));
}
3770
// Combining the `dead_letter_channel` shorthand with an explicit `error_handler` must make
// `build` fail with a "mixed error handler modes" message.
#[test]
fn test_builder_shorthand_then_explicit_mixed_mode() {
    let err = RouteBuilder::from("direct:start")
        .route_id("mixed-mode-2")
        .dead_letter_channel("log:dlc")
        .error_handler(ErrorHandlerConfig::log_only())
        .to("mock:out")
        .build()
        .err()
        .expect("mixed mode should fail");

    let message = err.to_string();
    assert!(message.contains("mixed error handler modes"));
}
3785
// An empty source URI must be rejected by `build_canonical`.
#[test]
fn test_build_canonical_empty_from_uri_errors() {
    let outcome = RouteBuilder::from("")
        .route_id("test")
        .build_canonical();

    assert!(outcome.is_err());
}
3793
// Without a route id `build_canonical` must fail, and the error text must name `route_id`.
#[test]
fn test_build_canonical_missing_route_id_errors() {
    let outcome = RouteBuilder::from("direct:start").build_canonical();
    assert!(outcome.is_err());

    let message = format!("{}", outcome.unwrap_err());
    assert!(message.contains("route_id"));
}
3801
// An aggregate added inside a split scope must be recorded as the split's only nested step.
#[test]
fn test_split_builder_with_aggregate_inside() {
    use camel_api::aggregator::AggregatorConfig;
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let definition = RouteBuilder::from("timer:test")
        .route_id("split-agg")
        .split(SplitterConfig::new(split_body_lines()))
        .aggregate(
            AggregatorConfig::correlate_by("frag-key")
                .complete_when_size(3)
                .build()
                .unwrap(),
        )
        .end_split()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Split { steps, .. } => {
            assert_eq!(steps.len(), 1);
            assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
        }
        _ => panic!("Expected Split step"),
    }
}
3829
// Steps added between `throttle` and `end_throttle` must be nested under the throttle step.
#[test]
fn test_throttle_builder_with_steps_inside() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("throttle-steps")
        .throttle(10, Duration::from_secs(1))
        .set_header("throttled", Value::Bool(true))
        .to("mock:throttled")
        .end_throttle()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        // `set_header` + `to` => two nested steps.
        BuilderStep::Throttle { steps, .. } => assert_eq!(steps.len(), 2),
        _ => panic!("Expected Throttle step"),
    }
}
3849
// Steps added between `load_balance` and `end_load_balance` must be nested under the
// load-balance step.
#[test]
fn test_load_balance_builder_with_steps_inside() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("lb-steps")
        .load_balance()
        .round_robin()
        .set_header("lb", Value::Bool(true))
        .to("mock:lb")
        .end_load_balance()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        // `set_header` + `to` => two nested steps.
        BuilderStep::LoadBalance { steps, .. } => assert_eq!(steps.len(), 2),
        _ => panic!("Expected LoadBalance step"),
    }
}
3870
// Steps added between `multicast` and `end_multicast` must be nested under the multicast step.
#[test]
fn test_multicast_builder_with_steps_inside() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("multicast-steps")
        .multicast()
        .set_header("mc", Value::Bool(true))
        .to("mock:multicast")
        .end_multicast()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        // `set_header` + `to` => two nested steps.
        BuilderStep::Multicast { steps, .. } => assert_eq!(steps.len(), 2),
        _ => panic!("Expected Multicast step"),
    }
}
3890
// Steps added between `loop_count` and `end_loop` must be nested under the loop step.
#[test]
fn test_loop_builder_with_steps_inside() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("loop-steps")
        .loop_count(3)
        .set_header("loop", Value::Bool(true))
        .to("mock:loop")
        .end_loop()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        // `set_header` + `to` => two nested steps.
        BuilderStep::Loop { steps, .. } => assert_eq!(steps.len(), 2),
        _ => panic!("Expected Loop step"),
    }
}
3910
// `build_canonical` must fail on a route containing a loop and name the `loop` step.
#[test]
fn test_build_canonical_rejects_loop_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-loop")
        .loop_count(3)
        .to("mock:loop")
        .end_loop()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `loop`"));
}
3925
// `build_canonical` must fail on a route containing a multicast and name the `multicast` step.
#[test]
fn test_build_canonical_rejects_multicast_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-multicast")
        .multicast()
        .to("mock:a")
        .end_multicast()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `multicast`"));
}
3938
// `build_canonical` must fail on a route containing a throttle and name the `throttle` step.
#[test]
fn test_build_canonical_rejects_throttle_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-throttle")
        .throttle(10, Duration::from_secs(1))
        .to("mock:result")
        .end_throttle()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `throttle`"));
}
3951
// `build_canonical` must fail on a route containing a load balancer and name the
// `load_balancer` step.
#[test]
fn test_build_canonical_rejects_load_balancer_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-lb")
        .load_balance()
        .round_robin()
        .to("mock:result")
        .end_load_balance()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `load_balancer`"));
}
3965
// `build_canonical` must fail on a route containing a bean call and name the `bean` step.
#[test]
fn test_build_canonical_rejects_bean_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-bean")
        .bean("myBean", "process")
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `bean`"));
}
3976
// `build_canonical` must fail on a route containing a script and name the `script` step.
#[test]
fn test_build_canonical_rejects_script_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-script")
        .script("rhai", "x = 1")
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `script`"));
}
3987
// A delay step is accepted by `build_canonical` and keeps its duration in milliseconds.
#[test]
fn test_build_canonical_accepts_delay_step() {
    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-delay")
        .delay(Duration::from_millis(100))
        .build_canonical()
        .unwrap();

    let has_expected_delay = spec.steps.iter().any(|step| {
        matches!(step, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100)
    });
    assert!(has_expected_delay);
}
4002
// A wire tap is accepted by `build_canonical` and keeps its target URI.
#[test]
fn test_build_canonical_accepts_wire_tap_step() {
    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-wiretap")
        .wire_tap("mock:tap")
        .build_canonical()
        .unwrap();

    let has_expected_tap = spec
        .steps
        .iter()
        .any(|step| matches!(step, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"));
    assert!(has_expected_tap);
}
4017
// `build_canonical` must fail on a route containing a dynamic router and name the
// `dynamic_router` step.
#[test]
fn test_build_canonical_rejects_dynamic_router_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-dyn-router")
        .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `dynamic_router`"));
}
4028
// `build_canonical` must fail on a route containing a routing slip and name the
// `routing_slip` step.
#[test]
fn test_build_canonical_rejects_routing_slip_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-routing-slip")
        .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `routing_slip`"));
}
4039
// `build_canonical` must fail on a route containing a recipient list and name the
// `recipient_list` step.
#[test]
fn test_build_canonical_rejects_recipient_list_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-recipient")
        .recipient_list(Arc::new(|_| "mock:a".to_string()))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `recipient_list`"));
}
4050
// An `Any` completion that includes a predicate condition must make `build_canonical` fail
// with an error mentioning "predicate completion".
#[test]
fn test_build_canonical_rejects_any_mode_with_predicate() {
    // Built as a struct literal so a size condition and a closure predicate can be combined.
    let config = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Any(vec![
            CompletionCondition::Size(5),
            CompletionCondition::Predicate(Arc::new(|_| false)),
        ]),
        correlation: CorrelationStrategy::HeaderName("key".to_string()),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-any-pred")
        .aggregate(config)
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("predicate completion"));
}
4075
// `build` must reject an empty source URI with an error that mentions 'from' or URI.
#[test]
fn test_builder_validation_missing_from_uri() {
    let outcome = RouteBuilder::from("")
        .route_id("missing-uri-route")
        .to("log:info")
        .build();
    assert!(outcome.is_err(), "empty from URI should fail validation");

    let err = outcome.err().unwrap().to_string();
    let mentions_source = err.contains("'from'") || err.contains("URI");
    assert!(mentions_source, "error should mention from/URI, got: {err}");
}
4091
// A step URI without a scheme separator is still accepted by `build`; this test pins that
// the builder does not validate step URIs.
#[test]
fn test_builder_validation_invalid_step_uri_scheme() {
    let outcome = RouteBuilder::from("timer:tick")
        .route_id("bad-step-route")
        .to("not-a-valid-uri")
        .build();

    let accepted = outcome.is_ok();
    assert!(
        accepted,
        "builder should accept opaque step URIs; resolution happens later"
    );
}
4105
// Two routes built with the same route id both build successfully and report the same id;
// the builder itself does not reject the duplicate.
#[test]
fn test_builder_duplicate_route_ids_produce_identical_definitions() {
    // Same id and target, differing only in the source endpoint.
    let build_from = |source: &'static str| {
        RouteBuilder::from(source)
            .route_id("dup-route")
            .to("mock:out")
            .build()
    };

    let first = build_from("direct:a");
    let second = build_from("direct:b");
    assert!(first.is_ok());
    assert!(second.is_ok());

    let first = first.unwrap();
    let second = second.unwrap();
    assert_eq!(first.route_id(), second.route_id());
}
4125}