1use camel_api::DelayConfig;
7use camel_api::aggregator::{
8 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24 LanguageExpressionDef, ProcessorFn, Value,
25 runtime::{
26 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28 CanonicalWhenSpec,
29 },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35 StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38pub mod do_try;
40pub use do_try::{DoCatchBuilder, DoFinallyBuilder, DoTryBuilder};
41
42pub trait StepAccumulator: Sized {
48 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
49
50 fn to(mut self, endpoint: impl Into<String>) -> Self {
51 self.steps_mut().push(BuilderStep::To(endpoint.into()));
52 self
53 }
54
55 fn process<F, Fut>(mut self, f: F) -> Self
56 where
57 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
58 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
59 {
60 let svc = ProcessorFn::new(f);
61 self.steps_mut()
62 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
63 self
64 }
65
66 fn process_fn(mut self, processor: BoxProcessor) -> Self {
67 self.steps_mut().push(BuilderStep::Processor(processor));
68 self
69 }
70
71 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
72 let svc = SetHeader::new(IdentityProcessor, key, value);
73 self.steps_mut()
74 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
75 self
76 }
77
78 fn map_body<F>(mut self, mapper: F) -> Self
79 where
80 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
81 {
82 let svc = MapBody::new(IdentityProcessor, mapper);
83 self.steps_mut()
84 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
85 self
86 }
87
88 fn set_body<B>(mut self, body: B) -> Self
89 where
90 B: Into<Body> + Clone + Send + Sync + 'static,
91 {
92 let body: Body = body.into();
93 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
94 self.steps_mut()
95 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
96 self
97 }
98
99 fn transform<B>(self, body: B) -> Self
104 where
105 B: Into<Body> + Clone + Send + Sync + 'static,
106 {
107 self.set_body(body)
108 }
109
110 fn set_body_fn<F>(mut self, expr: F) -> Self
111 where
112 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
113 {
114 let svc = SetBody::new(IdentityProcessor, expr);
115 self.steps_mut()
116 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
117 self
118 }
119
120 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
121 where
122 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
123 {
124 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
125 self.steps_mut()
126 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
127 self
128 }
129
130 fn aggregate(mut self, config: AggregatorConfig) -> Self {
131 self.steps_mut().push(BuilderStep::Aggregate { config });
132 self
133 }
134
135 fn stop(mut self) -> Self {
141 self.steps_mut().push(BuilderStep::Stop);
142 self
143 }
144
145 fn delay(mut self, duration: std::time::Duration) -> Self {
146 self.steps_mut().push(BuilderStep::Delay {
147 config: DelayConfig::from_duration(duration),
148 });
149 self
150 }
151
152 fn delay_with_header(
153 mut self,
154 duration: std::time::Duration,
155 header: impl Into<String>,
156 ) -> Self {
157 self.steps_mut().push(BuilderStep::Delay {
158 config: DelayConfig::from_duration_with_header(duration, header),
159 });
160 self
161 }
162
163 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
167 self.steps_mut().push(BuilderStep::Log {
168 level,
169 message: message.into(),
170 });
171 self
172 }
173
174 fn convert_body_to(mut self, target: BodyType) -> Self {
186 let svc = ConvertBodyTo::new(IdentityProcessor, target);
187 self.steps_mut()
188 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
189 self
190 }
191
192 fn stream_cache(mut self, threshold: usize) -> Self {
193 let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
194 let svc = StreamCacheService::new(IdentityProcessor, config);
195 self.steps_mut()
196 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
197 self
198 }
199
200 fn stream_cache_default(self) -> Self {
204 self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
205 }
206
207 fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
218 let name = format.into();
219 let df = builtin_data_format(&name)
220 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
221 let svc = MarshalService::new(IdentityProcessor, df);
222 self.steps_mut()
223 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
224 Ok(self)
225 }
226
227 fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
238 let name = format.into();
239 let df = builtin_data_format(&name)
240 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
241 let svc = UnmarshalService::new(IdentityProcessor, df);
242 self.steps_mut()
243 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
244 Ok(self)
245 }
246
247 fn validate(mut self, expression: impl Into<String>) -> Self {
257 let source = expression.into();
258 let expression = LanguageExpressionDef {
259 language: "simple".into(),
260 source,
261 };
262 self.steps_mut().push(BuilderStep::Validate {
263 predicate: expression,
264 });
265 self
266 }
267
268 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
279 self.steps_mut().push(BuilderStep::Script {
280 language: language.into(),
281 script: script.into(),
282 });
283 self
284 }
285
286 fn enrich(mut self, uri: impl Into<String>) -> Self {
292 self.steps_mut().push(BuilderStep::Enrich {
293 uri: uri.into(),
294 strategy: None,
295 timeout_ms: None,
296 });
297 self
298 }
299
300 fn poll_enrich(mut self, uri: impl Into<String>, timeout_ms: u64) -> Self {
306 self.steps_mut().push(BuilderStep::PollEnrich {
307 uri: uri.into(),
308 strategy: None,
309 timeout_ms: Some(timeout_ms),
310 });
311 self
312 }
313
314 fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
315 self.steps_mut().push(BuilderStep::Bean {
316 name: name.into(),
317 method: method.into(),
318 });
319 self
320 }
321}
322
/// Fluent builder for a single route.
///
/// Created with `RouteBuilder::from`, filled through chained calls, and turned
/// into a route by `build` or `build_canonical`.
pub struct RouteBuilder {
    /// Source endpoint URI; checked by `validate_uri` when the route is built.
    from_uri: String,
    /// Top-level steps in the order they were added.
    steps: Vec<BuilderStep>,
    /// Configuration supplied through `.error_handler(config)`, if any.
    error_handler: Option<ErrorHandlerConfig>,
    /// Tracks which error-handling API style has been used so far.
    error_handler_mode: ErrorHandlerMode,
    /// Configuration supplied through `.circuit_breaker(config)`, if any.
    circuit_breaker_config: Option<CircuitBreakerConfig>,
    /// Configuration supplied through `.security_policy(config)`, if any.
    security_policy_config: Option<camel_api::security_policy::SecurityPolicyConfig>,
    /// Authenticator supplied through `.security_authenticator(auth)`, if any.
    security_authenticator: Option<std::sync::Arc<dyn camel_auth::TokenAuthenticator>>,
    /// Concurrency model chosen with `.concurrent(..)` / `.sequential()`, if any.
    concurrency: Option<ConcurrencyModel>,
    /// Route identifier; building fails when it is missing or blank.
    route_id: Option<String>,
    /// Value supplied through `.auto_startup(..)`, if any.
    auto_startup: Option<bool>,
    /// Value supplied through `.startup_order(..)`, if any.
    startup_order: Option<i32>,
}
350
/// Records which style of error-handler configuration a `RouteBuilder` has seen.
#[derive(Default)]
enum ErrorHandlerMode {
    /// No error-handling method has been called yet.
    #[default]
    None,
    /// Only `.error_handler(config)` has been used.
    ExplicitConfig,
    /// Only the shorthand methods (`.dead_letter_channel(..)`, `.on_exception(..)`)
    /// have been used; their inputs are accumulated here until `build`.
    Shorthand {
        /// Dead-letter URI from `.dead_letter_channel(..)`, if it was called.
        dlc_uri: Option<String>,
        /// Policies pushed by `OnExceptionBuilder::end_on_exception`.
        specs: Vec<OnExceptionSpec>,
    },
    /// Both styles were combined; `RouteBuilder::build` rejects this state.
    Mixed,
}
362
/// One pending `.on_exception(..)` policy collected in shorthand mode.
#[derive(Clone)]
struct OnExceptionSpec {
    /// Predicate deciding whether this policy applies to a given error.
    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
    /// Redelivery settings; `None` until `OnExceptionBuilder::retry` is called.
    retry: Option<RedeliveryPolicy>,
    /// URI supplied through `OnExceptionBuilder::handled_by`, if any.
    handled_by: Option<String>,
}
369
370impl RouteBuilder {
371 pub fn from(endpoint: &str) -> Self {
373 Self {
374 from_uri: endpoint.to_string(),
375 steps: Vec::new(),
376 error_handler: None,
377 error_handler_mode: ErrorHandlerMode::None,
378 circuit_breaker_config: None,
379 security_policy_config: None,
380 security_authenticator: None,
381 concurrency: None,
382 route_id: None,
383 auto_startup: None,
384 startup_order: None,
385 }
386 }
387
388 pub fn filter<F>(self, predicate: F) -> FilterBuilder
392 where
393 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
394 {
395 FilterBuilder {
396 parent: self,
397 predicate: std::sync::Arc::new(predicate),
398 steps: vec![],
399 }
400 }
401
402 pub fn choice(self) -> ChoiceBuilder {
408 ChoiceBuilder {
409 parent: self,
410 whens: vec![],
411 _otherwise: None,
412 }
413 }
414
415 pub fn wire_tap(mut self, endpoint: &str) -> Self {
419 self.steps.push(BuilderStep::WireTap {
420 uri: endpoint.to_string(),
421 });
422 self
423 }
424
425 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
427 self.error_handler_mode = match self.error_handler_mode {
428 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
429 ErrorHandlerMode::ExplicitConfig
430 }
431 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
432 };
433 self.error_handler = Some(config);
434 self
435 }
436
437 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
439 let uri = uri.into();
440 self.error_handler_mode = match self.error_handler_mode {
441 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
442 dlc_uri: Some(uri),
443 specs: Vec::new(),
444 },
445 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
446 dlc_uri: Some(uri),
447 specs,
448 },
449 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
450 };
451 self
452 }
453
454 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
456 where
457 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
458 {
459 self.error_handler_mode = match self.error_handler_mode {
460 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
461 dlc_uri: None,
462 specs: Vec::new(),
463 },
464 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
465 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
466 };
467
468 OnExceptionBuilder {
469 parent: self,
470 policy: OnExceptionSpec {
471 matches: std::sync::Arc::new(matches),
472 retry: None,
473 handled_by: None,
474 },
475 }
476 }
477
478 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
480 self.circuit_breaker_config = Some(config);
481 self
482 }
483
484 pub fn security_policy(
485 mut self,
486 config: camel_api::security_policy::SecurityPolicyConfig,
487 ) -> Self {
488 self.security_policy_config = Some(config);
489 self
490 }
491
492 pub fn security_authenticator(
493 mut self,
494 auth: std::sync::Arc<dyn camel_auth::TokenAuthenticator>,
495 ) -> Self {
496 self.security_authenticator = Some(auth);
497 self
498 }
499
500 pub fn concurrent(mut self, max: usize) -> Self {
514 let max = if max == 0 { None } else { Some(max) };
515 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
516 self
517 }
518
519 pub fn sequential(mut self) -> Self {
524 self.concurrency = Some(ConcurrencyModel::Sequential);
525 self
526 }
527
528 pub fn route_id(mut self, id: impl Into<String>) -> Self {
532 self.route_id = Some(id.into());
533 self
534 }
535
536 pub fn auto_startup(mut self, auto: bool) -> Self {
540 self.auto_startup = Some(auto);
541 self
542 }
543
544 pub fn startup_order(mut self, order: i32) -> Self {
548 self.startup_order = Some(order);
549 self
550 }
551
552 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
558 SplitBuilder {
559 parent: self,
560 config,
561 steps: Vec::new(),
562 }
563 }
564
565 pub fn multicast(self) -> MulticastBuilder {
571 MulticastBuilder {
572 parent: self,
573 steps: Vec::new(),
574 config: MulticastConfig::new(),
575 }
576 }
577
578 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
585 ThrottleBuilder {
586 parent: self,
587 config: ThrottlerConfig::new(max_requests, period),
588 steps: Vec::new(),
589 }
590 }
591
592 pub fn loop_count(self, count: usize) -> LoopBuilder {
594 LoopBuilder {
595 parent: self,
596 config: LoopConfig {
597 mode: LoopMode::Count(count),
598 },
599 steps: vec![],
600 }
601 }
602
603 pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
605 where
606 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
607 {
608 LoopBuilder {
609 parent: self,
610 config: LoopConfig {
611 mode: LoopMode::While(std::sync::Arc::new(predicate)),
612 },
613 steps: vec![],
614 }
615 }
616
617 pub fn load_balance(self) -> LoadBalancerBuilder {
623 LoadBalancerBuilder {
624 parent: self,
625 config: LoadBalancerConfig::round_robin(),
626 steps: Vec::new(),
627 }
628 }
629
630 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
646 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
647 }
648
649 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
653 self.steps.push(BuilderStep::DynamicRouter { config });
654 self
655 }
656
657 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
658 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
659 }
660
661 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
662 self.steps.push(BuilderStep::RoutingSlip { config });
663 self
664 }
665
666 pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
667 self.recipient_list_with_config(RecipientListConfig::new(expression))
668 }
669
670 pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
671 self.steps.push(BuilderStep::RecipientList { config });
672 self
673 }
674
675 pub fn build(self) -> Result<RouteDefinition, CamelError> {
681 validate_uri(&self.from_uri)?;
682 let route_id = self
683 .route_id
684 .filter(|s| !s.trim().is_empty())
685 .ok_or_else(|| {
686 CamelError::RouteError(
687 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
688 .to_string(),
689 )
690 })?;
691 let resolved_error_handler = match self.error_handler_mode {
692 ErrorHandlerMode::None => self.error_handler,
693 ErrorHandlerMode::ExplicitConfig => self.error_handler,
694 ErrorHandlerMode::Mixed => {
695 return Err(CamelError::RouteError(
696 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
697 ));
698 }
699 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
700 let mut config = if let Some(uri) = dlc_uri {
701 ErrorHandlerConfig::dead_letter_channel(uri)
702 } else {
703 ErrorHandlerConfig::log_only()
704 };
705
706 for spec in specs {
707 let matcher = spec.matches.clone();
708 let mut builder = config.on_exception(move |e| matcher(e));
709
710 if let Some(retry) = spec.retry {
711 builder = builder.retry(retry.max_attempts).with_backoff(
712 retry.initial_delay,
713 retry.multiplier,
714 retry.max_delay,
715 );
716 if retry.jitter_factor > 0.0 {
717 builder = builder.with_jitter(retry.jitter_factor);
718 }
719 }
720
721 if let Some(uri) = spec.handled_by {
722 builder = builder.handled_by(uri);
723 }
724
725 config = builder.build();
726 }
727
728 Some(config)
729 }
730 };
731
732 let definition = RouteDefinition::new(self.from_uri, self.steps);
733 let definition = if let Some(eh) = resolved_error_handler {
734 definition.with_error_handler(eh)
735 } else {
736 definition
737 };
738 let definition = if let Some(cb) = self.circuit_breaker_config {
739 definition.with_circuit_breaker(cb)
740 } else {
741 definition
742 };
743 let definition = if let Some(sp) = self.security_policy_config {
744 definition.with_security_policy(sp)
745 } else {
746 definition
747 };
748 let definition = if let Some(auth) = self.security_authenticator {
749 definition.with_security_authenticator(auth)
750 } else {
751 definition
752 };
753 let definition = if let Some(concurrency) = self.concurrency {
754 definition.with_concurrency(concurrency)
755 } else {
756 definition
757 };
758 let definition = definition.with_route_id(route_id);
759 let definition = if let Some(auto) = self.auto_startup {
760 definition.with_auto_startup(auto)
761 } else {
762 definition
763 };
764 let definition = if let Some(order) = self.startup_order {
765 definition.with_startup_order(order)
766 } else {
767 definition
768 };
769 Ok(definition)
770 }
771
772 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
774 validate_uri(&self.from_uri)?;
775 let route_id = self
776 .route_id
777 .filter(|s| !s.trim().is_empty())
778 .ok_or_else(|| {
779 CamelError::RouteError(
780 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
781 .to_string(),
782 )
783 })?;
784
785 let steps = canonicalize_steps(self.steps)?;
786 let circuit_breaker = self
787 .circuit_breaker_config
788 .map(canonicalize_circuit_breaker);
789
790 if self.security_policy_config.is_some() {
791 return Err(CamelError::RouteError(
792 "routes with security_policy cannot use the canonical/hot-reload path (not yet supported)"
793 .into(),
794 ));
795 }
796
797 let spec = CanonicalRouteSpec {
798 route_id,
799 from: self.from_uri,
800 steps,
801 circuit_breaker,
802 auto_startup: None,
803 startup_order: None,
804 concurrency: None,
805 version: camel_api::CANONICAL_CONTRACT_VERSION,
806 };
807 spec.validate_contract()?;
808 Ok(spec)
809 }
810}
811
812pub struct OnExceptionBuilder {
813 parent: RouteBuilder,
814 policy: OnExceptionSpec,
815}
816
817impl OnExceptionBuilder {
818 pub fn retry(mut self, max_attempts: u32) -> Self {
819 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
820 self
821 }
822
823 pub fn with_backoff(
824 mut self,
825 initial: std::time::Duration,
826 multiplier: f64,
827 max: std::time::Duration,
828 ) -> Self {
829 if let Some(ref mut retry) = self.policy.retry {
830 retry.initial_delay = initial;
831 retry.multiplier = multiplier;
832 retry.max_delay = max;
833 } else {
834 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
835 }
836 self
837 }
838
839 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
840 if let Some(ref mut retry) = self.policy.retry {
841 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
842 } else {
843 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
844 }
845 self
846 }
847
848 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
849 self.policy.handled_by = Some(uri.into());
850 self
851 }
852
853 pub fn end_on_exception(mut self) -> RouteBuilder {
854 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
855 specs.push(self.policy);
856 }
857 self.parent
858 }
859}
860
861fn validate_uri(uri: &str) -> Result<(), CamelError> {
863 let trimmed = uri.trim();
864 if trimmed.is_empty() {
865 return Err(CamelError::RouteError(
866 "route must have a 'from' URI".to_string(),
867 ));
868 }
869 if !trimmed.contains(':') {
870 return Err(CamelError::RouteError(
871 "URI must have a scheme (e.g. 'timer:tick')".to_string(),
872 ));
873 }
874 let scheme = trimmed.split(':').next().unwrap_or("");
875 if scheme.trim().is_empty() {
876 return Err(CamelError::RouteError(
877 "URI scheme must not be empty".to_string(),
878 ));
879 }
880 Ok(())
881}
882
883fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
884 let mut canonical = Vec::with_capacity(steps.len());
885 for step in steps {
886 canonical.push(canonicalize_step(step)?);
887 }
888 Ok(canonical)
889}
890
891fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
892 match step {
893 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
894 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
895 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
896 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
897 BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
898 delay_ms: config.delay_ms,
899 dynamic_header: config.dynamic_header,
900 }),
901 BuilderStep::DeclarativeScript { expression } => {
902 Ok(CanonicalStepSpec::Script { expression })
903 }
904 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
905 predicate,
906 steps: canonicalize_steps(steps)?,
907 }),
908 BuilderStep::DeclarativeChoice { whens, otherwise } => {
909 let mut canonical_whens = Vec::with_capacity(whens.len());
910 for DeclarativeWhenStep { predicate, steps } in whens {
911 canonical_whens.push(CanonicalWhenSpec {
912 predicate,
913 steps: canonicalize_steps(steps)?,
914 });
915 }
916 let otherwise = match otherwise {
917 Some(steps) => Some(canonicalize_steps(steps)?),
918 None => None,
919 };
920 Ok(CanonicalStepSpec::Choice {
921 whens: canonical_whens,
922 otherwise,
923 })
924 }
925 BuilderStep::DeclarativeSplit {
926 expression,
927 aggregation,
928 parallel,
929 parallel_limit,
930 stop_on_exception,
931 steps,
932 } => Ok(CanonicalStepSpec::Split {
933 expression: CanonicalSplitExpressionSpec::Language(expression),
934 aggregation: canonicalize_split_aggregation(aggregation)?,
935 parallel,
936 parallel_limit,
937 stop_on_exception,
938 steps: canonicalize_steps(steps)?,
939 }),
940 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
941 canonicalize_aggregate(config)?,
942 )),
943 other => {
944 let step_name = canonical_step_name(&other);
945 let detail = camel_api::canonical_contract_rejection_reason(step_name)
946 .unwrap_or("not included in canonical v1");
947 Err(CamelError::RouteError(format!(
948 "canonical v1 does not support step `{step_name}`: {detail}"
949 )))
950 }
951 }
952}
953
954fn canonicalize_split_aggregation(
955 strategy: camel_api::splitter::AggregationStrategy,
956) -> Result<CanonicalSplitAggregationSpec, CamelError> {
957 match strategy {
958 camel_api::splitter::AggregationStrategy::LastWins => {
959 Ok(CanonicalSplitAggregationSpec::LastWins)
960 }
961 camel_api::splitter::AggregationStrategy::CollectAll => {
962 Ok(CanonicalSplitAggregationSpec::CollectAll)
963 }
964 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
965 "canonical v1 does not support custom split aggregation".to_string(),
966 )),
967 camel_api::splitter::AggregationStrategy::Original => {
968 Ok(CanonicalSplitAggregationSpec::Original)
969 }
970 }
971}
972
973fn extract_completion_fields(
974 mode: &CompletionMode,
975) -> Result<(Option<usize>, Option<u64>), CamelError> {
976 match mode {
977 CompletionMode::Single(cond) => match cond {
978 CompletionCondition::Size(n) => Ok((Some(*n), None)),
979 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
980 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
981 "canonical v1 does not support aggregate predicate completion".to_string(),
982 )),
983 },
984 CompletionMode::Any(conds) => {
985 let mut size = None;
986 let mut timeout_ms = None;
987 for cond in conds {
988 match cond {
989 CompletionCondition::Size(n) => size = Some(*n),
990 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
991 CompletionCondition::Predicate(_) => {
992 return Err(CamelError::RouteError(
993 "canonical v1 does not support aggregate predicate completion"
994 .to_string(),
995 ));
996 }
997 }
998 }
999 Ok((size, timeout_ms))
1000 }
1001 }
1002}
1003
1004fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
1005 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
1006
1007 let header = match &config.correlation {
1008 CorrelationStrategy::HeaderName(h) => h.clone(),
1009 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
1010 CorrelationStrategy::Fn(_) => {
1011 return Err(CamelError::RouteError(
1012 "canonical v1 does not support Fn correlation strategy".to_string(),
1013 ));
1014 }
1015 };
1016
1017 let correlation_key = match &config.correlation {
1018 CorrelationStrategy::HeaderName(_) => None,
1019 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
1020 CorrelationStrategy::Fn(_) => unreachable!(),
1021 };
1022
1023 let strategy = match config.strategy {
1024 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
1025 AggregationStrategy::Custom(_) => {
1026 return Err(CamelError::RouteError(
1027 "canonical v1 does not support custom aggregate strategy".to_string(),
1028 ));
1029 }
1030 };
1031 let bucket_ttl_ms = config
1032 .bucket_ttl
1033 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
1034
1035 Ok(CanonicalAggregateSpec {
1036 header,
1037 completion_size,
1038 completion_timeout_ms,
1039 correlation_key,
1040 force_completion_on_stop: if config.force_completion_on_stop {
1041 Some(true)
1042 } else {
1043 None
1044 },
1045 discard_on_timeout: if config.discard_on_timeout {
1046 Some(true)
1047 } else {
1048 None
1049 },
1050 strategy,
1051 max_buckets: config.max_buckets,
1052 bucket_ttl_ms,
1053 })
1054}
1055
1056fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
1057 CanonicalCircuitBreakerSpec {
1058 failure_threshold: config.failure_threshold,
1059 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
1060 }
1061}
1062
1063fn canonical_step_name(step: &BuilderStep) -> &'static str {
1064 match step {
1065 BuilderStep::Processor(_) => "processor",
1066 BuilderStep::To(_) => "to",
1067 BuilderStep::Stop => "stop",
1068 BuilderStep::Log { .. } => "log",
1069 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
1070 BuilderStep::DeclarativeSetBody { .. } => "set_body",
1071 BuilderStep::DeclarativeFilter { .. } => "filter",
1072 BuilderStep::DeclarativeChoice { .. } => "choice",
1073 BuilderStep::DeclarativeScript { .. } => "script",
1074 BuilderStep::DeclarativeFunction { .. } => "function",
1075 BuilderStep::DeclarativeSplit { .. } => "split",
1076 BuilderStep::Split { .. } => "split",
1077 BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1078 BuilderStep::Aggregate { .. } => "aggregate",
1079 BuilderStep::Filter { .. } => "filter",
1080 BuilderStep::Choice { .. } => "choice",
1081 BuilderStep::WireTap { .. } => "wire_tap",
1082 BuilderStep::Delay { .. } => "delay",
1083 BuilderStep::Multicast { .. } => "multicast",
1084 BuilderStep::DeclarativeLog { .. } => "log",
1085 BuilderStep::Bean { .. } => "bean",
1086 BuilderStep::Script { .. } => "script",
1087 BuilderStep::Throttle { .. } => "throttle",
1088 BuilderStep::LoadBalance { .. } => "load_balancer",
1089 BuilderStep::DynamicRouter { .. } => "dynamic_router",
1090 BuilderStep::RoutingSlip { .. } => "routing_slip",
1091 BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1092 BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1093 BuilderStep::RecipientList { .. } => "recipient_list",
1094 BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1095 BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1096 BuilderStep::DeclarativeStreamSplit { .. } => "stream_split",
1097 BuilderStep::Enrich { .. } => "enrich",
1098 BuilderStep::PollEnrich { .. } => "poll_enrich",
1099 BuilderStep::Validate { .. } => "validate",
1100 BuilderStep::IdempotentConsumer { .. } => "idempotent_consumer",
1101 BuilderStep::ClaimCheck { .. } => "claim_check",
1102 BuilderStep::Sampling { .. } => "sampling",
1103 BuilderStep::Sort { .. } => "sort",
1104 BuilderStep::DeclarativeDoTry { .. } => "do_try",
1105 BuilderStep::Resequence { .. } => "resequence",
1106 }
1107}
1108
1109impl StepAccumulator for RouteBuilder {
1110 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1111 &mut self.steps
1112 }
1113}
1114
1115pub struct SplitBuilder {
1123 parent: RouteBuilder,
1124 config: SplitterConfig,
1125 steps: Vec<BuilderStep>,
1126}
1127
1128impl SplitBuilder {
1129 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1131 where
1132 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1133 {
1134 FilterInSplitBuilder {
1135 parent: self,
1136 predicate: std::sync::Arc::new(predicate),
1137 steps: vec![],
1138 }
1139 }
1140
1141 pub fn end_split(mut self) -> RouteBuilder {
1144 let split_step = BuilderStep::Split {
1145 config: self.config,
1146 steps: self.steps,
1147 };
1148 self.parent.steps.push(split_step);
1149 self.parent
1150 }
1151}
1152
1153impl StepAccumulator for SplitBuilder {
1154 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1155 &mut self.steps
1156 }
1157}
1158
1159pub struct FilterBuilder {
1161 parent: RouteBuilder,
1162 predicate: FilterPredicate,
1163 steps: Vec<BuilderStep>,
1164}
1165
1166impl FilterBuilder {
1167 pub fn end_filter(mut self) -> RouteBuilder {
1170 let step = BuilderStep::Filter {
1171 predicate: self.predicate,
1172 steps: self.steps,
1173 };
1174 self.parent.steps.push(step);
1175 self.parent
1176 }
1177}
1178
1179impl StepAccumulator for FilterBuilder {
1180 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1181 &mut self.steps
1182 }
1183}
1184
1185pub struct FilterInSplitBuilder {
1187 parent: SplitBuilder,
1188 predicate: FilterPredicate,
1189 steps: Vec<BuilderStep>,
1190}
1191
1192impl FilterInSplitBuilder {
1193 pub fn end_filter(mut self) -> SplitBuilder {
1195 let step = BuilderStep::Filter {
1196 predicate: self.predicate,
1197 steps: self.steps,
1198 };
1199 self.parent.steps.push(step);
1200 self.parent
1201 }
1202}
1203
1204impl StepAccumulator for FilterInSplitBuilder {
1205 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1206 &mut self.steps
1207 }
1208}
1209
1210pub struct ChoiceBuilder {
1217 parent: RouteBuilder,
1218 whens: Vec<WhenStep>,
1219 _otherwise: Option<Vec<BuilderStep>>,
1220}
1221
1222impl ChoiceBuilder {
1223 pub fn when<F>(self, predicate: F) -> WhenBuilder
1226 where
1227 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1228 {
1229 WhenBuilder {
1230 parent: self,
1231 predicate: std::sync::Arc::new(predicate),
1232 steps: vec![],
1233 }
1234 }
1235
1236 pub fn otherwise(self) -> OtherwiseBuilder {
1240 OtherwiseBuilder {
1241 parent: self,
1242 steps: vec![],
1243 }
1244 }
1245
1246 pub fn end_choice(mut self) -> RouteBuilder {
1250 let step = BuilderStep::Choice {
1251 whens: self.whens,
1252 otherwise: self._otherwise,
1253 };
1254 self.parent.steps.push(step);
1255 self.parent
1256 }
1257}
1258
1259pub struct WhenBuilder {
1261 parent: ChoiceBuilder,
1262 predicate: camel_api::FilterPredicate,
1263 steps: Vec<BuilderStep>,
1264}
1265
1266impl WhenBuilder {
1267 pub fn end_when(mut self) -> ChoiceBuilder {
1270 self.parent.whens.push(WhenStep {
1271 predicate: self.predicate,
1272 steps: self.steps,
1273 });
1274 self.parent
1275 }
1276}
1277
1278impl StepAccumulator for WhenBuilder {
1279 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1280 &mut self.steps
1281 }
1282}
1283
1284pub struct OtherwiseBuilder {
1286 parent: ChoiceBuilder,
1287 steps: Vec<BuilderStep>,
1288}
1289
1290impl OtherwiseBuilder {
1291 pub fn end_otherwise(self) -> ChoiceBuilder {
1293 let OtherwiseBuilder { mut parent, steps } = self;
1294 parent._otherwise = Some(steps);
1295 parent
1296 }
1297}
1298
1299impl StepAccumulator for OtherwiseBuilder {
1300 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1301 &mut self.steps
1302 }
1303}
1304
/// Builder state for a `BuilderStep::Multicast` that is still being assembled.
pub struct MulticastBuilder {
    /// Route builder that receives the finished multicast step.
    parent: RouteBuilder,
    /// Steps accumulated inside the multicast scope.
    steps: Vec<BuilderStep>,
    /// Configuration emitted with the step; adjusted by the setter methods.
    config: MulticastConfig,
}
1317
1318impl MulticastBuilder {
1319 pub fn parallel(mut self, parallel: bool) -> Self {
1320 self.config = self.config.parallel(parallel);
1321 self
1322 }
1323
1324 pub fn parallel_limit(mut self, limit: usize) -> Self {
1325 self.config = self.config.parallel_limit(limit);
1326 self
1327 }
1328
1329 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1330 self.config = self.config.stop_on_exception(stop);
1331 self
1332 }
1333
1334 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1335 self.config = self.config.timeout(duration);
1336 self
1337 }
1338
1339 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1340 self.config = self.config.aggregation(strategy);
1341 self
1342 }
1343
1344 pub fn end_multicast(mut self) -> RouteBuilder {
1345 let step = BuilderStep::Multicast {
1346 steps: self.steps,
1347 config: self.config,
1348 };
1349 self.parent.steps.push(step);
1350 self.parent
1351 }
1352}
1353
1354impl StepAccumulator for MulticastBuilder {
1355 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1356 &mut self.steps
1357 }
1358}
1359
/// Builder state for a `BuilderStep::Throttle` that is still being assembled.
pub struct ThrottleBuilder {
    /// Route builder that receives the finished throttle step.
    parent: RouteBuilder,
    /// Throttler configuration emitted with the step.
    config: ThrottlerConfig,
    /// Steps accumulated inside the throttle scope.
    steps: Vec<BuilderStep>,
}
1372
1373impl ThrottleBuilder {
1374 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1380 self.config = self.config.strategy(strategy);
1381 self
1382 }
1383
1384 pub fn end_throttle(mut self) -> RouteBuilder {
1387 let step = BuilderStep::Throttle {
1388 config: self.config,
1389 steps: self.steps,
1390 };
1391 self.parent.steps.push(step);
1392 self.parent
1393 }
1394}
1395
1396impl StepAccumulator for ThrottleBuilder {
1397 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1398 &mut self.steps
1399 }
1400}
1401
/// Builder state for a `BuilderStep::Loop` attached directly to a route.
pub struct LoopBuilder {
    /// Route builder that receives the finished loop step.
    parent: RouteBuilder,
    /// Loop configuration emitted with the step.
    config: LoopConfig,
    /// Steps accumulated inside the loop body.
    steps: Vec<BuilderStep>,
}
1408
1409impl LoopBuilder {
1410 pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1411 LoopInLoopBuilder {
1412 parent: self,
1413 config: LoopConfig {
1414 mode: LoopMode::Count(count),
1415 },
1416 steps: vec![],
1417 }
1418 }
1419
1420 pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1421 where
1422 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1423 {
1424 LoopInLoopBuilder {
1425 parent: self,
1426 config: LoopConfig {
1427 mode: LoopMode::While(std::sync::Arc::new(predicate)),
1428 },
1429 steps: vec![],
1430 }
1431 }
1432
1433 pub fn end_loop(mut self) -> RouteBuilder {
1434 let step = BuilderStep::Loop {
1435 config: self.config,
1436 steps: self.steps,
1437 };
1438 self.parent.steps.push(step);
1439 self.parent
1440 }
1441}
1442
1443impl StepAccumulator for LoopBuilder {
1444 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1445 &mut self.steps
1446 }
1447}
1448
/// Builder state for a `BuilderStep::Loop` nested inside a [`LoopBuilder`].
pub struct LoopInLoopBuilder {
    /// Enclosing loop builder that receives the finished nested loop step.
    parent: LoopBuilder,
    /// Loop configuration emitted with the nested step.
    config: LoopConfig,
    /// Steps accumulated inside the nested loop body.
    steps: Vec<BuilderStep>,
}
1454
1455impl LoopInLoopBuilder {
1456 pub fn end_loop(mut self) -> LoopBuilder {
1457 let step = BuilderStep::Loop {
1458 config: self.config,
1459 steps: self.steps,
1460 };
1461 self.parent.steps.push(step);
1462 self.parent
1463 }
1464}
1465
1466impl StepAccumulator for LoopInLoopBuilder {
1467 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1468 &mut self.steps
1469 }
1470}
1471
/// Builder state for a `BuilderStep::LoadBalance` that is still being assembled.
pub struct LoadBalancerBuilder {
    /// Route builder that receives the finished load-balance step.
    parent: RouteBuilder,
    /// Load balancer configuration; each strategy method overwrites it entirely.
    config: LoadBalancerConfig,
    /// Steps accumulated inside the load-balance scope.
    steps: Vec<BuilderStep>,
}
1484
1485impl LoadBalancerBuilder {
1486 pub fn round_robin(mut self) -> Self {
1488 self.config = LoadBalancerConfig::round_robin();
1489 self
1490 }
1491
1492 pub fn random(mut self) -> Self {
1494 self.config = LoadBalancerConfig::random();
1495 self
1496 }
1497
1498 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1503 self.config = LoadBalancerConfig::weighted(weights);
1504 self
1505 }
1506
1507 pub fn failover(mut self) -> Self {
1512 self.config = LoadBalancerConfig::failover();
1513 self
1514 }
1515
1516 pub fn end_load_balance(mut self) -> RouteBuilder {
1519 let step = BuilderStep::LoadBalance {
1520 config: self.config,
1521 steps: self.steps,
1522 };
1523 self.parent.steps.push(step);
1524 self.parent
1525 }
1526}
1527
1528impl StepAccumulator for LoadBalancerBuilder {
1529 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1530 &mut self.steps
1531 }
1532}
1533
1534#[cfg(test)]
1539mod tests {
1540 use super::*;
1541 use camel_api::error_handler::ErrorHandlerConfig;
1542 use camel_api::load_balancer::LoadBalanceStrategy;
1543 use camel_api::{Exchange, Message};
1544 use camel_core::route::BuilderStep;
1545 use std::sync::Arc;
1546 use std::time::Duration;
1547 use tower::{Service, ServiceExt};
1548
1549 #[test]
1550 fn test_builder_from_creates_definition() {
1551 let definition = RouteBuilder::from("timer:tick")
1552 .route_id("test-route")
1553 .build()
1554 .unwrap();
1555 assert_eq!(definition.from_uri(), "timer:tick");
1556 }
1557
1558 #[test]
1559 fn test_builder_empty_from_uri_errors() {
1560 let result = RouteBuilder::from("").route_id("test-route").build();
1561 assert!(result.is_err());
1562 }
1563
1564 #[test]
1565 fn test_build_rejects_schemeless_uri() {
1566 let result = RouteBuilder::from("no-scheme-here")
1567 .route_id("test-route")
1568 .build();
1569 match result {
1570 Err(err) => {
1571 let err_msg = format!("{err}");
1572 assert!(
1573 err_msg.contains("scheme"),
1574 "expected scheme-related error, got: {err_msg}"
1575 );
1576 }
1577 Ok(_) => panic!("schemeless URI should fail"),
1578 }
1579 }
1580
1581 #[test]
1582 fn test_build_rejects_empty_scheme_uri() {
1583 let result = RouteBuilder::from(":missing-scheme")
1584 .route_id("test-route")
1585 .build();
1586 match result {
1587 Err(err) => {
1588 let err_msg = format!("{err}");
1589 assert!(
1590 err_msg.contains("scheme"),
1591 "expected scheme-related error, got: {err_msg}"
1592 );
1593 }
1594 Ok(_) => panic!("empty-scheme URI should fail"),
1595 }
1596 }
1597
1598 #[test]
1599 fn test_build_accepts_valid_uri() {
1600 let result = RouteBuilder::from("timer:tick")
1601 .route_id("test-route")
1602 .build();
1603 assert!(result.is_ok());
1604 }
1605
1606 #[test]
1607 fn test_build_canonical_rejects_schemeless_uri() {
1608 let result = RouteBuilder::from("no-scheme-here")
1609 .route_id("test-route")
1610 .build_canonical();
1611 assert!(result.is_err());
1612 }
1613
1614 #[test]
1615 fn test_builder_to_adds_step() {
1616 let definition = RouteBuilder::from("timer:tick")
1617 .route_id("test-route")
1618 .to("log:info")
1619 .build()
1620 .unwrap();
1621
1622 assert_eq!(definition.from_uri(), "timer:tick");
1623 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1625 }
1626
1627 #[test]
1628 fn test_builder_filter_adds_filter_step() {
1629 let definition = RouteBuilder::from("timer:tick")
1630 .route_id("test-route")
1631 .filter(|_ex| true)
1632 .to("mock:result")
1633 .end_filter()
1634 .build()
1635 .unwrap();
1636
1637 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1638 }
1639
1640 #[test]
1641 fn test_builder_set_header_adds_processor_step() {
1642 let definition = RouteBuilder::from("timer:tick")
1643 .route_id("test-route")
1644 .set_header("key", Value::String("value".into()))
1645 .build()
1646 .unwrap();
1647
1648 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1649 }
1650
1651 #[test]
1652 fn test_builder_map_body_adds_processor_step() {
1653 let definition = RouteBuilder::from("timer:tick")
1654 .route_id("test-route")
1655 .map_body(|body| body)
1656 .build()
1657 .unwrap();
1658
1659 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1660 }
1661
1662 #[test]
1663 fn test_builder_process_adds_processor_step() {
1664 let definition = RouteBuilder::from("timer:tick")
1665 .route_id("test-route")
1666 .process(|ex| async move { Ok(ex) })
1667 .build()
1668 .unwrap();
1669
1670 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1671 }
1672
1673 #[test]
1674 fn test_builder_chain_multiple_steps() {
1675 let definition = RouteBuilder::from("timer:tick")
1676 .route_id("test-route")
1677 .set_header("source", Value::String("timer".into()))
1678 .filter(|ex| ex.input.header("source").is_some())
1679 .to("log:info")
1680 .end_filter()
1681 .to("mock:result")
1682 .build()
1683 .unwrap();
1684
1685 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1689 }
1690
1691 #[test]
1692 fn test_loop_count_builder() {
1693 use camel_api::loop_eip::LoopMode;
1694
1695 let def = RouteBuilder::from("direct:start")
1696 .route_id("loop-test")
1697 .loop_count(3)
1698 .to("mock:inside")
1699 .end_loop()
1700 .to("mock:after")
1701 .build()
1702 .unwrap();
1703
1704 assert_eq!(def.steps().len(), 2);
1705 match &def.steps()[0] {
1706 BuilderStep::Loop { config, steps } => {
1707 assert!(matches!(config.mode, LoopMode::Count(3)));
1708 assert_eq!(steps.len(), 1);
1709 }
1710 other => panic!("Expected Loop, got {:?}", other),
1711 }
1712 assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1713 }
1714
1715 #[test]
1716 fn test_loop_while_builder() {
1717 use camel_api::loop_eip::LoopMode;
1718
1719 let def = RouteBuilder::from("direct:start")
1720 .route_id("loop-while-test")
1721 .loop_while(|_ex| true)
1722 .to("mock:retry")
1723 .end_loop()
1724 .build()
1725 .unwrap();
1726
1727 assert_eq!(def.steps().len(), 1);
1728 match &def.steps()[0] {
1729 BuilderStep::Loop { config, steps } => {
1730 assert!(matches!(config.mode, LoopMode::While(_)));
1731 assert_eq!(steps.len(), 1);
1732 }
1733 other => panic!("Expected Loop, got {:?}", other),
1734 }
1735 }
1736
1737 #[test]
1738 fn test_nested_loop_builder() {
1739 use camel_api::loop_eip::LoopMode;
1740
1741 let def = RouteBuilder::from("direct:start")
1742 .route_id("nested-loop-test")
1743 .loop_count(2)
1744 .to("mock:outer")
1745 .loop_count(3)
1746 .to("mock:inner")
1747 .end_loop()
1748 .end_loop()
1749 .to("mock:after")
1750 .build()
1751 .unwrap();
1752
1753 assert_eq!(def.steps().len(), 2);
1754 match &def.steps()[0] {
1755 BuilderStep::Loop { steps, .. } => {
1756 assert_eq!(steps.len(), 2);
1757 match &steps[1] {
1758 BuilderStep::Loop {
1759 config,
1760 steps: inner_steps,
1761 } => {
1762 assert!(matches!(config.mode, LoopMode::Count(3)));
1763 assert_eq!(inner_steps.len(), 1);
1764 }
1765 other => panic!("Expected nested Loop, got {:?}", other),
1766 }
1767 }
1768 other => panic!("Expected outer Loop, got {:?}", other),
1769 }
1770 }
1771
1772 #[tokio::test]
1777 async fn test_set_header_processor_works() {
1778 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1779 let exchange = Exchange::new(Message::new("test"));
1780 let result = svc.call(exchange).await.unwrap();
1781 assert_eq!(
1782 result.input.header("greeting"),
1783 Some(&Value::String("hello".into()))
1784 );
1785 }
1786
1787 #[tokio::test]
1788 async fn test_filter_processor_passes() {
1789 use camel_api::BoxProcessorExt;
1790 use camel_processor::FilterService;
1791
1792 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1793 let mut svc =
1794 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1795 let exchange = Exchange::new(Message::new("pass"));
1796 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1797 assert_eq!(result.input.body.as_text(), Some("pass"));
1798 }
1799
1800 #[tokio::test]
1801 async fn test_filter_processor_blocks() {
1802 use camel_api::BoxProcessorExt;
1803 use camel_processor::FilterService;
1804
1805 let sub = BoxProcessor::from_fn(|_ex| {
1806 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1807 });
1808 let mut svc =
1809 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1810 let exchange = Exchange::new(Message::new("reject"));
1811 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1812 assert_eq!(result.input.body.as_text(), Some("reject"));
1813 }
1814
1815 #[tokio::test]
1816 async fn test_map_body_processor_works() {
1817 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1818 if let Some(text) = body.as_text() {
1819 Body::Text(text.to_uppercase())
1820 } else {
1821 body
1822 }
1823 });
1824 let exchange = Exchange::new(Message::new("hello"));
1825 let result = mapper.oneshot(exchange).await.unwrap();
1826 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1827 }
1828
1829 #[tokio::test]
1830 async fn test_process_custom_processor_works() {
1831 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1832 ex.set_property("custom", Value::Bool(true));
1833 Ok(ex)
1834 });
1835 let exchange = Exchange::new(Message::default());
1836 let result = processor.oneshot(exchange).await.unwrap();
1837 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1838 }
1839
1840 #[tokio::test]
1845 async fn test_compose_pipeline_runs_steps_in_order() {
1846 use camel_core::route::{CompiledStep, compose_pipeline};
1847
1848 let processors = vec![
1849 CompiledStep::Process {
1850 processor: BoxProcessor::new(SetHeader::new(
1851 IdentityProcessor,
1852 "step",
1853 Value::String("one".into()),
1854 )),
1855 body_contract: None,
1856 lifecycle: None,
1857 },
1858 CompiledStep::Process {
1859 processor: BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1860 if let Some(text) = body.as_text() {
1861 Body::Text(format!("{}-processed", text))
1862 } else {
1863 body
1864 }
1865 })),
1866 body_contract: None,
1867 lifecycle: None,
1868 },
1869 ];
1870
1871 let pipeline = compose_pipeline(processors);
1872 let exchange = Exchange::new(Message::new("hello"));
1873 let result = pipeline.oneshot(exchange).await.unwrap();
1874
1875 assert_eq!(
1876 result.input.header("step"),
1877 Some(&Value::String("one".into()))
1878 );
1879 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1880 }
1881
1882 #[tokio::test]
1883 async fn test_compose_pipeline_empty_is_identity() {
1884 use camel_core::route::compose_pipeline;
1885
1886 let pipeline = compose_pipeline(vec![]);
1887 let exchange = Exchange::new(Message::new("unchanged"));
1888 let result = pipeline.oneshot(exchange).await.unwrap();
1889 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1890 }
1891
1892 #[test]
1897 fn test_builder_circuit_breaker_sets_config() {
1898 use camel_api::circuit_breaker::CircuitBreakerConfig;
1899
1900 let config = CircuitBreakerConfig::new().failure_threshold(5);
1901 let definition = RouteBuilder::from("timer:tick")
1902 .route_id("test-route")
1903 .circuit_breaker(config)
1904 .build()
1905 .unwrap();
1906
1907 let cb = definition
1908 .circuit_breaker_config()
1909 .expect("circuit breaker should be set");
1910 assert_eq!(cb.failure_threshold, 5);
1911 }
1912
1913 #[test]
1914 fn test_builder_circuit_breaker_with_error_handler() {
1915 use camel_api::circuit_breaker::CircuitBreakerConfig;
1916 use camel_api::error_handler::ErrorHandlerConfig;
1917
1918 let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1919 let eh_config = ErrorHandlerConfig::log_only();
1920
1921 let definition = RouteBuilder::from("timer:tick")
1922 .route_id("test-route")
1923 .to("log:info")
1924 .circuit_breaker(cb_config)
1925 .error_handler(eh_config)
1926 .build()
1927 .unwrap();
1928
1929 assert!(
1930 definition.circuit_breaker_config().is_some(),
1931 "circuit breaker config should be set"
1932 );
1933 }
1935
1936 #[test]
1937 fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1938 let definition = RouteBuilder::from("direct:start")
1939 .route_id("test-route")
1940 .dead_letter_channel("log:dlc")
1941 .on_exception(|e| matches!(e, CamelError::Io(_)))
1942 .retry(3)
1943 .handled_by("log:io")
1944 .end_on_exception()
1945 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1946 .retry(1)
1947 .end_on_exception()
1948 .to("mock:out")
1949 .build()
1950 .expect("route should build");
1951
1952 let cfg = definition
1953 .error_handler_config()
1954 .expect("error handler should be set");
1955 assert_eq!(cfg.policies.len(), 2);
1956 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1957 assert_eq!(
1958 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1959 Some(3)
1960 );
1961 assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1962 assert_eq!(
1963 cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1964 Some(1)
1965 );
1966 }
1967
1968 #[test]
1969 fn test_builder_on_exception_mixed_mode_rejected() {
1970 let result = RouteBuilder::from("direct:start")
1971 .route_id("test-route")
1972 .error_handler(ErrorHandlerConfig::log_only())
1973 .on_exception(|_e| true)
1974 .end_on_exception()
1975 .to("mock:out")
1976 .build();
1977
1978 let err = result.err().expect("mixed mode should fail with an error");
1979
1980 assert!(
1981 format!("{err}").contains("mixed error handler modes"),
1982 "unexpected error: {err}"
1983 );
1984 }
1985
1986 #[test]
1987 fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1988 let definition = RouteBuilder::from("direct:start")
1989 .route_id("test-route")
1990 .on_exception(|_e| true)
1991 .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1992 .with_jitter(0.5)
1993 .end_on_exception()
1994 .to("mock:out")
1995 .build()
1996 .expect("route should build");
1997
1998 let cfg = definition
1999 .error_handler_config()
2000 .expect("error handler should be set");
2001 assert_eq!(cfg.policies.len(), 1);
2002 assert!(cfg.policies[0].retry.is_none());
2003 }
2004
2005 #[test]
2006 fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
2007 let definition = RouteBuilder::from("direct:start")
2008 .route_id("test-route")
2009 .dead_letter_channel("log:dlc")
2010 .to("mock:out")
2011 .build()
2012 .expect("route should build");
2013
2014 let cfg = definition
2015 .error_handler_config()
2016 .expect("error handler should be set");
2017 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
2018 assert!(cfg.policies.is_empty());
2019 }
2020
2021 #[test]
2022 fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
2023 let definition = RouteBuilder::from("direct:start")
2024 .route_id("test-route")
2025 .dead_letter_channel("log:first")
2026 .on_exception(|e| matches!(e, CamelError::Io(_)))
2027 .retry(2)
2028 .end_on_exception()
2029 .dead_letter_channel("log:second")
2030 .to("mock:out")
2031 .build()
2032 .expect("route should build");
2033
2034 let cfg = definition
2035 .error_handler_config()
2036 .expect("error handler should be set");
2037 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2038 assert_eq!(cfg.policies.len(), 1);
2039 assert_eq!(
2040 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
2041 Some(2)
2042 );
2043 }
2044
2045 #[test]
2046 fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
2047 let definition = RouteBuilder::from("direct:start")
2048 .route_id("test-route")
2049 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
2050 .retry(1)
2051 .end_on_exception()
2052 .to("mock:out")
2053 .build()
2054 .expect("route should build");
2055
2056 let cfg = definition
2057 .error_handler_config()
2058 .expect("error handler should be set");
2059 assert!(cfg.dlc_uri.is_none());
2060 assert_eq!(cfg.policies.len(), 1);
2061 }
2062
2063 #[test]
2064 fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
2065 let first = ErrorHandlerConfig::dead_letter_channel("log:first");
2066 let second = ErrorHandlerConfig::dead_letter_channel("log:second");
2067
2068 let definition = RouteBuilder::from("direct:start")
2069 .route_id("test-route")
2070 .error_handler(first)
2071 .error_handler(second)
2072 .to("mock:out")
2073 .build()
2074 .expect("route should build");
2075
2076 let cfg = definition
2077 .error_handler_config()
2078 .expect("error handler should be set");
2079 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2080 }
2081
2082 #[test]
2085 fn test_split_builder_typestate() {
2086 use camel_api::splitter::{SplitterConfig, split_body_lines};
2087
2088 let definition = RouteBuilder::from("timer:test?period=1000")
2090 .route_id("test-route")
2091 .split(SplitterConfig::new(split_body_lines()))
2092 .to("mock:per-fragment")
2093 .end_split()
2094 .to("mock:final")
2095 .build()
2096 .unwrap();
2097
2098 assert_eq!(definition.steps().len(), 2);
2100 }
2101
2102 #[test]
2103 fn test_split_builder_steps_collected() {
2104 use camel_api::splitter::{SplitterConfig, split_body_lines};
2105
2106 let definition = RouteBuilder::from("timer:test?period=1000")
2107 .route_id("test-route")
2108 .split(SplitterConfig::new(split_body_lines()))
2109 .set_header("fragment", Value::String("yes".into()))
2110 .to("mock:per-fragment")
2111 .end_split()
2112 .build()
2113 .unwrap();
2114
2115 assert_eq!(definition.steps().len(), 1);
2117 match &definition.steps()[0] {
2118 BuilderStep::Split { steps, .. } => {
2119 assert_eq!(steps.len(), 2); }
2121 other => panic!("Expected Split, got {:?}", other),
2122 }
2123 }
2124
2125 #[test]
2126 fn test_split_builder_config_propagated() {
2127 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2128
2129 let definition = RouteBuilder::from("timer:test?period=1000")
2130 .route_id("test-route")
2131 .split(
2132 SplitterConfig::new(split_body_lines())
2133 .parallel(true)
2134 .parallel_limit(4)
2135 .aggregation(AggregationStrategy::CollectAll),
2136 )
2137 .to("mock:per-fragment")
2138 .end_split()
2139 .build()
2140 .unwrap();
2141
2142 match &definition.steps()[0] {
2143 BuilderStep::Split { config, .. } => {
2144 assert!(config.parallel);
2145 assert_eq!(config.parallel_limit, Some(4));
2146 assert!(matches!(
2147 config.aggregation,
2148 AggregationStrategy::CollectAll
2149 ));
2150 }
2151 other => panic!("Expected Split, got {:?}", other),
2152 }
2153 }
2154
2155 #[test]
2156 fn test_aggregate_builder_adds_step() {
2157 use camel_api::aggregator::AggregatorConfig;
2158 use camel_core::route::BuilderStep;
2159
2160 let definition = RouteBuilder::from("timer:tick")
2161 .route_id("test-route")
2162 .aggregate(
2163 AggregatorConfig::correlate_by("key")
2164 .complete_when_size(2)
2165 .build()
2166 .unwrap(),
2167 )
2168 .build()
2169 .unwrap();
2170
2171 assert_eq!(definition.steps().len(), 1);
2172 assert!(matches!(
2173 definition.steps()[0],
2174 BuilderStep::Aggregate { .. }
2175 ));
2176 }
2177
2178 #[test]
2179 fn test_aggregate_in_split_builder() {
2180 use camel_api::aggregator::AggregatorConfig;
2181 use camel_api::splitter::{SplitterConfig, split_body_lines};
2182 use camel_core::route::BuilderStep;
2183
2184 let definition = RouteBuilder::from("timer:tick")
2185 .route_id("test-route")
2186 .split(SplitterConfig::new(split_body_lines()))
2187 .aggregate(
2188 AggregatorConfig::correlate_by("key")
2189 .complete_when_size(1)
2190 .build()
2191 .unwrap(),
2192 )
2193 .end_split()
2194 .build()
2195 .unwrap();
2196
2197 assert_eq!(definition.steps().len(), 1);
2198 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2199 assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2200 } else {
2201 panic!("expected Split step");
2202 }
2203 }
2204
2205 #[test]
2208 fn test_builder_set_body_static_adds_processor() {
2209 let definition = RouteBuilder::from("timer:tick")
2210 .route_id("test-route")
2211 .set_body("fixed")
2212 .build()
2213 .unwrap();
2214 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2215 }
2216
2217 #[test]
2218 fn test_builder_set_body_fn_adds_processor() {
2219 let definition = RouteBuilder::from("timer:tick")
2220 .route_id("test-route")
2221 .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2222 .build()
2223 .unwrap();
2224 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2225 }
2226
2227 #[test]
2228 fn transform_alias_produces_same_as_set_body() {
2229 let route_transform = RouteBuilder::from("timer:tick")
2230 .route_id("test-route")
2231 .transform("hello")
2232 .build()
2233 .unwrap();
2234
2235 let route_set_body = RouteBuilder::from("timer:tick")
2236 .route_id("test-route")
2237 .set_body("hello")
2238 .build()
2239 .unwrap();
2240
2241 assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2242 }
2243
2244 #[test]
2245 fn test_builder_set_header_fn_adds_processor() {
2246 let definition = RouteBuilder::from("timer:tick")
2247 .route_id("test-route")
2248 .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2249 .build()
2250 .unwrap();
2251 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2252 }
2253
2254 #[tokio::test]
2255 async fn test_set_body_static_processor_works() {
2256 use camel_core::route::{CompiledStep, compose_pipeline};
2257 let def = RouteBuilder::from("t:t")
2258 .route_id("test-route")
2259 .set_body("replaced")
2260 .build()
2261 .unwrap();
2262 let pipeline = compose_pipeline(
2263 def.steps()
2264 .iter()
2265 .filter_map(|s| {
2266 if let BuilderStep::Processor(p) = s {
2267 Some(p.clone())
2268 } else {
2269 None
2270 }
2271 })
2272 .map(|p| CompiledStep::Process {
2273 processor: p,
2274 body_contract: None,
2275 lifecycle: None,
2276 })
2277 .collect(),
2278 );
2279 let exchange = Exchange::new(Message::new("original"));
2280 let result = pipeline.oneshot(exchange).await.unwrap();
2281 assert_eq!(result.input.body.as_text(), Some("replaced"));
2282 }
2283
2284 #[tokio::test]
2285 async fn test_set_body_fn_processor_works() {
2286 use camel_core::route::{CompiledStep, compose_pipeline};
2287 let def = RouteBuilder::from("t:t")
2288 .route_id("test-route")
2289 .set_body_fn(|ex: &Exchange| {
2290 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2291 })
2292 .build()
2293 .unwrap();
2294 let pipeline = compose_pipeline(
2295 def.steps()
2296 .iter()
2297 .filter_map(|s| {
2298 if let BuilderStep::Processor(p) = s {
2299 Some(p.clone())
2300 } else {
2301 None
2302 }
2303 })
2304 .map(|p| CompiledStep::Process {
2305 processor: p,
2306 body_contract: None,
2307 lifecycle: None,
2308 })
2309 .collect(),
2310 );
2311 let exchange = Exchange::new(Message::new("hello"));
2312 let result = pipeline.oneshot(exchange).await.unwrap();
2313 assert_eq!(result.input.body.as_text(), Some("HELLO"));
2314 }
2315
2316 #[tokio::test]
2317 async fn test_set_header_fn_processor_works() {
2318 use camel_core::route::{CompiledStep, compose_pipeline};
2319 let def = RouteBuilder::from("t:t")
2320 .route_id("test-route")
2321 .set_header_fn("echo", |ex: &Exchange| {
2322 ex.input
2323 .body
2324 .as_text()
2325 .map(|t| Value::String(t.into()))
2326 .unwrap_or(Value::Null)
2327 })
2328 .build()
2329 .unwrap();
2330 let pipeline = compose_pipeline(
2331 def.steps()
2332 .iter()
2333 .filter_map(|s| {
2334 if let BuilderStep::Processor(p) = s {
2335 Some(p.clone())
2336 } else {
2337 None
2338 }
2339 })
2340 .map(|p| CompiledStep::Process {
2341 processor: p,
2342 body_contract: None,
2343 lifecycle: None,
2344 })
2345 .collect(),
2346 );
2347 let exchange = Exchange::new(Message::new("ping"));
2348 let result = pipeline.oneshot(exchange).await.unwrap();
2349 assert_eq!(
2350 result.input.header("echo"),
2351 Some(&Value::String("ping".into()))
2352 );
2353 }
2354
2355 #[test]
2358 fn test_filter_builder_typestate() {
2359 let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2360 .route_id("test-route")
2361 .filter(|_ex| true)
2362 .to("mock:inner")
2363 .end_filter()
2364 .to("mock:outer")
2365 .build();
2366 assert!(result.is_ok());
2367 }
2368
2369 #[test]
2370 fn test_filter_builder_steps_collected() {
2371 let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2372 .route_id("test-route")
2373 .filter(|_ex| true)
2374 .to("mock:inner")
2375 .end_filter()
2376 .build()
2377 .unwrap();
2378
2379 assert_eq!(definition.steps().len(), 1);
2380 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2381 }
2382
2383 #[test]
2384 fn test_wire_tap_builder_adds_step() {
2385 let definition = RouteBuilder::from("timer:tick")
2386 .route_id("test-route")
2387 .wire_tap("mock:tap")
2388 .to("mock:result")
2389 .build()
2390 .unwrap();
2391
2392 assert_eq!(definition.steps().len(), 2);
2393 assert!(
2394 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2395 );
2396 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2397 }
2398
2399 #[test]
2402 fn test_multicast_builder_typestate() {
2403 let definition = RouteBuilder::from("timer:tick")
2404 .route_id("test-route")
2405 .multicast()
2406 .to("direct:a")
2407 .to("direct:b")
2408 .end_multicast()
2409 .to("mock:result")
2410 .build()
2411 .unwrap();
2412
2413 assert_eq!(definition.steps().len(), 2); }
2415
2416 #[test]
2417 fn test_multicast_builder_steps_collected() {
2418 let definition = RouteBuilder::from("timer:tick")
2419 .route_id("test-route")
2420 .multicast()
2421 .to("direct:a")
2422 .to("direct:b")
2423 .end_multicast()
2424 .build()
2425 .unwrap();
2426
2427 match &definition.steps()[0] {
2428 BuilderStep::Multicast { steps, .. } => {
2429 assert_eq!(steps.len(), 2);
2430 }
2431 other => panic!("Expected Multicast, got {:?}", other),
2432 }
2433 }
2434
2435 #[test]
2438 fn test_builder_concurrent_sets_concurrency() {
2439 use camel_component_api::ConcurrencyModel;
2440
2441 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2442 .route_id("test-route")
2443 .concurrent(16)
2444 .to("log:info")
2445 .build()
2446 .unwrap();
2447
2448 assert_eq!(
2449 definition.concurrency_override(),
2450 Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2451 );
2452 }
2453
2454 #[test]
2455 fn test_builder_concurrent_zero_means_unbounded() {
2456 use camel_component_api::ConcurrencyModel;
2457
2458 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2459 .route_id("test-route")
2460 .concurrent(0)
2461 .to("log:info")
2462 .build()
2463 .unwrap();
2464
2465 assert_eq!(
2466 definition.concurrency_override(),
2467 Some(&ConcurrencyModel::Concurrent { max: None })
2468 );
2469 }
2470
2471 #[test]
2472 fn test_builder_sequential_sets_concurrency() {
2473 use camel_component_api::ConcurrencyModel;
2474
2475 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2476 .route_id("test-route")
2477 .sequential()
2478 .to("log:info")
2479 .build()
2480 .unwrap();
2481
2482 assert_eq!(
2483 definition.concurrency_override(),
2484 Some(&ConcurrencyModel::Sequential)
2485 );
2486 }
2487
2488 #[test]
2489 fn test_builder_default_concurrency_is_none() {
2490 let definition = RouteBuilder::from("timer:tick")
2491 .route_id("test-route")
2492 .to("log:info")
2493 .build()
2494 .unwrap();
2495
2496 assert_eq!(definition.concurrency_override(), None);
2497 }
2498
2499 #[test]
2502 fn test_builder_route_id_sets_id() {
2503 let definition = RouteBuilder::from("timer:tick")
2504 .route_id("my-route")
2505 .build()
2506 .unwrap();
2507
2508 assert_eq!(definition.route_id(), "my-route");
2509 }
2510
2511 #[test]
2512 fn test_build_without_route_id_fails() {
2513 let result = RouteBuilder::from("timer:tick?period=1000")
2514 .to("log:info")
2515 .build();
2516 let err = match result {
2517 Err(e) => e.to_string(),
2518 Ok(_) => panic!("build() should fail without route_id"),
2519 };
2520 assert!(
2521 err.contains("route_id"),
2522 "error should mention route_id, got: {}",
2523 err
2524 );
2525 }
2526
/// An empty route id is rejected by `build()` with a `CamelError::RouteError`.
#[test]
fn test_builder_empty_route_id_rejected() {
    let failure = RouteBuilder::from("timer:tick")
        .route_id("")
        .build()
        .err()
        .expect("empty route_id should be rejected");

    let is_route_error = matches!(failure, CamelError::RouteError(_));
    assert!(is_route_error);
}
2533
/// A whitespace-only route id makes `build()` return an error.
#[test]
fn test_builder_whitespace_route_id_rejected() {
    let builder = RouteBuilder::from("timer:tick").route_id(" ");

    assert!(builder.build().is_err());
}
2539
/// `auto_startup(false)` is carried over to the built definition.
#[test]
fn test_builder_auto_startup_false() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .auto_startup(false)
        .build()
        .unwrap();
    let starts_automatically = route.auto_startup();

    assert!(!starts_automatically);
}
2550
/// A custom `startup_order(..)` value is carried over to the built definition.
#[test]
fn test_builder_startup_order_custom() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .startup_order(50)
        .build()
        .unwrap();
    let order = route.startup_order();

    assert_eq!(order, 50);
}
2561
/// With only a route id set, the definition auto-starts and has startup order 1000.
#[test]
fn test_builder_defaults() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .build()
        .unwrap();

    let id = route.route_id();
    assert_eq!(id, "test-route");

    let starts_automatically = route.auto_startup();
    assert!(starts_automatically);

    let order = route.startup_order();
    assert_eq!(order, 1000);
}
2573
/// A choice with one `when` branch and no `otherwise` becomes a single `Choice` step.
#[test]
fn test_choice_builder_single_when() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("type").is_some())
        .to("mock:typed")
        .end_when()
        .end_choice()
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);

    let one_when_no_otherwise = matches!(
        &steps[0],
        BuilderStep::Choice { whens, otherwise } if whens.len() == 1 && otherwise.is_none()
    );
    assert!(one_when_no_otherwise);
}
2593
/// A choice with one `when` branch plus `otherwise` records both parts on the `Choice` step.
#[test]
fn test_choice_builder_when_otherwise() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .otherwise()
        .to("mock:fallback")
        .end_otherwise()
        .end_choice()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    let one_when_with_otherwise = matches!(
        first,
        BuilderStep::Choice { whens, otherwise } if whens.len() == 1 && otherwise.is_some()
    );
    assert!(one_when_with_otherwise);
}
2613
/// Two `when` branches end up as two entries in the `Choice` step's `whens`.
#[test]
fn test_choice_builder_multiple_whens() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("b").is_some())
        .to("mock:b")
        .end_when()
        .end_choice()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    let has_two_whens = matches!(first, BuilderStep::Choice { whens, .. } if whens.len() == 2);
    assert!(has_two_whens);
}
2633
/// A step added after `end_choice()` lands on the route itself, following the `Choice` step.
#[test]
fn test_choice_step_after_choice() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|_ex: &Exchange| true)
        .to("mock:inner")
        .end_when()
        .end_choice()
        .to("mock:outer")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);

    let trailing_is_outer = matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer");
    assert!(trailing_is_outer);
}
2650
/// A `throttle(..)` scope closed with `end_throttle()` becomes a single `Throttle` step.
#[test]
fn test_throttle_builder_typestate() {
    let window = std::time::Duration::from_secs(1);
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, window)
        .to("mock:result")
        .end_throttle()
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::Throttle { .. }));
}
2669
/// `strategy(ThrottleStrategy::Reject)` is stored in the throttle step's config.
#[test]
fn test_throttle_builder_with_strategy() {
    let window = std::time::Duration::from_secs(1);
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, window)
        .strategy(ThrottleStrategy::Reject)
        .to("mock:result")
        .end_throttle()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Throttle { config, .. } => {
            assert_eq!(config.strategy, ThrottleStrategy::Reject);
        }
        _ => panic!("Expected Throttle step"),
    }
}
2687
/// Steps declared inside the throttle scope are collected on the `Throttle` step.
#[test]
fn test_throttle_builder_steps_collected() {
    let window = std::time::Duration::from_secs(1);
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(5, window)
        .set_header("throttled", Value::Bool(true))
        .to("mock:throttled")
        .end_throttle()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    if let BuilderStep::Throttle { steps, .. } = first {
        // `set_header` + `to`
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected Throttle, got {:?}", first);
    }
}
2706
/// A step added after `end_throttle()` lands on the route itself, following the `Throttle` step.
#[test]
fn test_throttle_step_after_throttle() {
    let window = std::time::Duration::from_secs(1);
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, window)
        .to("mock:inner")
        .end_throttle()
        .to("mock:outer")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);

    let trailing_is_outer = matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer");
    assert!(trailing_is_outer);
}
2722
/// A `load_balance()` scope closed with `end_load_balance()` becomes a single `LoadBalance` step.
#[test]
fn test_load_balance_builder_typestate() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .round_robin()
        .to("mock:a")
        .to("mock:b")
        .end_load_balance()
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::LoadBalance { .. }));
}
2743
/// `random()` selects `LoadBalanceStrategy::Random` in the load-balance config.
#[test]
fn test_load_balance_builder_with_strategy() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .random()
        .to("mock:result")
        .end_load_balance()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
2761
/// Steps declared inside the load-balance scope are collected on the `LoadBalance` step.
#[test]
fn test_load_balance_builder_steps_collected() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .set_header("lb", Value::Bool(true))
        .to("mock:a")
        .end_load_balance()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    if let BuilderStep::LoadBalance { steps, .. } = first {
        // `set_header` + `to`
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected LoadBalance, got {:?}", first);
    }
}
2780
/// A step added after `end_load_balance()` lands on the route itself, following the scope.
#[test]
fn test_load_balance_step_after_load_balance() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .to("mock:inner")
        .end_load_balance()
        .to("mock:outer")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);

    let trailing_is_outer = matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer");
    assert!(trailing_is_outer);
}
2796
/// `dynamic_router(..)` appends exactly one `DynamicRouter` step.
#[test]
fn test_dynamic_router_builder() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::DynamicRouter { .. }));
}
2813
/// A pre-built `DynamicRouterConfig` keeps its `max_iterations` and `cache_size` on the step.
#[test]
fn test_dynamic_router_builder_with_config() {
    let router_config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
        .max_iterations(100)
        .cache_size(500);

    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router_with_config(router_config)
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);

    match &steps[0] {
        BuilderStep::DynamicRouter { config } => {
            assert_eq!(config.max_iterations, 100);
            assert_eq!(config.cache_size, 500);
        }
        _ => panic!("Expected DynamicRouter step"),
    }
}
2834
/// A step declared after `dynamic_router(..)` follows the `DynamicRouter` step on the route.
#[test]
fn test_dynamic_router_step_after_router() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
        .to("mock:outer")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);

    let leading_is_router = matches!(&steps[0], BuilderStep::DynamicRouter { .. });
    assert!(leading_is_router);

    let trailing_is_outer = matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer");
    assert!(trailing_is_outer);
}
2852
/// `routing_slip(..)` puts a `RoutingSlip` step first on the route.
#[test]
fn routing_slip_builder_creates_step() {
    use camel_api::RoutingSlipExpression;

    let slip: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));

    let definition = RouteBuilder::from("direct:start")
        .route_id("routing-slip-test")
        .routing_slip(slip)
        .build()
        .unwrap();

    let first = &definition.steps()[0];
    assert!(
        matches!(first, BuilderStep::RoutingSlip { .. }),
        "Expected RoutingSlip step"
    );
}
2870
/// A pre-built `RoutingSlipConfig` keeps its delimiter, cache size and ignore flag on the step.
#[test]
fn routing_slip_with_config_builder_creates_step() {
    use camel_api::RoutingSlipConfig;

    let slip_config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
        .uri_delimiter("|")
        .cache_size(50)
        .ignore_invalid_endpoints(true);

    let definition = RouteBuilder::from("direct:start")
        .route_id("routing-slip-config-test")
        .routing_slip_with_config(slip_config)
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::RoutingSlip { config } => {
            assert_eq!(config.uri_delimiter, "|");
            assert_eq!(config.cache_size, 50);
            assert!(config.ignore_invalid_endpoints);
        }
        _ => panic!("Expected RoutingSlip step"),
    }
}
2894
/// `marshal("json")` succeeds and contributes a `Processor` step.
#[test]
fn test_builder_marshal_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .marshal("json")
        .unwrap()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2905
/// `unmarshal("json")` succeeds and contributes a `Processor` step.
#[test]
fn test_builder_unmarshal_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .unmarshal("json")
        .unwrap()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2916
/// `stream_cache(1024)` contributes a `Processor` step.
#[test]
fn test_builder_stream_cache_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .stream_cache(1024)
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2926
/// `validate(..)` yields one `Validate` step whose predicate is a `simple` expression holding
/// the given source text.
#[test]
fn validate_adds_validate_step() {
    let route = RouteBuilder::from("direct:in")
        .route_id("test")
        .validate("schemas/order.xsd")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);

    let first = &steps[0];
    let is_expected_validate = match first {
        BuilderStep::Validate { predicate } => {
            predicate.language == "simple" && predicate.source == "schemas/order.xsd"
        }
        _ => false,
    };
    assert!(is_expected_validate, "got: {:?}", first);
}
2942
/// `marshal` with an unregistered format name returns an error naming the problem and the format.
#[test]
fn test_builder_marshal_returns_err_for_unknown_format() {
    let attempt = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .marshal("csv");

    let msg = match attempt {
        Ok(_) => panic!("marshal with unknown format should return Err"),
        Err(e) => e.to_string(),
    };

    let mentions_unknown = msg.contains("unknown data format");
    assert!(
        mentions_unknown,
        "error should mention unknown format, got: {msg}"
    );

    let mentions_name = msg.contains("csv");
    assert!(mentions_name, "error should mention format name, got: {msg}");
}
2962
/// `unmarshal` with an unregistered format name returns an error naming the problem and the format.
#[test]
fn test_builder_unmarshal_returns_err_for_unknown_format() {
    let attempt = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .unmarshal("csv");

    let msg = match attempt {
        Ok(_) => panic!("unmarshal with unknown format should return Err"),
        Err(e) => e.to_string(),
    };

    let mentions_unknown = msg.contains("unknown data format");
    assert!(
        mentions_unknown,
        "error should mention unknown format, got: {msg}"
    );

    let mentions_name = msg.contains("csv");
    assert!(mentions_name, "error should mention format name, got: {msg}");
}
2982
/// `recipient_list(..)` puts a `RecipientList` step first on the route.
#[test]
fn test_builder_recipient_list_creates_step() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("recipient-list-test")
        .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
        .build()
        .unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::RecipientList { .. }));
}
2996
/// `recipient_list_with_config(..)` puts a `RecipientList` step first on the route.
#[test]
fn test_builder_recipient_list_with_config_creates_step() {
    let list_config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));

    let definition = RouteBuilder::from("direct:start")
        .route_id("recipient-list-config-test")
        .recipient_list_with_config(list_config)
        .build()
        .unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::RecipientList { .. }));
}
3012
/// `script(language, source)` stores both values verbatim on a `Script` step.
#[test]
fn test_builder_script_adds_script_step() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("script-test")
        .script("rhai", "headers[\"x\"] = \"y\"")
        .build()
        .unwrap();

    let first = &definition.steps()[0];
    let is_expected_script = matches!(
        first,
        BuilderStep::Script { language, script }
            if language == "rhai" && script == "headers[\"x\"] = \"y\""
    );
    assert!(is_expected_script);
}
3027
/// `delay(..)` and `delay_with_header(..)` each contribute one `Delay` step.
#[test]
fn test_builder_delay_and_delay_with_header_add_steps() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("delay-test")
        .delay(Duration::from_millis(250))
        .delay_with_header(Duration::from_millis(500), "x-delay")
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);

    let first_is_delay = matches!(&steps[0], BuilderStep::Delay { .. });
    assert!(first_is_delay);

    let second_is_delay = matches!(&steps[1], BuilderStep::Delay { .. });
    assert!(second_is_delay);
}
3041
/// `log`, `stop` and `to` are recorded as `Log`, `Stop` and `To` steps in declaration order.
#[test]
fn test_builder_log_and_stop_add_steps_in_order() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("log-stop-test")
        .log("hello", LogLevel::Info)
        .stop()
        .to("mock:after")
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 3);

    let logs_hello = matches!(&steps[0], BuilderStep::Log { message, .. } if message == "hello");
    assert!(logs_hello);

    let stops = matches!(&steps[1], BuilderStep::Stop);
    assert!(stops);

    let sends_after = matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:after");
    assert!(sends_after);
}
3060
/// `stream_cache_default()` contributes a `Processor` step.
#[test]
fn test_builder_stream_cache_default_adds_processor_step() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("stream-cache-default-test")
        .stream_cache_default()
        .build()
        .unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
3071
/// `validate(..)` with an expression string keeps the text unchanged as a `simple` predicate.
#[test]
fn test_validate_creates_validate_step_with_expression() {
    let definition = RouteBuilder::from("direct:in")
        .route_id("validate-prefix-test")
        .validate("${body.size()} > 0")
        .build()
        .unwrap();

    let is_expected_validate = match &definition.steps()[0] {
        BuilderStep::Validate { predicate } => {
            predicate.language == "simple" && predicate.source == "${body.size()} > 0"
        }
        _ => false,
    };
    assert!(is_expected_validate);
}
3085
/// Calling `failover()` after `weighted(..)` leaves `Failover` as the configured strategy.
#[test]
fn test_load_balance_builder_weighted_failover_config() {
    let weights = vec![("direct:a".to_string(), 3), ("direct:b".to_string(), 1)];

    let definition = RouteBuilder::from("direct:start")
        .route_id("lb-weighted-failover")
        .load_balance()
        .weighted(weights)
        .failover()
        .to("mock:result")
        .end_load_balance()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
3107
/// Every multicast setter used here is reflected in the resulting step's config.
#[test]
fn test_multicast_builder_all_config_setters() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("multicast-config-test")
        .multicast()
        .parallel(true)
        .parallel_limit(4)
        .stop_on_exception(true)
        .timeout(Duration::from_millis(300))
        .aggregation(MulticastStrategy::Original)
        .to("mock:a")
        .end_multicast()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(config.parallel);
            assert_eq!(config.parallel_limit, Some(4));
            assert!(config.stop_on_exception);
            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
            assert!(matches!(config.aggregation, MulticastStrategy::Original));
        }
        _ => panic!("Expected Multicast step"),
    }
}
3133
/// `build_canonical()` fails for a route with a `set_header` step; the error names `processor`.
#[test]
fn test_build_canonical_rejects_unsupported_processor_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-reject")
        .set_header("k", Value::String("v".into()));

    let err = builder.build_canonical().unwrap_err();
    let rendered = err.to_string();

    assert!(rendered.contains("does not support step `processor`"));
}
3144
/// `weighted(..)` selects the `Weighted` load-balance strategy.
#[test]
fn test_load_balance_builder_weighted_strategy() {
    let weights = vec![
        ("direct:a".to_string(), 5),
        ("direct:b".to_string(), 2),
        ("direct:c".to_string(), 1),
    ];

    let definition = RouteBuilder::from("direct:start")
        .route_id("lb-weighted")
        .load_balance()
        .weighted(weights)
        .to("mock:result")
        .end_load_balance()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
3168
/// `failover()` selects `LoadBalanceStrategy::Failover`.
#[test]
fn test_load_balance_builder_failover_strategy() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("lb-failover")
        .load_balance()
        .failover()
        .to("mock:primary")
        .end_load_balance()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
3186
/// A filter scope opened inside a split is recorded as a `Filter` step nested in the `Split`.
#[test]
fn test_filter_in_split_builder_typestate() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines());
    let route = RouteBuilder::from("timer:test")
        .route_id("filter-in-split")
        .split(splitter)
        .filter(|_ex| true)
        .to("mock:filtered")
        .end_filter()
        .end_split()
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 1);

    match &top_level[0] {
        BuilderStep::Split { steps, .. } => {
            assert_eq!(steps.len(), 1);
            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
        }
        _ => panic!("Expected Split step"),
    }
}
3211
/// A split body may mix plain steps with a nested filter scope.
///
/// Checks the number of steps inside the split and also their kind and order
/// (`To`, `Filter`, `To`), so a step attached to the wrong scope or emitted out of
/// order fails the test instead of slipping through a bare length check.
#[test]
fn test_filter_in_split_builder_multiple_steps() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let definition = RouteBuilder::from("timer:test")
        .route_id("filter-in-split-multi")
        .split(SplitterConfig::new(split_body_lines()))
        .to("mock:before-filter")
        .filter(|_ex| true)
        .to("mock:inside-filter")
        .end_filter()
        .to("mock:after-filter")
        .end_split()
        .build()
        .unwrap();

    // Everything was declared inside the split scope, so the route has one top-level step.
    assert_eq!(definition.steps().len(), 1);

    if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
        // `to`, the filter scope, and the trailing `to`.
        assert_eq!(steps.len(), 3);
        assert!(matches!(&steps[0], BuilderStep::To(uri) if uri == "mock:before-filter"));
        assert!(matches!(&steps[1], BuilderStep::Filter { .. }));
        assert!(matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:after-filter"));
    } else {
        panic!("Expected Split step");
    }
}
3235
/// A circuit breaker configured on the builder is present, with its threshold, on the spec.
#[test]
fn test_build_canonical_with_circuit_breaker() {
    use camel_api::circuit_breaker::CircuitBreakerConfig;

    let breaker = CircuitBreakerConfig::new().failure_threshold(10);
    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-cb")
        .circuit_breaker(breaker)
        .to("mock:result")
        .build_canonical()
        .unwrap();

    let canonical_breaker = spec.circuit_breaker.expect("circuit breaker should be set");
    assert_eq!(canonical_breaker.failure_threshold, 10);
}
3252
/// `build_canonical()` rejects a split that uses a custom aggregation closure.
#[test]
fn test_build_canonical_rejects_custom_split_aggregation() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines()).aggregation(
        camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
    );

    let err = RouteBuilder::from("direct:start")
        .route_id("canonical-custom-split")
        .split(splitter)
        .to("mock:frag")
        .end_split()
        .build_canonical()
        .unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("canonical v1 does not support step `split`"));
}
3270
/// `build_canonical()` rejects an aggregator that uses a custom strategy closure.
#[test]
fn test_build_canonical_rejects_custom_aggregate_strategy() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(2)
        .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
        .build()
        .unwrap();

    let err = RouteBuilder::from("direct:start")
        .route_id("canonical-custom-agg")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("custom aggregate strategy"));
}
3287
/// `build_canonical()` rejects an aggregator whose correlation is a closure.
#[test]
fn test_build_canonical_rejects_fn_correlation_strategy() {
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Single(CompletionCondition::Size(1)),
        correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let err = RouteBuilder::from("direct:start")
        .route_id("canonical-fn-corr")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("Fn correlation strategy"));
}
3307
/// `build_canonical()` rejects an aggregator whose completion condition is a closure predicate.
#[test]
fn test_build_canonical_rejects_predicate_completion() {
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
            |_| false,
        ))),
        correlation: CorrelationStrategy::HeaderName("key".to_string()),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let err = RouteBuilder::from("direct:start")
        .route_id("canonical-pred-completion")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("predicate completion"));
}
3329
/// An expression-based correlation is accepted by `build_canonical()` and its expression text
/// shows up as the aggregate step's `correlation_key`.
#[test]
fn test_build_canonical_with_expression_correlation() {
    let aggregator = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Single(CompletionCondition::Size(1)),
        correlation: CorrelationStrategy::Expression {
            expr: "header.key".to_string(),
            language: "simple".to_string(),
        },
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-expr-corr")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    let has_expected_key = spec.steps.iter().any(|step| match step {
        CanonicalStepSpec::Aggregate(agg) => {
            agg.correlation_key == Some("header.key".to_string())
        }
        _ => false,
    });
    assert!(has_expected_key);
}
3352
/// `build_canonical()` rejects this split even though its aggregation is the built-in `LastWins`.
#[test]
fn test_build_canonical_split_rejected_with_closure_expression() {
    use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};

    let splitter =
        SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins);

    let err = RouteBuilder::from("direct:start")
        .route_id("canonical-split-last")
        .split(splitter)
        .to("mock:frag")
        .end_split()
        .build_canonical()
        .unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("canonical v1 does not support step `split`"));
}
3370
/// The full `on_exception` chain (retry, backoff, jitter, handler URI) ends up as one policy
/// on the route's error handler config.
#[test]
fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
    let initial_delay = Duration::from_millis(10);
    let max_delay = Duration::from_millis(500);

    let route = RouteBuilder::from("direct:start")
        .route_id("on-exception-full")
        .dead_letter_channel("log:dlc")
        .on_exception(|e| matches!(e, CamelError::Io(_)))
        .retry(5)
        .with_backoff(initial_delay, 2.0, max_delay)
        .with_jitter(0.3)
        .handled_by("log:io-handler")
        .end_on_exception()
        .to("mock:out")
        .build()
        .unwrap();

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.policies.len(), 1);

    let policy = &handler.policies[0];
    let redelivery = policy.retry.as_ref().expect("retry should be set");
    assert_eq!(redelivery.max_attempts, 5);
    assert_eq!(redelivery.initial_delay, Duration::from_millis(10));
    assert_eq!(redelivery.multiplier, 2.0);
    assert_eq!(redelivery.max_delay, Duration::from_millis(500));

    let jitter_error = (redelivery.jitter_factor - 0.3).abs();
    assert!(jitter_error < f64::EPSILON);

    let handler_uri = policy.handled_by.as_deref();
    assert_eq!(handler_uri, Some("log:io-handler"));
}
3401
/// A jitter factor of 5.0 is stored as 1.0.
#[test]
fn test_on_exception_jitter_clamped_to_valid_range() {
    let route = RouteBuilder::from("direct:start")
        .route_id("jitter-clamp")
        .on_exception(|_e| true)
        .retry(1)
        .with_jitter(5.0)
        .end_on_exception()
        .to("mock:out")
        .build()
        .unwrap();

    let handler = route.error_handler_config().unwrap();
    let redelivery = handler.policies[0].retry.as_ref().unwrap();

    let distance_from_one = (redelivery.jitter_factor - 1.0).abs();
    assert!(distance_from_one < f64::EPSILON);
}
3418
/// `process_fn(..)` with a boxed processor contributes a `Processor` step.
#[test]
fn test_builder_process_fn_adds_processor_step() {
    use camel_api::BoxProcessorExt;

    let passthrough = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));

    let route = RouteBuilder::from("timer:tick")
        .route_id("process-fn-test")
        .process_fn(passthrough)
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
3433
/// `convert_body_to(BodyType::Json)` contributes a `Processor` step.
#[test]
fn test_builder_convert_body_to_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("convert-body-test")
        .convert_body_to(BodyType::Json)
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
3444
/// `bean(name, method)` stores both values on a `Bean` step.
#[test]
fn test_builder_bean_adds_bean_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("bean-test")
        .bean("myBean", "process")
        .build()
        .unwrap();

    let first = &route.steps()[0];
    let is_expected_bean = matches!(
        first,
        BuilderStep::Bean { name, method } if name == "myBean" && method == "process"
    );
    assert!(is_expected_bean);
}
3458
/// `strategy(ThrottleStrategy::Delay)` is stored in the throttle step's config.
#[test]
fn test_throttle_builder_delay_strategy() {
    let window = Duration::from_secs(1);
    let route = RouteBuilder::from("timer:tick")
        .route_id("throttle-delay")
        .throttle(10, window)
        .strategy(ThrottleStrategy::Delay)
        .to("mock:result")
        .end_throttle()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Throttle { config, .. } => {
            assert_eq!(config.strategy, ThrottleStrategy::Delay);
        }
        _ => panic!("Expected Throttle step"),
    }
}
3478
/// `strategy(ThrottleStrategy::Drop)` is stored in the throttle step's config.
#[test]
fn test_throttle_builder_drop_strategy() {
    let window = Duration::from_secs(1);
    let route = RouteBuilder::from("timer:tick")
        .route_id("throttle-drop")
        .throttle(10, window)
        .strategy(ThrottleStrategy::Drop)
        .to("mock:result")
        .end_throttle()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Throttle { config, .. } => {
            assert_eq!(config.strategy, ThrottleStrategy::Drop);
        }
        _ => panic!("Expected Throttle step"),
    }
}
3496
/// A `loop_while` nested inside a `loop_count` becomes the second inner step of the outer
/// `Loop`, with `LoopMode::While` as its mode.
#[test]
fn test_nested_loop_while_builder() {
    use camel_api::loop_eip::LoopMode;

    let route = RouteBuilder::from("direct:start")
        .route_id("nested-loop-while")
        .loop_count(2)
        .to("mock:outer")
        .loop_while(|_ex| true)
        .to("mock:inner")
        .end_loop()
        .end_loop()
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 1);

    let outer_body = match &top_level[0] {
        BuilderStep::Loop { steps, .. } => steps,
        _ => panic!("Expected outer Loop step"),
    };
    assert_eq!(outer_body.len(), 2);

    match &outer_body[1] {
        BuilderStep::Loop { config, .. } => {
            assert!(matches!(config.mode, LoopMode::While(_)));
        }
        _ => panic!("Expected inner Loop step"),
    }
}
3526
/// Three `when` branches plus an `otherwise` holding one step are all kept on the `Choice` step.
#[test]
fn test_choice_builder_multiple_whens_with_otherwise() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("choice-multi-otherwise")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("b").is_some())
        .to("mock:b")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("c").is_some())
        .to("mock:c")
        .end_when()
        .otherwise()
        .to("mock:fallback")
        .end_otherwise()
        .end_choice()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Choice { whens, otherwise } => {
            assert_eq!(whens.len(), 3);

            let fallback = otherwise.as_ref();
            assert!(fallback.is_some());
            assert_eq!(fallback.unwrap().len(), 1);
        }
        _ => panic!("Expected Choice step"),
    }
}
3558
/// `parallel(true)` alone enables parallel mode and leaves `parallel_limit` unset.
#[test]
fn test_multicast_builder_parallel_only() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("multicast-parallel")
        .multicast()
        .parallel(true)
        .to("mock:a")
        .end_multicast()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(config.parallel);
            assert_eq!(config.parallel_limit, None);
        }
        _ => panic!("Expected Multicast step"),
    }
}
3579
/// `timeout(..)` alone is stored on the multicast config.
#[test]
fn test_multicast_builder_timeout_only() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("multicast-timeout")
        .multicast()
        .timeout(Duration::from_secs(5))
        .to("mock:a")
        .end_multicast()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert_eq!(config.timeout, Some(Duration::from_secs(5)));
        }
        _ => panic!("Expected Multicast step"),
    }
}
3597
/// `aggregation(MulticastStrategy::CollectAll)` is stored on the multicast config.
#[test]
fn test_multicast_builder_aggregation_collect_all() {
    let definition = RouteBuilder::from("direct:start")
        .route_id("multicast-collect")
        .multicast()
        .aggregation(MulticastStrategy::CollectAll)
        .to("mock:a")
        .end_multicast()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
        }
        _ => panic!("Expected Multicast step"),
    }
}
3615
/// A size-or-timeout completion populates both the canonical size and timeout (ms) fields.
#[test]
fn test_build_canonical_aggregate_any_completion_mode() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_on_size_or_timeout(10, Duration::from_secs(30))
        .build()
        .unwrap();

    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-any-completion")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    match &spec.steps[0] {
        CanonicalStepSpec::Aggregate(agg) => {
            assert_eq!(agg.completion_size, Some(10));
            assert_eq!(agg.completion_timeout_ms, Some(30_000));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3638
/// A timeout-only completion leaves the canonical size unset and sets the timeout in ms.
#[test]
fn test_build_canonical_aggregate_timeout_completion() {
    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_on_timeout(Duration::from_millis(500))
        .build()
        .unwrap();

    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-timeout-completion")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    match &spec.steps[0] {
        CanonicalStepSpec::Aggregate(agg) => {
            assert_eq!(agg.completion_size, None);
            assert_eq!(agg.completion_timeout_ms, Some(500));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3659
/// `discard_on_timeout(true)` is carried into the canonical aggregate spec as `Some(true)`.
#[test]
fn test_build_canonical_aggregate_discard_on_timeout() {
    use camel_api::aggregator::AggregatorConfig;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .discard_on_timeout(true)
        .build()
        .unwrap();

    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-discard-timeout")
        .aggregate(aggregator)
        .build_canonical()
        .unwrap();

    match &spec.steps[0] {
        CanonicalStepSpec::Aggregate(agg) => {
            assert_eq!(agg.discard_on_timeout, Some(true));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3684
3685 #[test]
3686 fn test_build_canonical_aggregate_force_completion_on_stop() {
3687 use camel_api::aggregator::AggregatorConfig;
3688
3689 let spec = RouteBuilder::from("direct:start")
3690 .route_id("canonical-force-stop")
3691 .aggregate(
3692 AggregatorConfig::correlate_by("key")
3693 .complete_when_size(1)
3694 .force_completion_on_stop(true)
3695 .build()
3696 .unwrap(),
3697 )
3698 .build_canonical()
3699 .unwrap();
3700
3701 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3702 assert_eq!(agg.force_completion_on_stop, Some(true));
3703 } else {
3704 panic!("Expected Aggregate step");
3705 }
3706 }
3707
3708 #[test]
3711 fn test_build_canonical_aggregate_max_buckets_and_ttl() {
3712 use camel_api::aggregator::AggregatorConfig;
3713
3714 let spec = RouteBuilder::from("direct:start")
3715 .route_id("canonical-buckets-ttl")
3716 .aggregate(
3717 AggregatorConfig::correlate_by("key")
3718 .complete_when_size(1)
3719 .max_buckets(100)
3720 .bucket_ttl(Duration::from_secs(60))
3721 .build()
3722 .unwrap(),
3723 )
3724 .build_canonical()
3725 .unwrap();
3726
3727 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3728 assert_eq!(agg.max_buckets, Some(100));
3729 assert_eq!(agg.bucket_ttl_ms, Some(60_000));
3730 } else {
3731 panic!("Expected Aggregate step");
3732 }
3733 }
3734
3735 #[test]
3738 fn test_split_builder_with_filter_inside() {
3739 use camel_api::splitter::{SplitterConfig, split_body_lines};
3740
3741 let definition = RouteBuilder::from("timer:test")
3742 .route_id("split-with-filter")
3743 .split(SplitterConfig::new(split_body_lines()))
3744 .filter(|_ex| true)
3745 .to("mock:filtered-frag")
3746 .end_filter()
3747 .end_split()
3748 .build()
3749 .unwrap();
3750
3751 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3752 assert_eq!(steps.len(), 1);
3753 assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3754 } else {
3755 panic!("Expected Split step");
3756 }
3757 }
3758
3759 #[test]
3762 fn test_wire_tap_multiple_taps() {
3763 let definition = RouteBuilder::from("timer:tick")
3764 .route_id("multi-wire-tap")
3765 .wire_tap("mock:tap1")
3766 .wire_tap("mock:tap2")
3767 .to("mock:result")
3768 .build()
3769 .unwrap();
3770
3771 assert_eq!(definition.steps().len(), 3);
3772 assert!(
3773 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap1")
3774 );
3775 assert!(
3776 matches!(&definition.steps()[1], BuilderStep::WireTap { uri } if uri == "mock:tap2")
3777 );
3778 }
3779
3780 #[test]
3783 fn test_builder_shorthand_then_explicit_mixed_mode() {
3784 let result = RouteBuilder::from("direct:start")
3785 .route_id("mixed-mode-2")
3786 .dead_letter_channel("log:dlc")
3787 .error_handler(ErrorHandlerConfig::log_only())
3788 .to("mock:out")
3789 .build();
3790
3791 let err = result.err().expect("mixed mode should fail");
3792 assert!(format!("{err}").contains("mixed error handler modes"));
3793 }
3794
3795 #[test]
3798 fn test_build_canonical_empty_from_uri_errors() {
3799 let result = RouteBuilder::from("").route_id("test").build_canonical();
3800 assert!(result.is_err());
3801 }
3802
3803 #[test]
3804 fn test_build_canonical_missing_route_id_errors() {
3805 let result = RouteBuilder::from("direct:start").build_canonical();
3806 assert!(result.is_err());
3807 let err = result.unwrap_err().to_string();
3808 assert!(err.contains("route_id"));
3809 }
3810
3811 #[test]
3814 fn test_split_builder_with_aggregate_inside() {
3815 use camel_api::aggregator::AggregatorConfig;
3816 use camel_api::splitter::{SplitterConfig, split_body_lines};
3817
3818 let definition = RouteBuilder::from("timer:test")
3819 .route_id("split-agg")
3820 .split(SplitterConfig::new(split_body_lines()))
3821 .aggregate(
3822 AggregatorConfig::correlate_by("frag-key")
3823 .complete_when_size(3)
3824 .build()
3825 .unwrap(),
3826 )
3827 .end_split()
3828 .build()
3829 .unwrap();
3830
3831 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3832 assert_eq!(steps.len(), 1);
3833 assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
3834 } else {
3835 panic!("Expected Split step");
3836 }
3837 }
3838
3839 #[test]
3842 fn test_throttle_builder_with_steps_inside() {
3843 let definition = RouteBuilder::from("timer:tick")
3844 .route_id("throttle-steps")
3845 .throttle(10, Duration::from_secs(1))
3846 .set_header("throttled", Value::Bool(true))
3847 .to("mock:throttled")
3848 .end_throttle()
3849 .build()
3850 .unwrap();
3851
3852 if let BuilderStep::Throttle { steps, .. } = &definition.steps()[0] {
3853 assert_eq!(steps.len(), 2);
3854 } else {
3855 panic!("Expected Throttle step");
3856 }
3857 }
3858
3859 #[test]
3862 fn test_load_balance_builder_with_steps_inside() {
3863 let definition = RouteBuilder::from("timer:tick")
3864 .route_id("lb-steps")
3865 .load_balance()
3866 .round_robin()
3867 .set_header("lb", Value::Bool(true))
3868 .to("mock:lb")
3869 .end_load_balance()
3870 .build()
3871 .unwrap();
3872
3873 if let BuilderStep::LoadBalance { steps, .. } = &definition.steps()[0] {
3874 assert_eq!(steps.len(), 2);
3875 } else {
3876 panic!("Expected LoadBalance step");
3877 }
3878 }
3879
3880 #[test]
3883 fn test_multicast_builder_with_steps_inside() {
3884 let definition = RouteBuilder::from("timer:tick")
3885 .route_id("multicast-steps")
3886 .multicast()
3887 .set_header("mc", Value::Bool(true))
3888 .to("mock:multicast")
3889 .end_multicast()
3890 .build()
3891 .unwrap();
3892
3893 if let BuilderStep::Multicast { steps, .. } = &definition.steps()[0] {
3894 assert_eq!(steps.len(), 2);
3895 } else {
3896 panic!("Expected Multicast step");
3897 }
3898 }
3899
3900 #[test]
3903 fn test_loop_builder_with_steps_inside() {
3904 let definition = RouteBuilder::from("timer:tick")
3905 .route_id("loop-steps")
3906 .loop_count(3)
3907 .set_header("loop", Value::Bool(true))
3908 .to("mock:loop")
3909 .end_loop()
3910 .build()
3911 .unwrap();
3912
3913 if let BuilderStep::Loop { steps, .. } = &definition.steps()[0] {
3914 assert_eq!(steps.len(), 2);
3915 } else {
3916 panic!("Expected Loop step");
3917 }
3918 }
3919
3920 #[test]
3923 fn test_build_canonical_rejects_loop_step() {
3924 let err = RouteBuilder::from("direct:start")
3925 .route_id("canonical-loop")
3926 .loop_count(3)
3927 .to("mock:loop")
3928 .end_loop()
3929 .build_canonical()
3930 .unwrap_err();
3931
3932 assert!(format!("{err}").contains("does not support step `loop`"));
3933 }
3934
3935 #[test]
3936 fn test_build_canonical_rejects_multicast_step() {
3937 let err = RouteBuilder::from("direct:start")
3938 .route_id("canonical-multicast")
3939 .multicast()
3940 .to("mock:a")
3941 .end_multicast()
3942 .build_canonical()
3943 .unwrap_err();
3944
3945 assert!(format!("{err}").contains("does not support step `multicast`"));
3946 }
3947
3948 #[test]
3949 fn test_build_canonical_rejects_throttle_step() {
3950 let err = RouteBuilder::from("direct:start")
3951 .route_id("canonical-throttle")
3952 .throttle(10, Duration::from_secs(1))
3953 .to("mock:result")
3954 .end_throttle()
3955 .build_canonical()
3956 .unwrap_err();
3957
3958 assert!(format!("{err}").contains("does not support step `throttle`"));
3959 }
3960
3961 #[test]
3962 fn test_build_canonical_rejects_load_balancer_step() {
3963 let err = RouteBuilder::from("direct:start")
3964 .route_id("canonical-lb")
3965 .load_balance()
3966 .round_robin()
3967 .to("mock:result")
3968 .end_load_balance()
3969 .build_canonical()
3970 .unwrap_err();
3971
3972 assert!(format!("{err}").contains("does not support step `load_balancer`"));
3973 }
3974
3975 #[test]
3976 fn test_build_canonical_rejects_bean_step() {
3977 let err = RouteBuilder::from("direct:start")
3978 .route_id("canonical-bean")
3979 .bean("myBean", "process")
3980 .build_canonical()
3981 .unwrap_err();
3982
3983 assert!(format!("{err}").contains("does not support step `bean`"));
3984 }
3985
3986 #[test]
3987 fn test_build_canonical_rejects_script_step() {
3988 let err = RouteBuilder::from("direct:start")
3989 .route_id("canonical-script")
3990 .script("rhai", "x = 1")
3991 .build_canonical()
3992 .unwrap_err();
3993
3994 assert!(format!("{err}").contains("does not support step `script`"));
3995 }
3996
3997 #[test]
3998 fn test_build_canonical_accepts_delay_step() {
3999 let spec = RouteBuilder::from("direct:start")
4000 .route_id("canonical-delay")
4001 .delay(Duration::from_millis(100))
4002 .build_canonical()
4003 .unwrap();
4004
4005 assert!(
4006 spec.steps.iter().any(
4007 |s| matches!(s, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100)
4008 )
4009 );
4010 }
4011
4012 #[test]
4013 fn test_build_canonical_accepts_wire_tap_step() {
4014 let spec = RouteBuilder::from("direct:start")
4015 .route_id("canonical-wiretap")
4016 .wire_tap("mock:tap")
4017 .build_canonical()
4018 .unwrap();
4019
4020 assert!(
4021 spec.steps
4022 .iter()
4023 .any(|s| matches!(s, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"))
4024 );
4025 }
4026
4027 #[test]
4028 fn test_build_canonical_rejects_dynamic_router_step() {
4029 let err = RouteBuilder::from("direct:start")
4030 .route_id("canonical-dyn-router")
4031 .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
4032 .build_canonical()
4033 .unwrap_err();
4034
4035 assert!(format!("{err}").contains("does not support step `dynamic_router`"));
4036 }
4037
4038 #[test]
4039 fn test_build_canonical_rejects_routing_slip_step() {
4040 let err = RouteBuilder::from("direct:start")
4041 .route_id("canonical-routing-slip")
4042 .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
4043 .build_canonical()
4044 .unwrap_err();
4045
4046 assert!(format!("{err}").contains("does not support step `routing_slip`"));
4047 }
4048
4049 #[test]
4050 fn test_build_canonical_rejects_recipient_list_step() {
4051 let err = RouteBuilder::from("direct:start")
4052 .route_id("canonical-recipient")
4053 .recipient_list(Arc::new(|_| "mock:a".to_string()))
4054 .build_canonical()
4055 .unwrap_err();
4056
4057 assert!(format!("{err}").contains("does not support step `recipient_list`"));
4058 }
4059
4060 #[test]
4063 fn test_build_canonical_rejects_any_mode_with_predicate() {
4064 let err = RouteBuilder::from("direct:start")
4065 .route_id("canonical-any-pred")
4066 .aggregate(AggregatorConfig {
4067 header_name: "key".to_string(),
4068 completion: CompletionMode::Any(vec![
4069 CompletionCondition::Size(5),
4070 CompletionCondition::Predicate(Arc::new(|_| false)),
4071 ]),
4072 correlation: CorrelationStrategy::HeaderName("key".to_string()),
4073 strategy: AggregationStrategy::CollectAll,
4074 max_buckets: None,
4075 bucket_ttl: None,
4076 force_completion_on_stop: false,
4077 discard_on_timeout: false,
4078 })
4079 .build_canonical()
4080 .unwrap_err();
4081
4082 assert!(format!("{err}").contains("predicate completion"));
4083 }
4084
4085 #[test]
4088 fn test_builder_validation_missing_from_uri() {
4089 let result = RouteBuilder::from("")
4090 .route_id("missing-uri-route")
4091 .to("log:info")
4092 .build();
4093 assert!(result.is_err(), "empty from URI should fail validation");
4094 let err = result.err().unwrap().to_string();
4095 assert!(
4096 err.contains("'from'") || err.contains("URI"),
4097 "error should mention from/URI, got: {err}"
4098 );
4099 }
4100
4101 #[test]
4102 fn test_builder_validation_invalid_step_uri_scheme() {
4103 let result = RouteBuilder::from("timer:tick")
4104 .route_id("bad-step-route")
4105 .to("not-a-valid-uri") .build();
4107 assert!(
4110 result.is_ok(),
4111 "builder should accept opaque step URIs; resolution happens later"
4112 );
4113 }
4114
4115 #[test]
4118 fn test_builder_duplicate_route_ids_produce_identical_definitions() {
4119 let route1 = RouteBuilder::from("direct:a")
4122 .route_id("dup-route")
4123 .to("mock:out")
4124 .build();
4125 let route2 = RouteBuilder::from("direct:b")
4126 .route_id("dup-route")
4127 .to("mock:out")
4128 .build();
4129
4130 assert!(route1.is_ok());
4131 assert!(route2.is_ok());
4132 assert_eq!(route1.unwrap().route_id(), route2.unwrap().route_id());
4133 }
4134}