1use camel_api::DelayConfig;
7use camel_api::aggregator::{
8 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24 ProcessorFn, Value,
25 runtime::{
26 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28 CanonicalWhenSpec,
29 },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35 StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38pub trait StepAccumulator: Sized {
44 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
45
46 fn to(mut self, endpoint: impl Into<String>) -> Self {
47 self.steps_mut().push(BuilderStep::To(endpoint.into()));
48 self
49 }
50
51 fn process<F, Fut>(mut self, f: F) -> Self
52 where
53 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
54 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
55 {
56 let svc = ProcessorFn::new(f);
57 self.steps_mut()
58 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
59 self
60 }
61
62 fn process_fn(mut self, processor: BoxProcessor) -> Self {
63 self.steps_mut().push(BuilderStep::Processor(processor));
64 self
65 }
66
67 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
68 let svc = SetHeader::new(IdentityProcessor, key, value);
69 self.steps_mut()
70 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71 self
72 }
73
74 fn map_body<F>(mut self, mapper: F) -> Self
75 where
76 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
77 {
78 let svc = MapBody::new(IdentityProcessor, mapper);
79 self.steps_mut()
80 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
81 self
82 }
83
84 fn set_body<B>(mut self, body: B) -> Self
85 where
86 B: Into<Body> + Clone + Send + Sync + 'static,
87 {
88 let body: Body = body.into();
89 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
90 self.steps_mut()
91 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
92 self
93 }
94
95 fn transform<B>(self, body: B) -> Self
100 where
101 B: Into<Body> + Clone + Send + Sync + 'static,
102 {
103 self.set_body(body)
104 }
105
106 fn set_body_fn<F>(mut self, expr: F) -> Self
107 where
108 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
109 {
110 let svc = SetBody::new(IdentityProcessor, expr);
111 self.steps_mut()
112 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
113 self
114 }
115
116 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
117 where
118 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
119 {
120 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
121 self.steps_mut()
122 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
123 self
124 }
125
126 fn aggregate(mut self, config: AggregatorConfig) -> Self {
127 self.steps_mut().push(BuilderStep::Aggregate { config });
128 self
129 }
130
131 fn stop(mut self) -> Self {
137 self.steps_mut().push(BuilderStep::Stop);
138 self
139 }
140
141 fn delay(mut self, duration: std::time::Duration) -> Self {
142 self.steps_mut().push(BuilderStep::Delay {
143 config: DelayConfig::from_duration(duration),
144 });
145 self
146 }
147
148 fn delay_with_header(
149 mut self,
150 duration: std::time::Duration,
151 header: impl Into<String>,
152 ) -> Self {
153 self.steps_mut().push(BuilderStep::Delay {
154 config: DelayConfig::from_duration_with_header(duration, header),
155 });
156 self
157 }
158
159 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
163 self.steps_mut().push(BuilderStep::Log {
164 level,
165 message: message.into(),
166 });
167 self
168 }
169
170 fn convert_body_to(mut self, target: BodyType) -> Self {
182 let svc = ConvertBodyTo::new(IdentityProcessor, target);
183 self.steps_mut()
184 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
185 self
186 }
187
188 fn stream_cache(mut self, threshold: usize) -> Self {
189 let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
190 let svc = StreamCacheService::new(IdentityProcessor, config);
191 self.steps_mut()
192 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
193 self
194 }
195
196 fn stream_cache_default(self) -> Self {
200 self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
201 }
202
203 fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
214 let name = format.into();
215 let df = builtin_data_format(&name)
216 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
217 let svc = MarshalService::new(IdentityProcessor, df);
218 self.steps_mut()
219 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
220 Ok(self)
221 }
222
223 fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
234 let name = format.into();
235 let df = builtin_data_format(&name)
236 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
237 let svc = UnmarshalService::new(IdentityProcessor, df);
238 self.steps_mut()
239 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
240 Ok(self)
241 }
242
243 fn validate(mut self, schema_path: impl Into<String>) -> Self {
252 let path = schema_path.into();
253 let uri = if path.starts_with("validator:") {
254 path
255 } else {
256 format!("validator:{path}")
257 };
258 self.steps_mut().push(BuilderStep::To(uri));
259 self
260 }
261
262 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
273 self.steps_mut().push(BuilderStep::Script {
274 language: language.into(),
275 script: script.into(),
276 });
277 self
278 }
279
280 fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
281 self.steps_mut().push(BuilderStep::Bean {
282 name: name.into(),
283 method: method.into(),
284 });
285 self
286 }
287}
288
/// Fluent builder for a single route: a source URI, an ordered list of steps,
/// and optional route-level settings. Finished with `build()` or
/// `build_canonical()`.
pub struct RouteBuilder {
    /// Source endpoint URI the route consumes from; checked by `validate_uri`
    /// when the route is built.
    from_uri: String,
    /// Top-level steps appended so far, in insertion order.
    steps: Vec<BuilderStep>,
    /// Configuration supplied through `error_handler(..)`, if any.
    error_handler: Option<ErrorHandlerConfig>,
    /// Tracks which error-handling API style has been used on this builder.
    error_handler_mode: ErrorHandlerMode,
    /// Circuit breaker settings supplied through `circuit_breaker(..)`, if any.
    circuit_breaker_config: Option<CircuitBreakerConfig>,
    /// Security policy supplied through `security_policy(..)`, if any.
    /// `build_canonical()` rejects routes that have one.
    security_policy_config: Option<camel_api::security_policy::SecurityPolicyConfig>,
    /// Token authenticator supplied through `security_authenticator(..)`, if any.
    security_authenticator: Option<std::sync::Arc<dyn camel_auth::TokenAuthenticator>>,
    /// Concurrency model override set by `concurrent(..)` or `sequential()`.
    concurrency: Option<ConcurrencyModel>,
    /// Route identifier; must be present and non-blank when the route is built.
    route_id: Option<String>,
    /// Auto-startup override set by `auto_startup(..)`.
    auto_startup: Option<bool>,
    /// Startup-order override set by `startup_order(..)`.
    startup_order: Option<i32>,
}
316
/// Records which of the two error-handling configuration styles has been used
/// on a `RouteBuilder`, so that `build()` can reject a mix of both.
#[derive(Default)]
enum ErrorHandlerMode {
    /// No error-handling method has been called yet.
    #[default]
    None,
    /// Only `error_handler(config)` has been used.
    ExplicitConfig,
    /// Only the shorthand methods (`dead_letter_channel`, `on_exception`) have
    /// been used.
    Shorthand {
        /// Dead-letter URI set by `dead_letter_channel`, if any.
        dlc_uri: Option<String>,
        /// Clauses pushed by `OnExceptionBuilder::end_on_exception`, in order.
        specs: Vec<OnExceptionSpec>,
    },
    /// Both styles were used; `build()` returns an error for this state.
    Mixed,
}
328
/// One shorthand `on_exception` clause, stored until `build()` turns it into
/// an entry of the final `ErrorHandlerConfig`.
#[derive(Clone)]
struct OnExceptionSpec {
    /// Predicate deciding whether this clause applies to a given error.
    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
    /// Redelivery settings; `None` until `OnExceptionBuilder::retry` is called.
    retry: Option<RedeliveryPolicy>,
    /// URI set by `OnExceptionBuilder::handled_by`, if any.
    handled_by: Option<String>,
}
335
336impl RouteBuilder {
337 pub fn from(endpoint: &str) -> Self {
339 Self {
340 from_uri: endpoint.to_string(),
341 steps: Vec::new(),
342 error_handler: None,
343 error_handler_mode: ErrorHandlerMode::None,
344 circuit_breaker_config: None,
345 security_policy_config: None,
346 security_authenticator: None,
347 concurrency: None,
348 route_id: None,
349 auto_startup: None,
350 startup_order: None,
351 }
352 }
353
354 pub fn filter<F>(self, predicate: F) -> FilterBuilder
358 where
359 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
360 {
361 FilterBuilder {
362 parent: self,
363 predicate: std::sync::Arc::new(predicate),
364 steps: vec![],
365 }
366 }
367
368 pub fn choice(self) -> ChoiceBuilder {
374 ChoiceBuilder {
375 parent: self,
376 whens: vec![],
377 _otherwise: None,
378 }
379 }
380
381 pub fn wire_tap(mut self, endpoint: &str) -> Self {
385 self.steps.push(BuilderStep::WireTap {
386 uri: endpoint.to_string(),
387 });
388 self
389 }
390
391 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
393 self.error_handler_mode = match self.error_handler_mode {
394 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
395 ErrorHandlerMode::ExplicitConfig
396 }
397 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
398 };
399 self.error_handler = Some(config);
400 self
401 }
402
403 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
405 let uri = uri.into();
406 self.error_handler_mode = match self.error_handler_mode {
407 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
408 dlc_uri: Some(uri),
409 specs: Vec::new(),
410 },
411 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
412 dlc_uri: Some(uri),
413 specs,
414 },
415 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
416 };
417 self
418 }
419
420 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
422 where
423 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
424 {
425 self.error_handler_mode = match self.error_handler_mode {
426 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
427 dlc_uri: None,
428 specs: Vec::new(),
429 },
430 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
431 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
432 };
433
434 OnExceptionBuilder {
435 parent: self,
436 policy: OnExceptionSpec {
437 matches: std::sync::Arc::new(matches),
438 retry: None,
439 handled_by: None,
440 },
441 }
442 }
443
444 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
446 self.circuit_breaker_config = Some(config);
447 self
448 }
449
450 pub fn security_policy(
451 mut self,
452 config: camel_api::security_policy::SecurityPolicyConfig,
453 ) -> Self {
454 self.security_policy_config = Some(config);
455 self
456 }
457
458 pub fn security_authenticator(
459 mut self,
460 auth: std::sync::Arc<dyn camel_auth::TokenAuthenticator>,
461 ) -> Self {
462 self.security_authenticator = Some(auth);
463 self
464 }
465
466 pub fn concurrent(mut self, max: usize) -> Self {
480 let max = if max == 0 { None } else { Some(max) };
481 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
482 self
483 }
484
485 pub fn sequential(mut self) -> Self {
490 self.concurrency = Some(ConcurrencyModel::Sequential);
491 self
492 }
493
494 pub fn route_id(mut self, id: impl Into<String>) -> Self {
498 self.route_id = Some(id.into());
499 self
500 }
501
502 pub fn auto_startup(mut self, auto: bool) -> Self {
506 self.auto_startup = Some(auto);
507 self
508 }
509
510 pub fn startup_order(mut self, order: i32) -> Self {
514 self.startup_order = Some(order);
515 self
516 }
517
518 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
524 SplitBuilder {
525 parent: self,
526 config,
527 steps: Vec::new(),
528 }
529 }
530
531 pub fn multicast(self) -> MulticastBuilder {
537 MulticastBuilder {
538 parent: self,
539 steps: Vec::new(),
540 config: MulticastConfig::new(),
541 }
542 }
543
544 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
551 ThrottleBuilder {
552 parent: self,
553 config: ThrottlerConfig::new(max_requests, period),
554 steps: Vec::new(),
555 }
556 }
557
558 pub fn loop_count(self, count: usize) -> LoopBuilder {
560 LoopBuilder {
561 parent: self,
562 config: LoopConfig {
563 mode: LoopMode::Count(count),
564 },
565 steps: vec![],
566 }
567 }
568
569 pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
571 where
572 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
573 {
574 LoopBuilder {
575 parent: self,
576 config: LoopConfig {
577 mode: LoopMode::While(std::sync::Arc::new(predicate)),
578 },
579 steps: vec![],
580 }
581 }
582
583 pub fn load_balance(self) -> LoadBalancerBuilder {
589 LoadBalancerBuilder {
590 parent: self,
591 config: LoadBalancerConfig::round_robin(),
592 steps: Vec::new(),
593 }
594 }
595
596 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
612 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
613 }
614
615 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
619 self.steps.push(BuilderStep::DynamicRouter { config });
620 self
621 }
622
623 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
624 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
625 }
626
627 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
628 self.steps.push(BuilderStep::RoutingSlip { config });
629 self
630 }
631
632 pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
633 self.recipient_list_with_config(RecipientListConfig::new(expression))
634 }
635
636 pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
637 self.steps.push(BuilderStep::RecipientList { config });
638 self
639 }
640
641 pub fn build(self) -> Result<RouteDefinition, CamelError> {
647 validate_uri(&self.from_uri)?;
648 let route_id = self
649 .route_id
650 .filter(|s| !s.trim().is_empty())
651 .ok_or_else(|| {
652 CamelError::RouteError(
653 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
654 .to_string(),
655 )
656 })?;
657 let resolved_error_handler = match self.error_handler_mode {
658 ErrorHandlerMode::None => self.error_handler,
659 ErrorHandlerMode::ExplicitConfig => self.error_handler,
660 ErrorHandlerMode::Mixed => {
661 return Err(CamelError::RouteError(
662 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
663 ));
664 }
665 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
666 let mut config = if let Some(uri) = dlc_uri {
667 ErrorHandlerConfig::dead_letter_channel(uri)
668 } else {
669 ErrorHandlerConfig::log_only()
670 };
671
672 for spec in specs {
673 let matcher = spec.matches.clone();
674 let mut builder = config.on_exception(move |e| matcher(e));
675
676 if let Some(retry) = spec.retry {
677 builder = builder.retry(retry.max_attempts).with_backoff(
678 retry.initial_delay,
679 retry.multiplier,
680 retry.max_delay,
681 );
682 if retry.jitter_factor > 0.0 {
683 builder = builder.with_jitter(retry.jitter_factor);
684 }
685 }
686
687 if let Some(uri) = spec.handled_by {
688 builder = builder.handled_by(uri);
689 }
690
691 config = builder.build();
692 }
693
694 Some(config)
695 }
696 };
697
698 let definition = RouteDefinition::new(self.from_uri, self.steps);
699 let definition = if let Some(eh) = resolved_error_handler {
700 definition.with_error_handler(eh)
701 } else {
702 definition
703 };
704 let definition = if let Some(cb) = self.circuit_breaker_config {
705 definition.with_circuit_breaker(cb)
706 } else {
707 definition
708 };
709 let definition = if let Some(sp) = self.security_policy_config {
710 definition.with_security_policy(sp)
711 } else {
712 definition
713 };
714 let definition = if let Some(auth) = self.security_authenticator {
715 definition.with_security_authenticator(auth)
716 } else {
717 definition
718 };
719 let definition = if let Some(concurrency) = self.concurrency {
720 definition.with_concurrency(concurrency)
721 } else {
722 definition
723 };
724 let definition = definition.with_route_id(route_id);
725 let definition = if let Some(auto) = self.auto_startup {
726 definition.with_auto_startup(auto)
727 } else {
728 definition
729 };
730 let definition = if let Some(order) = self.startup_order {
731 definition.with_startup_order(order)
732 } else {
733 definition
734 };
735 Ok(definition)
736 }
737
738 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
740 validate_uri(&self.from_uri)?;
741 let route_id = self
742 .route_id
743 .filter(|s| !s.trim().is_empty())
744 .ok_or_else(|| {
745 CamelError::RouteError(
746 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
747 .to_string(),
748 )
749 })?;
750
751 let steps = canonicalize_steps(self.steps)?;
752 let circuit_breaker = self
753 .circuit_breaker_config
754 .map(canonicalize_circuit_breaker);
755
756 if self.security_policy_config.is_some() {
757 return Err(CamelError::RouteError(
758 "routes with security_policy cannot use the canonical/hot-reload path (not yet supported)"
759 .into(),
760 ));
761 }
762
763 let spec = CanonicalRouteSpec {
764 route_id,
765 from: self.from_uri,
766 steps,
767 circuit_breaker,
768 version: camel_api::CANONICAL_CONTRACT_VERSION,
769 };
770 spec.validate_contract()?;
771 Ok(spec)
772 }
773}
774
775pub struct OnExceptionBuilder {
776 parent: RouteBuilder,
777 policy: OnExceptionSpec,
778}
779
780impl OnExceptionBuilder {
781 pub fn retry(mut self, max_attempts: u32) -> Self {
782 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
783 self
784 }
785
786 pub fn with_backoff(
787 mut self,
788 initial: std::time::Duration,
789 multiplier: f64,
790 max: std::time::Duration,
791 ) -> Self {
792 if let Some(ref mut retry) = self.policy.retry {
793 retry.initial_delay = initial;
794 retry.multiplier = multiplier;
795 retry.max_delay = max;
796 } else {
797 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
798 }
799 self
800 }
801
802 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
803 if let Some(ref mut retry) = self.policy.retry {
804 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
805 } else {
806 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
807 }
808 self
809 }
810
811 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
812 self.policy.handled_by = Some(uri.into());
813 self
814 }
815
816 pub fn end_on_exception(mut self) -> RouteBuilder {
817 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
818 specs.push(self.policy);
819 }
820 self.parent
821 }
822}
823
824fn validate_uri(uri: &str) -> Result<(), CamelError> {
826 let trimmed = uri.trim();
827 if trimmed.is_empty() {
828 return Err(CamelError::RouteError(
829 "route must have a 'from' URI".to_string(),
830 ));
831 }
832 if !trimmed.contains(':') {
833 return Err(CamelError::RouteError(
834 "URI must have a scheme (e.g. 'timer:tick')".to_string(),
835 ));
836 }
837 let scheme = trimmed.split(':').next().unwrap_or("");
838 if scheme.trim().is_empty() {
839 return Err(CamelError::RouteError(
840 "URI scheme must not be empty".to_string(),
841 ));
842 }
843 Ok(())
844}
845
846fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
847 let mut canonical = Vec::with_capacity(steps.len());
848 for step in steps {
849 canonical.push(canonicalize_step(step)?);
850 }
851 Ok(canonical)
852}
853
854fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
855 match step {
856 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
857 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
858 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
859 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
860 BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
861 delay_ms: config.delay_ms,
862 dynamic_header: config.dynamic_header,
863 }),
864 BuilderStep::DeclarativeScript { expression } => {
865 Ok(CanonicalStepSpec::Script { expression })
866 }
867 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
868 predicate,
869 steps: canonicalize_steps(steps)?,
870 }),
871 BuilderStep::DeclarativeChoice { whens, otherwise } => {
872 let mut canonical_whens = Vec::with_capacity(whens.len());
873 for DeclarativeWhenStep { predicate, steps } in whens {
874 canonical_whens.push(CanonicalWhenSpec {
875 predicate,
876 steps: canonicalize_steps(steps)?,
877 });
878 }
879 let otherwise = match otherwise {
880 Some(steps) => Some(canonicalize_steps(steps)?),
881 None => None,
882 };
883 Ok(CanonicalStepSpec::Choice {
884 whens: canonical_whens,
885 otherwise,
886 })
887 }
888 BuilderStep::DeclarativeSplit {
889 expression,
890 aggregation,
891 parallel,
892 parallel_limit,
893 stop_on_exception,
894 steps,
895 } => Ok(CanonicalStepSpec::Split {
896 expression: CanonicalSplitExpressionSpec::Language(expression),
897 aggregation: canonicalize_split_aggregation(aggregation)?,
898 parallel,
899 parallel_limit,
900 stop_on_exception,
901 steps: canonicalize_steps(steps)?,
902 }),
903 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
904 canonicalize_aggregate(config)?,
905 )),
906 other => {
907 let step_name = canonical_step_name(&other);
908 let detail = camel_api::canonical_contract_rejection_reason(step_name)
909 .unwrap_or("not included in canonical v1");
910 Err(CamelError::RouteError(format!(
911 "canonical v1 does not support step `{step_name}`: {detail}"
912 )))
913 }
914 }
915}
916
917fn canonicalize_split_aggregation(
918 strategy: camel_api::splitter::AggregationStrategy,
919) -> Result<CanonicalSplitAggregationSpec, CamelError> {
920 match strategy {
921 camel_api::splitter::AggregationStrategy::LastWins => {
922 Ok(CanonicalSplitAggregationSpec::LastWins)
923 }
924 camel_api::splitter::AggregationStrategy::CollectAll => {
925 Ok(CanonicalSplitAggregationSpec::CollectAll)
926 }
927 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
928 "canonical v1 does not support custom split aggregation".to_string(),
929 )),
930 camel_api::splitter::AggregationStrategy::Original => {
931 Ok(CanonicalSplitAggregationSpec::Original)
932 }
933 }
934}
935
936fn extract_completion_fields(
937 mode: &CompletionMode,
938) -> Result<(Option<usize>, Option<u64>), CamelError> {
939 match mode {
940 CompletionMode::Single(cond) => match cond {
941 CompletionCondition::Size(n) => Ok((Some(*n), None)),
942 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
943 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
944 "canonical v1 does not support aggregate predicate completion".to_string(),
945 )),
946 },
947 CompletionMode::Any(conds) => {
948 let mut size = None;
949 let mut timeout_ms = None;
950 for cond in conds {
951 match cond {
952 CompletionCondition::Size(n) => size = Some(*n),
953 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
954 CompletionCondition::Predicate(_) => {
955 return Err(CamelError::RouteError(
956 "canonical v1 does not support aggregate predicate completion"
957 .to_string(),
958 ));
959 }
960 }
961 }
962 Ok((size, timeout_ms))
963 }
964 }
965}
966
967fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
968 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
969
970 let header = match &config.correlation {
971 CorrelationStrategy::HeaderName(h) => h.clone(),
972 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
973 CorrelationStrategy::Fn(_) => {
974 return Err(CamelError::RouteError(
975 "canonical v1 does not support Fn correlation strategy".to_string(),
976 ));
977 }
978 };
979
980 let correlation_key = match &config.correlation {
981 CorrelationStrategy::HeaderName(_) => None,
982 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
983 CorrelationStrategy::Fn(_) => unreachable!(),
984 };
985
986 let strategy = match config.strategy {
987 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
988 AggregationStrategy::Custom(_) => {
989 return Err(CamelError::RouteError(
990 "canonical v1 does not support custom aggregate strategy".to_string(),
991 ));
992 }
993 };
994 let bucket_ttl_ms = config
995 .bucket_ttl
996 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
997
998 Ok(CanonicalAggregateSpec {
999 header,
1000 completion_size,
1001 completion_timeout_ms,
1002 correlation_key,
1003 force_completion_on_stop: if config.force_completion_on_stop {
1004 Some(true)
1005 } else {
1006 None
1007 },
1008 discard_on_timeout: if config.discard_on_timeout {
1009 Some(true)
1010 } else {
1011 None
1012 },
1013 strategy,
1014 max_buckets: config.max_buckets,
1015 bucket_ttl_ms,
1016 })
1017}
1018
1019fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
1020 CanonicalCircuitBreakerSpec {
1021 failure_threshold: config.failure_threshold,
1022 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
1023 }
1024}
1025
1026fn canonical_step_name(step: &BuilderStep) -> &'static str {
1027 match step {
1028 BuilderStep::Processor(_) => "processor",
1029 BuilderStep::To(_) => "to",
1030 BuilderStep::Stop => "stop",
1031 BuilderStep::Log { .. } => "log",
1032 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
1033 BuilderStep::DeclarativeSetBody { .. } => "set_body",
1034 BuilderStep::DeclarativeFilter { .. } => "filter",
1035 BuilderStep::DeclarativeChoice { .. } => "choice",
1036 BuilderStep::DeclarativeScript { .. } => "script",
1037 BuilderStep::DeclarativeFunction { .. } => "function",
1038 BuilderStep::DeclarativeSplit { .. } => "split",
1039 BuilderStep::Split { .. } => "split",
1040 BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1041 BuilderStep::Aggregate { .. } => "aggregate",
1042 BuilderStep::Filter { .. } => "filter",
1043 BuilderStep::Choice { .. } => "choice",
1044 BuilderStep::WireTap { .. } => "wire_tap",
1045 BuilderStep::Delay { .. } => "delay",
1046 BuilderStep::Multicast { .. } => "multicast",
1047 BuilderStep::DeclarativeLog { .. } => "log",
1048 BuilderStep::Bean { .. } => "bean",
1049 BuilderStep::Script { .. } => "script",
1050 BuilderStep::Throttle { .. } => "throttle",
1051 BuilderStep::LoadBalance { .. } => "load_balancer",
1052 BuilderStep::DynamicRouter { .. } => "dynamic_router",
1053 BuilderStep::RoutingSlip { .. } => "routing_slip",
1054 BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1055 BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1056 BuilderStep::RecipientList { .. } => "recipient_list",
1057 BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1058 BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1059 }
1060}
1061
1062impl StepAccumulator for RouteBuilder {
1063 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1064 &mut self.steps
1065 }
1066}
1067
1068pub struct SplitBuilder {
1076 parent: RouteBuilder,
1077 config: SplitterConfig,
1078 steps: Vec<BuilderStep>,
1079}
1080
1081impl SplitBuilder {
1082 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1084 where
1085 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1086 {
1087 FilterInSplitBuilder {
1088 parent: self,
1089 predicate: std::sync::Arc::new(predicate),
1090 steps: vec![],
1091 }
1092 }
1093
1094 pub fn end_split(mut self) -> RouteBuilder {
1097 let split_step = BuilderStep::Split {
1098 config: self.config,
1099 steps: self.steps,
1100 };
1101 self.parent.steps.push(split_step);
1102 self.parent
1103 }
1104}
1105
1106impl StepAccumulator for SplitBuilder {
1107 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1108 &mut self.steps
1109 }
1110}
1111
1112pub struct FilterBuilder {
1114 parent: RouteBuilder,
1115 predicate: FilterPredicate,
1116 steps: Vec<BuilderStep>,
1117}
1118
1119impl FilterBuilder {
1120 pub fn end_filter(mut self) -> RouteBuilder {
1123 let step = BuilderStep::Filter {
1124 predicate: self.predicate,
1125 steps: self.steps,
1126 };
1127 self.parent.steps.push(step);
1128 self.parent
1129 }
1130}
1131
1132impl StepAccumulator for FilterBuilder {
1133 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1134 &mut self.steps
1135 }
1136}
1137
1138pub struct FilterInSplitBuilder {
1140 parent: SplitBuilder,
1141 predicate: FilterPredicate,
1142 steps: Vec<BuilderStep>,
1143}
1144
1145impl FilterInSplitBuilder {
1146 pub fn end_filter(mut self) -> SplitBuilder {
1148 let step = BuilderStep::Filter {
1149 predicate: self.predicate,
1150 steps: self.steps,
1151 };
1152 self.parent.steps.push(step);
1153 self.parent
1154 }
1155}
1156
1157impl StepAccumulator for FilterInSplitBuilder {
1158 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1159 &mut self.steps
1160 }
1161}
1162
1163pub struct ChoiceBuilder {
1170 parent: RouteBuilder,
1171 whens: Vec<WhenStep>,
1172 _otherwise: Option<Vec<BuilderStep>>,
1173}
1174
1175impl ChoiceBuilder {
1176 pub fn when<F>(self, predicate: F) -> WhenBuilder
1179 where
1180 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1181 {
1182 WhenBuilder {
1183 parent: self,
1184 predicate: std::sync::Arc::new(predicate),
1185 steps: vec![],
1186 }
1187 }
1188
1189 pub fn otherwise(self) -> OtherwiseBuilder {
1193 OtherwiseBuilder {
1194 parent: self,
1195 steps: vec![],
1196 }
1197 }
1198
1199 pub fn end_choice(mut self) -> RouteBuilder {
1203 let step = BuilderStep::Choice {
1204 whens: self.whens,
1205 otherwise: self._otherwise,
1206 };
1207 self.parent.steps.push(step);
1208 self.parent
1209 }
1210}
1211
1212pub struct WhenBuilder {
1214 parent: ChoiceBuilder,
1215 predicate: camel_api::FilterPredicate,
1216 steps: Vec<BuilderStep>,
1217}
1218
1219impl WhenBuilder {
1220 pub fn end_when(mut self) -> ChoiceBuilder {
1223 self.parent.whens.push(WhenStep {
1224 predicate: self.predicate,
1225 steps: self.steps,
1226 });
1227 self.parent
1228 }
1229}
1230
1231impl StepAccumulator for WhenBuilder {
1232 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1233 &mut self.steps
1234 }
1235}
1236
1237pub struct OtherwiseBuilder {
1239 parent: ChoiceBuilder,
1240 steps: Vec<BuilderStep>,
1241}
1242
1243impl OtherwiseBuilder {
1244 pub fn end_otherwise(self) -> ChoiceBuilder {
1246 let OtherwiseBuilder { mut parent, steps } = self;
1247 parent._otherwise = Some(steps);
1248 parent
1249 }
1250}
1251
1252impl StepAccumulator for OtherwiseBuilder {
1253 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1254 &mut self.steps
1255 }
1256}
1257
1258pub struct MulticastBuilder {
1266 parent: RouteBuilder,
1267 steps: Vec<BuilderStep>,
1268 config: MulticastConfig,
1269}
1270
1271impl MulticastBuilder {
1272 pub fn parallel(mut self, parallel: bool) -> Self {
1273 self.config = self.config.parallel(parallel);
1274 self
1275 }
1276
1277 pub fn parallel_limit(mut self, limit: usize) -> Self {
1278 self.config = self.config.parallel_limit(limit);
1279 self
1280 }
1281
1282 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1283 self.config = self.config.stop_on_exception(stop);
1284 self
1285 }
1286
1287 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1288 self.config = self.config.timeout(duration);
1289 self
1290 }
1291
1292 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1293 self.config = self.config.aggregation(strategy);
1294 self
1295 }
1296
1297 pub fn end_multicast(mut self) -> RouteBuilder {
1298 let step = BuilderStep::Multicast {
1299 steps: self.steps,
1300 config: self.config,
1301 };
1302 self.parent.steps.push(step);
1303 self.parent
1304 }
1305}
1306
1307impl StepAccumulator for MulticastBuilder {
1308 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1309 &mut self.steps
1310 }
1311}
1312
/// Builder for a throttle scope opened on a [`RouteBuilder`].
///
/// Steps added inside the scope are collected in `steps`; `end_throttle`
/// turns them, together with `config`, into a `BuilderStep::Throttle` on the
/// parent route.
pub struct ThrottleBuilder {
    // Route builder that receives the finished throttle step.
    parent: RouteBuilder,
    // Throttler settings; `strategy` can still adjust them.
    config: ThrottlerConfig,
    // Steps collected inside the throttle scope.
    steps: Vec<BuilderStep>,
}
1325
1326impl ThrottleBuilder {
1327 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1333 self.config = self.config.strategy(strategy);
1334 self
1335 }
1336
1337 pub fn end_throttle(mut self) -> RouteBuilder {
1340 let step = BuilderStep::Throttle {
1341 config: self.config,
1342 steps: self.steps,
1343 };
1344 self.parent.steps.push(step);
1345 self.parent
1346 }
1347}
1348
1349impl StepAccumulator for ThrottleBuilder {
1350 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1351 &mut self.steps
1352 }
1353}
1354
/// Builder for a loop scope opened on a [`RouteBuilder`].
///
/// Steps added inside the scope are collected in `steps`; `end_loop` turns
/// them, together with `config`, into a `BuilderStep::Loop` on the parent
/// route.
pub struct LoopBuilder {
    // Route builder that receives the finished loop step.
    parent: RouteBuilder,
    // Loop settings (iteration mode).
    config: LoopConfig,
    // Steps collected inside the loop body.
    steps: Vec<BuilderStep>,
}
1361
1362impl LoopBuilder {
1363 pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1364 LoopInLoopBuilder {
1365 parent: self,
1366 config: LoopConfig {
1367 mode: LoopMode::Count(count),
1368 },
1369 steps: vec![],
1370 }
1371 }
1372
1373 pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1374 where
1375 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1376 {
1377 LoopInLoopBuilder {
1378 parent: self,
1379 config: LoopConfig {
1380 mode: LoopMode::While(std::sync::Arc::new(predicate)),
1381 },
1382 steps: vec![],
1383 }
1384 }
1385
1386 pub fn end_loop(mut self) -> RouteBuilder {
1387 let step = BuilderStep::Loop {
1388 config: self.config,
1389 steps: self.steps,
1390 };
1391 self.parent.steps.push(step);
1392 self.parent
1393 }
1394}
1395
1396impl StepAccumulator for LoopBuilder {
1397 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1398 &mut self.steps
1399 }
1400}
1401
/// Builder for a loop nested directly inside another loop.
///
/// Works like [`LoopBuilder`], except that `end_loop` appends the finished
/// `BuilderStep::Loop` to the enclosing [`LoopBuilder`] and returns it.
pub struct LoopInLoopBuilder {
    // Enclosing loop builder that receives the finished inner loop step.
    parent: LoopBuilder,
    // Settings of the inner loop (iteration mode).
    config: LoopConfig,
    // Steps collected inside the inner loop body.
    steps: Vec<BuilderStep>,
}
1407
1408impl LoopInLoopBuilder {
1409 pub fn end_loop(mut self) -> LoopBuilder {
1410 let step = BuilderStep::Loop {
1411 config: self.config,
1412 steps: self.steps,
1413 };
1414 self.parent.steps.push(step);
1415 self.parent
1416 }
1417}
1418
1419impl StepAccumulator for LoopInLoopBuilder {
1420 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1421 &mut self.steps
1422 }
1423}
1424
/// Builder for a load-balance scope opened on a [`RouteBuilder`].
///
/// Steps added inside the scope are collected in `steps`; `end_load_balance`
/// turns them, together with `config`, into a `BuilderStep::LoadBalance` on
/// the parent route.
pub struct LoadBalancerBuilder {
    // Route builder that receives the finished load-balance step.
    parent: RouteBuilder,
    // Load balancer settings chosen through the fluent setters.
    config: LoadBalancerConfig,
    // Steps collected inside the load-balance scope.
    steps: Vec<BuilderStep>,
}
1437
1438impl LoadBalancerBuilder {
1439 pub fn round_robin(mut self) -> Self {
1441 self.config = LoadBalancerConfig::round_robin();
1442 self
1443 }
1444
1445 pub fn random(mut self) -> Self {
1447 self.config = LoadBalancerConfig::random();
1448 self
1449 }
1450
1451 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1456 self.config = LoadBalancerConfig::weighted(weights);
1457 self
1458 }
1459
1460 pub fn failover(mut self) -> Self {
1465 self.config = LoadBalancerConfig::failover();
1466 self
1467 }
1468
1469 pub fn parallel(mut self, parallel: bool) -> Self {
1474 self.config = self.config.parallel(parallel);
1475 self
1476 }
1477
1478 pub fn end_load_balance(mut self) -> RouteBuilder {
1481 let step = BuilderStep::LoadBalance {
1482 config: self.config,
1483 steps: self.steps,
1484 };
1485 self.parent.steps.push(step);
1486 self.parent
1487 }
1488}
1489
1490impl StepAccumulator for LoadBalancerBuilder {
1491 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1492 &mut self.steps
1493 }
1494}
1495
1496#[cfg(test)]
1501mod tests {
1502 use super::*;
1503 use camel_api::error_handler::ErrorHandlerConfig;
1504 use camel_api::load_balancer::LoadBalanceStrategy;
1505 use camel_api::{Exchange, Message};
1506 use camel_core::route::BuilderStep;
1507 use std::sync::Arc;
1508 use std::time::Duration;
1509 use tower::{Service, ServiceExt};
1510
1511 #[test]
1512 fn test_builder_from_creates_definition() {
1513 let definition = RouteBuilder::from("timer:tick")
1514 .route_id("test-route")
1515 .build()
1516 .unwrap();
1517 assert_eq!(definition.from_uri(), "timer:tick");
1518 }
1519
1520 #[test]
1521 fn test_builder_empty_from_uri_errors() {
1522 let result = RouteBuilder::from("").route_id("test-route").build();
1523 assert!(result.is_err());
1524 }
1525
1526 #[test]
1527 fn test_build_rejects_schemeless_uri() {
1528 let result = RouteBuilder::from("no-scheme-here")
1529 .route_id("test-route")
1530 .build();
1531 match result {
1532 Err(err) => {
1533 let err_msg = format!("{err}");
1534 assert!(
1535 err_msg.contains("scheme"),
1536 "expected scheme-related error, got: {err_msg}"
1537 );
1538 }
1539 Ok(_) => panic!("schemeless URI should fail"),
1540 }
1541 }
1542
1543 #[test]
1544 fn test_build_rejects_empty_scheme_uri() {
1545 let result = RouteBuilder::from(":missing-scheme")
1546 .route_id("test-route")
1547 .build();
1548 match result {
1549 Err(err) => {
1550 let err_msg = format!("{err}");
1551 assert!(
1552 err_msg.contains("scheme"),
1553 "expected scheme-related error, got: {err_msg}"
1554 );
1555 }
1556 Ok(_) => panic!("empty-scheme URI should fail"),
1557 }
1558 }
1559
1560 #[test]
1561 fn test_build_accepts_valid_uri() {
1562 let result = RouteBuilder::from("timer:tick")
1563 .route_id("test-route")
1564 .build();
1565 assert!(result.is_ok());
1566 }
1567
1568 #[test]
1569 fn test_build_canonical_rejects_schemeless_uri() {
1570 let result = RouteBuilder::from("no-scheme-here")
1571 .route_id("test-route")
1572 .build_canonical();
1573 assert!(result.is_err());
1574 }
1575
1576 #[test]
1577 fn test_builder_to_adds_step() {
1578 let definition = RouteBuilder::from("timer:tick")
1579 .route_id("test-route")
1580 .to("log:info")
1581 .build()
1582 .unwrap();
1583
1584 assert_eq!(definition.from_uri(), "timer:tick");
1585 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1587 }
1588
1589 #[test]
1590 fn test_builder_filter_adds_filter_step() {
1591 let definition = RouteBuilder::from("timer:tick")
1592 .route_id("test-route")
1593 .filter(|_ex| true)
1594 .to("mock:result")
1595 .end_filter()
1596 .build()
1597 .unwrap();
1598
1599 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1600 }
1601
1602 #[test]
1603 fn test_builder_set_header_adds_processor_step() {
1604 let definition = RouteBuilder::from("timer:tick")
1605 .route_id("test-route")
1606 .set_header("key", Value::String("value".into()))
1607 .build()
1608 .unwrap();
1609
1610 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1611 }
1612
1613 #[test]
1614 fn test_builder_map_body_adds_processor_step() {
1615 let definition = RouteBuilder::from("timer:tick")
1616 .route_id("test-route")
1617 .map_body(|body| body)
1618 .build()
1619 .unwrap();
1620
1621 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1622 }
1623
1624 #[test]
1625 fn test_builder_process_adds_processor_step() {
1626 let definition = RouteBuilder::from("timer:tick")
1627 .route_id("test-route")
1628 .process(|ex| async move { Ok(ex) })
1629 .build()
1630 .unwrap();
1631
1632 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1633 }
1634
1635 #[test]
1636 fn test_builder_chain_multiple_steps() {
1637 let definition = RouteBuilder::from("timer:tick")
1638 .route_id("test-route")
1639 .set_header("source", Value::String("timer".into()))
1640 .filter(|ex| ex.input.header("source").is_some())
1641 .to("log:info")
1642 .end_filter()
1643 .to("mock:result")
1644 .build()
1645 .unwrap();
1646
1647 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1651 }
1652
1653 #[test]
1654 fn test_loop_count_builder() {
1655 use camel_api::loop_eip::LoopMode;
1656
1657 let def = RouteBuilder::from("direct:start")
1658 .route_id("loop-test")
1659 .loop_count(3)
1660 .to("mock:inside")
1661 .end_loop()
1662 .to("mock:after")
1663 .build()
1664 .unwrap();
1665
1666 assert_eq!(def.steps().len(), 2);
1667 match &def.steps()[0] {
1668 BuilderStep::Loop { config, steps } => {
1669 assert!(matches!(config.mode, LoopMode::Count(3)));
1670 assert_eq!(steps.len(), 1);
1671 }
1672 other => panic!("Expected Loop, got {:?}", other),
1673 }
1674 assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1675 }
1676
1677 #[test]
1678 fn test_loop_while_builder() {
1679 use camel_api::loop_eip::LoopMode;
1680
1681 let def = RouteBuilder::from("direct:start")
1682 .route_id("loop-while-test")
1683 .loop_while(|_ex| true)
1684 .to("mock:retry")
1685 .end_loop()
1686 .build()
1687 .unwrap();
1688
1689 assert_eq!(def.steps().len(), 1);
1690 match &def.steps()[0] {
1691 BuilderStep::Loop { config, steps } => {
1692 assert!(matches!(config.mode, LoopMode::While(_)));
1693 assert_eq!(steps.len(), 1);
1694 }
1695 other => panic!("Expected Loop, got {:?}", other),
1696 }
1697 }
1698
1699 #[test]
1700 fn test_nested_loop_builder() {
1701 use camel_api::loop_eip::LoopMode;
1702
1703 let def = RouteBuilder::from("direct:start")
1704 .route_id("nested-loop-test")
1705 .loop_count(2)
1706 .to("mock:outer")
1707 .loop_count(3)
1708 .to("mock:inner")
1709 .end_loop()
1710 .end_loop()
1711 .to("mock:after")
1712 .build()
1713 .unwrap();
1714
1715 assert_eq!(def.steps().len(), 2);
1716 match &def.steps()[0] {
1717 BuilderStep::Loop { steps, .. } => {
1718 assert_eq!(steps.len(), 2);
1719 match &steps[1] {
1720 BuilderStep::Loop {
1721 config,
1722 steps: inner_steps,
1723 } => {
1724 assert!(matches!(config.mode, LoopMode::Count(3)));
1725 assert_eq!(inner_steps.len(), 1);
1726 }
1727 other => panic!("Expected nested Loop, got {:?}", other),
1728 }
1729 }
1730 other => panic!("Expected outer Loop, got {:?}", other),
1731 }
1732 }
1733
1734 #[tokio::test]
1739 async fn test_set_header_processor_works() {
1740 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1741 let exchange = Exchange::new(Message::new("test"));
1742 let result = svc.call(exchange).await.unwrap();
1743 assert_eq!(
1744 result.input.header("greeting"),
1745 Some(&Value::String("hello".into()))
1746 );
1747 }
1748
1749 #[tokio::test]
1750 async fn test_filter_processor_passes() {
1751 use camel_api::BoxProcessorExt;
1752 use camel_processor::FilterService;
1753
1754 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1755 let mut svc =
1756 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1757 let exchange = Exchange::new(Message::new("pass"));
1758 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1759 assert_eq!(result.input.body.as_text(), Some("pass"));
1760 }
1761
1762 #[tokio::test]
1763 async fn test_filter_processor_blocks() {
1764 use camel_api::BoxProcessorExt;
1765 use camel_processor::FilterService;
1766
1767 let sub = BoxProcessor::from_fn(|_ex| {
1768 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1769 });
1770 let mut svc =
1771 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1772 let exchange = Exchange::new(Message::new("reject"));
1773 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1774 assert_eq!(result.input.body.as_text(), Some("reject"));
1775 }
1776
1777 #[tokio::test]
1778 async fn test_map_body_processor_works() {
1779 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1780 if let Some(text) = body.as_text() {
1781 Body::Text(text.to_uppercase())
1782 } else {
1783 body
1784 }
1785 });
1786 let exchange = Exchange::new(Message::new("hello"));
1787 let result = mapper.oneshot(exchange).await.unwrap();
1788 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1789 }
1790
1791 #[tokio::test]
1792 async fn test_process_custom_processor_works() {
1793 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1794 ex.set_property("custom", Value::Bool(true));
1795 Ok(ex)
1796 });
1797 let exchange = Exchange::new(Message::default());
1798 let result = processor.oneshot(exchange).await.unwrap();
1799 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1800 }
1801
1802 #[tokio::test]
1807 async fn test_compose_pipeline_runs_steps_in_order() {
1808 use camel_core::route::compose_pipeline;
1809
1810 let processors = vec![
1811 BoxProcessor::new(SetHeader::new(
1812 IdentityProcessor,
1813 "step",
1814 Value::String("one".into()),
1815 )),
1816 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1817 if let Some(text) = body.as_text() {
1818 Body::Text(format!("{}-processed", text))
1819 } else {
1820 body
1821 }
1822 })),
1823 ];
1824
1825 let pipeline = compose_pipeline(processors);
1826 let exchange = Exchange::new(Message::new("hello"));
1827 let result = pipeline.oneshot(exchange).await.unwrap();
1828
1829 assert_eq!(
1830 result.input.header("step"),
1831 Some(&Value::String("one".into()))
1832 );
1833 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1834 }
1835
1836 #[tokio::test]
1837 async fn test_compose_pipeline_empty_is_identity() {
1838 use camel_core::route::compose_pipeline;
1839
1840 let pipeline = compose_pipeline(vec![]);
1841 let exchange = Exchange::new(Message::new("unchanged"));
1842 let result = pipeline.oneshot(exchange).await.unwrap();
1843 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1844 }
1845
1846 #[test]
1851 fn test_builder_circuit_breaker_sets_config() {
1852 use camel_api::circuit_breaker::CircuitBreakerConfig;
1853
1854 let config = CircuitBreakerConfig::new().failure_threshold(5);
1855 let definition = RouteBuilder::from("timer:tick")
1856 .route_id("test-route")
1857 .circuit_breaker(config)
1858 .build()
1859 .unwrap();
1860
1861 let cb = definition
1862 .circuit_breaker_config()
1863 .expect("circuit breaker should be set");
1864 assert_eq!(cb.failure_threshold, 5);
1865 }
1866
1867 #[test]
1868 fn test_builder_circuit_breaker_with_error_handler() {
1869 use camel_api::circuit_breaker::CircuitBreakerConfig;
1870 use camel_api::error_handler::ErrorHandlerConfig;
1871
1872 let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1873 let eh_config = ErrorHandlerConfig::log_only();
1874
1875 let definition = RouteBuilder::from("timer:tick")
1876 .route_id("test-route")
1877 .to("log:info")
1878 .circuit_breaker(cb_config)
1879 .error_handler(eh_config)
1880 .build()
1881 .unwrap();
1882
1883 assert!(
1884 definition.circuit_breaker_config().is_some(),
1885 "circuit breaker config should be set"
1886 );
1887 }
1889
1890 #[test]
1891 fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1892 let definition = RouteBuilder::from("direct:start")
1893 .route_id("test-route")
1894 .dead_letter_channel("log:dlc")
1895 .on_exception(|e| matches!(e, CamelError::Io(_)))
1896 .retry(3)
1897 .handled_by("log:io")
1898 .end_on_exception()
1899 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1900 .retry(1)
1901 .end_on_exception()
1902 .to("mock:out")
1903 .build()
1904 .expect("route should build");
1905
1906 let cfg = definition
1907 .error_handler_config()
1908 .expect("error handler should be set");
1909 assert_eq!(cfg.policies.len(), 2);
1910 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1911 assert_eq!(
1912 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1913 Some(3)
1914 );
1915 assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1916 assert_eq!(
1917 cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1918 Some(1)
1919 );
1920 }
1921
1922 #[test]
1923 fn test_builder_on_exception_mixed_mode_rejected() {
1924 let result = RouteBuilder::from("direct:start")
1925 .route_id("test-route")
1926 .error_handler(ErrorHandlerConfig::log_only())
1927 .on_exception(|_e| true)
1928 .end_on_exception()
1929 .to("mock:out")
1930 .build();
1931
1932 let err = result.err().expect("mixed mode should fail with an error");
1933
1934 assert!(
1935 format!("{err}").contains("mixed error handler modes"),
1936 "unexpected error: {err}"
1937 );
1938 }
1939
1940 #[test]
1941 fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1942 let definition = RouteBuilder::from("direct:start")
1943 .route_id("test-route")
1944 .on_exception(|_e| true)
1945 .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1946 .with_jitter(0.5)
1947 .end_on_exception()
1948 .to("mock:out")
1949 .build()
1950 .expect("route should build");
1951
1952 let cfg = definition
1953 .error_handler_config()
1954 .expect("error handler should be set");
1955 assert_eq!(cfg.policies.len(), 1);
1956 assert!(cfg.policies[0].retry.is_none());
1957 }
1958
1959 #[test]
1960 fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1961 let definition = RouteBuilder::from("direct:start")
1962 .route_id("test-route")
1963 .dead_letter_channel("log:dlc")
1964 .to("mock:out")
1965 .build()
1966 .expect("route should build");
1967
1968 let cfg = definition
1969 .error_handler_config()
1970 .expect("error handler should be set");
1971 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1972 assert!(cfg.policies.is_empty());
1973 }
1974
1975 #[test]
1976 fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1977 let definition = RouteBuilder::from("direct:start")
1978 .route_id("test-route")
1979 .dead_letter_channel("log:first")
1980 .on_exception(|e| matches!(e, CamelError::Io(_)))
1981 .retry(2)
1982 .end_on_exception()
1983 .dead_letter_channel("log:second")
1984 .to("mock:out")
1985 .build()
1986 .expect("route should build");
1987
1988 let cfg = definition
1989 .error_handler_config()
1990 .expect("error handler should be set");
1991 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1992 assert_eq!(cfg.policies.len(), 1);
1993 assert_eq!(
1994 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1995 Some(2)
1996 );
1997 }
1998
1999 #[test]
2000 fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
2001 let definition = RouteBuilder::from("direct:start")
2002 .route_id("test-route")
2003 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
2004 .retry(1)
2005 .end_on_exception()
2006 .to("mock:out")
2007 .build()
2008 .expect("route should build");
2009
2010 let cfg = definition
2011 .error_handler_config()
2012 .expect("error handler should be set");
2013 assert!(cfg.dlc_uri.is_none());
2014 assert_eq!(cfg.policies.len(), 1);
2015 }
2016
2017 #[test]
2018 fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
2019 let first = ErrorHandlerConfig::dead_letter_channel("log:first");
2020 let second = ErrorHandlerConfig::dead_letter_channel("log:second");
2021
2022 let definition = RouteBuilder::from("direct:start")
2023 .route_id("test-route")
2024 .error_handler(first)
2025 .error_handler(second)
2026 .to("mock:out")
2027 .build()
2028 .expect("route should build");
2029
2030 let cfg = definition
2031 .error_handler_config()
2032 .expect("error handler should be set");
2033 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
2034 }
2035
2036 #[test]
2039 fn test_split_builder_typestate() {
2040 use camel_api::splitter::{SplitterConfig, split_body_lines};
2041
2042 let definition = RouteBuilder::from("timer:test?period=1000")
2044 .route_id("test-route")
2045 .split(SplitterConfig::new(split_body_lines()))
2046 .to("mock:per-fragment")
2047 .end_split()
2048 .to("mock:final")
2049 .build()
2050 .unwrap();
2051
2052 assert_eq!(definition.steps().len(), 2);
2054 }
2055
2056 #[test]
2057 fn test_split_builder_steps_collected() {
2058 use camel_api::splitter::{SplitterConfig, split_body_lines};
2059
2060 let definition = RouteBuilder::from("timer:test?period=1000")
2061 .route_id("test-route")
2062 .split(SplitterConfig::new(split_body_lines()))
2063 .set_header("fragment", Value::String("yes".into()))
2064 .to("mock:per-fragment")
2065 .end_split()
2066 .build()
2067 .unwrap();
2068
2069 assert_eq!(definition.steps().len(), 1);
2071 match &definition.steps()[0] {
2072 BuilderStep::Split { steps, .. } => {
2073 assert_eq!(steps.len(), 2); }
2075 other => panic!("Expected Split, got {:?}", other),
2076 }
2077 }
2078
2079 #[test]
2080 fn test_split_builder_config_propagated() {
2081 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2082
2083 let definition = RouteBuilder::from("timer:test?period=1000")
2084 .route_id("test-route")
2085 .split(
2086 SplitterConfig::new(split_body_lines())
2087 .parallel(true)
2088 .parallel_limit(4)
2089 .aggregation(AggregationStrategy::CollectAll),
2090 )
2091 .to("mock:per-fragment")
2092 .end_split()
2093 .build()
2094 .unwrap();
2095
2096 match &definition.steps()[0] {
2097 BuilderStep::Split { config, .. } => {
2098 assert!(config.parallel);
2099 assert_eq!(config.parallel_limit, Some(4));
2100 assert!(matches!(
2101 config.aggregation,
2102 AggregationStrategy::CollectAll
2103 ));
2104 }
2105 other => panic!("Expected Split, got {:?}", other),
2106 }
2107 }
2108
2109 #[test]
2110 fn test_aggregate_builder_adds_step() {
2111 use camel_api::aggregator::AggregatorConfig;
2112 use camel_core::route::BuilderStep;
2113
2114 let definition = RouteBuilder::from("timer:tick")
2115 .route_id("test-route")
2116 .aggregate(
2117 AggregatorConfig::correlate_by("key")
2118 .complete_when_size(2)
2119 .build()
2120 .unwrap(),
2121 )
2122 .build()
2123 .unwrap();
2124
2125 assert_eq!(definition.steps().len(), 1);
2126 assert!(matches!(
2127 definition.steps()[0],
2128 BuilderStep::Aggregate { .. }
2129 ));
2130 }
2131
2132 #[test]
2133 fn test_aggregate_in_split_builder() {
2134 use camel_api::aggregator::AggregatorConfig;
2135 use camel_api::splitter::{SplitterConfig, split_body_lines};
2136 use camel_core::route::BuilderStep;
2137
2138 let definition = RouteBuilder::from("timer:tick")
2139 .route_id("test-route")
2140 .split(SplitterConfig::new(split_body_lines()))
2141 .aggregate(
2142 AggregatorConfig::correlate_by("key")
2143 .complete_when_size(1)
2144 .build()
2145 .unwrap(),
2146 )
2147 .end_split()
2148 .build()
2149 .unwrap();
2150
2151 assert_eq!(definition.steps().len(), 1);
2152 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2153 assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2154 } else {
2155 panic!("expected Split step");
2156 }
2157 }
2158
2159 #[test]
2162 fn test_builder_set_body_static_adds_processor() {
2163 let definition = RouteBuilder::from("timer:tick")
2164 .route_id("test-route")
2165 .set_body("fixed")
2166 .build()
2167 .unwrap();
2168 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2169 }
2170
2171 #[test]
2172 fn test_builder_set_body_fn_adds_processor() {
2173 let definition = RouteBuilder::from("timer:tick")
2174 .route_id("test-route")
2175 .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2176 .build()
2177 .unwrap();
2178 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2179 }
2180
2181 #[test]
2182 fn transform_alias_produces_same_as_set_body() {
2183 let route_transform = RouteBuilder::from("timer:tick")
2184 .route_id("test-route")
2185 .transform("hello")
2186 .build()
2187 .unwrap();
2188
2189 let route_set_body = RouteBuilder::from("timer:tick")
2190 .route_id("test-route")
2191 .set_body("hello")
2192 .build()
2193 .unwrap();
2194
2195 assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2196 }
2197
2198 #[test]
2199 fn test_builder_set_header_fn_adds_processor() {
2200 let definition = RouteBuilder::from("timer:tick")
2201 .route_id("test-route")
2202 .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2203 .build()
2204 .unwrap();
2205 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2206 }
2207
2208 #[tokio::test]
2209 async fn test_set_body_static_processor_works() {
2210 use camel_core::route::compose_pipeline;
2211 let def = RouteBuilder::from("t:t")
2212 .route_id("test-route")
2213 .set_body("replaced")
2214 .build()
2215 .unwrap();
2216 let pipeline = compose_pipeline(
2217 def.steps()
2218 .iter()
2219 .filter_map(|s| {
2220 if let BuilderStep::Processor(p) = s {
2221 Some(p.clone())
2222 } else {
2223 None
2224 }
2225 })
2226 .collect(),
2227 );
2228 let exchange = Exchange::new(Message::new("original"));
2229 let result = pipeline.oneshot(exchange).await.unwrap();
2230 assert_eq!(result.input.body.as_text(), Some("replaced"));
2231 }
2232
2233 #[tokio::test]
2234 async fn test_set_body_fn_processor_works() {
2235 use camel_core::route::compose_pipeline;
2236 let def = RouteBuilder::from("t:t")
2237 .route_id("test-route")
2238 .set_body_fn(|ex: &Exchange| {
2239 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2240 })
2241 .build()
2242 .unwrap();
2243 let pipeline = compose_pipeline(
2244 def.steps()
2245 .iter()
2246 .filter_map(|s| {
2247 if let BuilderStep::Processor(p) = s {
2248 Some(p.clone())
2249 } else {
2250 None
2251 }
2252 })
2253 .collect(),
2254 );
2255 let exchange = Exchange::new(Message::new("hello"));
2256 let result = pipeline.oneshot(exchange).await.unwrap();
2257 assert_eq!(result.input.body.as_text(), Some("HELLO"));
2258 }
2259
2260 #[tokio::test]
2261 async fn test_set_header_fn_processor_works() {
2262 use camel_core::route::compose_pipeline;
2263 let def = RouteBuilder::from("t:t")
2264 .route_id("test-route")
2265 .set_header_fn("echo", |ex: &Exchange| {
2266 ex.input
2267 .body
2268 .as_text()
2269 .map(|t| Value::String(t.into()))
2270 .unwrap_or(Value::Null)
2271 })
2272 .build()
2273 .unwrap();
2274 let pipeline = compose_pipeline(
2275 def.steps()
2276 .iter()
2277 .filter_map(|s| {
2278 if let BuilderStep::Processor(p) = s {
2279 Some(p.clone())
2280 } else {
2281 None
2282 }
2283 })
2284 .collect(),
2285 );
2286 let exchange = Exchange::new(Message::new("ping"));
2287 let result = pipeline.oneshot(exchange).await.unwrap();
2288 assert_eq!(
2289 result.input.header("echo"),
2290 Some(&Value::String("ping".into()))
2291 );
2292 }
2293
2294 #[test]
2297 fn test_filter_builder_typestate() {
2298 let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2299 .route_id("test-route")
2300 .filter(|_ex| true)
2301 .to("mock:inner")
2302 .end_filter()
2303 .to("mock:outer")
2304 .build();
2305 assert!(result.is_ok());
2306 }
2307
2308 #[test]
2309 fn test_filter_builder_steps_collected() {
2310 let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2311 .route_id("test-route")
2312 .filter(|_ex| true)
2313 .to("mock:inner")
2314 .end_filter()
2315 .build()
2316 .unwrap();
2317
2318 assert_eq!(definition.steps().len(), 1);
2319 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2320 }
2321
2322 #[test]
2323 fn test_wire_tap_builder_adds_step() {
2324 let definition = RouteBuilder::from("timer:tick")
2325 .route_id("test-route")
2326 .wire_tap("mock:tap")
2327 .to("mock:result")
2328 .build()
2329 .unwrap();
2330
2331 assert_eq!(definition.steps().len(), 2);
2332 assert!(
2333 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2334 );
2335 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2336 }
2337
2338 #[test]
2341 fn test_multicast_builder_typestate() {
2342 let definition = RouteBuilder::from("timer:tick")
2343 .route_id("test-route")
2344 .multicast()
2345 .to("direct:a")
2346 .to("direct:b")
2347 .end_multicast()
2348 .to("mock:result")
2349 .build()
2350 .unwrap();
2351
2352 assert_eq!(definition.steps().len(), 2); }
2354
2355 #[test]
2356 fn test_multicast_builder_steps_collected() {
2357 let definition = RouteBuilder::from("timer:tick")
2358 .route_id("test-route")
2359 .multicast()
2360 .to("direct:a")
2361 .to("direct:b")
2362 .end_multicast()
2363 .build()
2364 .unwrap();
2365
2366 match &definition.steps()[0] {
2367 BuilderStep::Multicast { steps, .. } => {
2368 assert_eq!(steps.len(), 2);
2369 }
2370 other => panic!("Expected Multicast, got {:?}", other),
2371 }
2372 }
2373
2374 #[test]
2377 fn test_builder_concurrent_sets_concurrency() {
2378 use camel_component_api::ConcurrencyModel;
2379
2380 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2381 .route_id("test-route")
2382 .concurrent(16)
2383 .to("log:info")
2384 .build()
2385 .unwrap();
2386
2387 assert_eq!(
2388 definition.concurrency_override(),
2389 Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2390 );
2391 }
2392
2393 #[test]
2394 fn test_builder_concurrent_zero_means_unbounded() {
2395 use camel_component_api::ConcurrencyModel;
2396
2397 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2398 .route_id("test-route")
2399 .concurrent(0)
2400 .to("log:info")
2401 .build()
2402 .unwrap();
2403
2404 assert_eq!(
2405 definition.concurrency_override(),
2406 Some(&ConcurrencyModel::Concurrent { max: None })
2407 );
2408 }
2409
2410 #[test]
2411 fn test_builder_sequential_sets_concurrency() {
2412 use camel_component_api::ConcurrencyModel;
2413
2414 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2415 .route_id("test-route")
2416 .sequential()
2417 .to("log:info")
2418 .build()
2419 .unwrap();
2420
2421 assert_eq!(
2422 definition.concurrency_override(),
2423 Some(&ConcurrencyModel::Sequential)
2424 );
2425 }
2426
2427 #[test]
2428 fn test_builder_default_concurrency_is_none() {
2429 let definition = RouteBuilder::from("timer:tick")
2430 .route_id("test-route")
2431 .to("log:info")
2432 .build()
2433 .unwrap();
2434
2435 assert_eq!(definition.concurrency_override(), None);
2436 }
2437
2438 #[test]
2441 fn test_builder_route_id_sets_id() {
2442 let definition = RouteBuilder::from("timer:tick")
2443 .route_id("my-route")
2444 .build()
2445 .unwrap();
2446
2447 assert_eq!(definition.route_id(), "my-route");
2448 }
2449
2450 #[test]
2451 fn test_build_without_route_id_fails() {
2452 let result = RouteBuilder::from("timer:tick?period=1000")
2453 .to("log:info")
2454 .build();
2455 let err = match result {
2456 Err(e) => e.to_string(),
2457 Ok(_) => panic!("build() should fail without route_id"),
2458 };
2459 assert!(
2460 err.contains("route_id"),
2461 "error should mention route_id, got: {}",
2462 err
2463 );
2464 }
2465
2466 #[test]
2467 fn test_builder_empty_route_id_rejected() {
2468 let result = RouteBuilder::from("timer:tick").route_id("").build();
2469 let err = result.err().expect("empty route_id should be rejected");
2470 assert!(matches!(err, CamelError::RouteError(_)));
2471 }
2472
2473 #[test]
2474 fn test_builder_whitespace_route_id_rejected() {
2475 let result = RouteBuilder::from("timer:tick").route_id(" ").build();
2476 assert!(result.is_err());
2477 }
2478
2479 #[test]
2480 fn test_builder_auto_startup_false() {
2481 let definition = RouteBuilder::from("timer:tick")
2482 .route_id("test-route")
2483 .auto_startup(false)
2484 .build()
2485 .unwrap();
2486
2487 assert!(!definition.auto_startup());
2488 }
2489
2490 #[test]
2491 fn test_builder_startup_order_custom() {
2492 let definition = RouteBuilder::from("timer:tick")
2493 .route_id("test-route")
2494 .startup_order(50)
2495 .build()
2496 .unwrap();
2497
2498 assert_eq!(definition.startup_order(), 50);
2499 }
2500
2501 #[test]
2502 fn test_builder_defaults() {
2503 let definition = RouteBuilder::from("timer:tick")
2504 .route_id("test-route")
2505 .build()
2506 .unwrap();
2507
2508 assert_eq!(definition.route_id(), "test-route");
2509 assert!(definition.auto_startup());
2510 assert_eq!(definition.startup_order(), 1000);
2511 }
2512
2513 #[test]
2516 fn test_choice_builder_single_when() {
2517 let definition = RouteBuilder::from("timer:tick")
2518 .route_id("test-route")
2519 .choice()
2520 .when(|ex: &Exchange| ex.input.header("type").is_some())
2521 .to("mock:typed")
2522 .end_when()
2523 .end_choice()
2524 .build()
2525 .unwrap();
2526 assert_eq!(definition.steps().len(), 1);
2527 assert!(
2528 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2529 if whens.len() == 1 && otherwise.is_none())
2530 );
2531 }
2532
2533 #[test]
2534 fn test_choice_builder_when_otherwise() {
2535 let definition = RouteBuilder::from("timer:tick")
2536 .route_id("test-route")
2537 .choice()
2538 .when(|ex: &Exchange| ex.input.header("a").is_some())
2539 .to("mock:a")
2540 .end_when()
2541 .otherwise()
2542 .to("mock:fallback")
2543 .end_otherwise()
2544 .end_choice()
2545 .build()
2546 .unwrap();
2547 assert!(
2548 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2549 if whens.len() == 1 && otherwise.is_some())
2550 );
2551 }
2552
2553 #[test]
2554 fn test_choice_builder_multiple_whens() {
2555 let definition = RouteBuilder::from("timer:tick")
2556 .route_id("test-route")
2557 .choice()
2558 .when(|ex: &Exchange| ex.input.header("a").is_some())
2559 .to("mock:a")
2560 .end_when()
2561 .when(|ex: &Exchange| ex.input.header("b").is_some())
2562 .to("mock:b")
2563 .end_when()
2564 .end_choice()
2565 .build()
2566 .unwrap();
2567 assert!(
2568 matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2569 if whens.len() == 2)
2570 );
2571 }
2572
2573 #[test]
2574 fn test_choice_step_after_choice() {
2575 let definition = RouteBuilder::from("timer:tick")
2577 .route_id("test-route")
2578 .choice()
2579 .when(|_ex: &Exchange| true)
2580 .to("mock:inner")
2581 .end_when()
2582 .end_choice()
2583 .to("mock:outer") .build()
2585 .unwrap();
2586 assert_eq!(definition.steps().len(), 2);
2587 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2588 }
2589
2590 #[test]
2593 fn test_throttle_builder_typestate() {
2594 let definition = RouteBuilder::from("timer:tick")
2595 .route_id("test-route")
2596 .throttle(10, std::time::Duration::from_secs(1))
2597 .to("mock:result")
2598 .end_throttle()
2599 .build()
2600 .unwrap();
2601
2602 assert_eq!(definition.steps().len(), 1);
2603 assert!(matches!(
2604 &definition.steps()[0],
2605 BuilderStep::Throttle { .. }
2606 ));
2607 }
2608
2609 #[test]
2610 fn test_throttle_builder_with_strategy() {
2611 let definition = RouteBuilder::from("timer:tick")
2612 .route_id("test-route")
2613 .throttle(10, std::time::Duration::from_secs(1))
2614 .strategy(ThrottleStrategy::Reject)
2615 .to("mock:result")
2616 .end_throttle()
2617 .build()
2618 .unwrap();
2619
2620 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2621 assert_eq!(config.strategy, ThrottleStrategy::Reject);
2622 } else {
2623 panic!("Expected Throttle step");
2624 }
2625 }
2626
2627 #[test]
2628 fn test_throttle_builder_steps_collected() {
2629 let definition = RouteBuilder::from("timer:tick")
2630 .route_id("test-route")
2631 .throttle(5, std::time::Duration::from_secs(1))
2632 .set_header("throttled", Value::Bool(true))
2633 .to("mock:throttled")
2634 .end_throttle()
2635 .build()
2636 .unwrap();
2637
2638 match &definition.steps()[0] {
2639 BuilderStep::Throttle { steps, .. } => {
2640 assert_eq!(steps.len(), 2); }
2642 other => panic!("Expected Throttle, got {:?}", other),
2643 }
2644 }
2645
2646 #[test]
2647 fn test_throttle_step_after_throttle() {
2648 let definition = RouteBuilder::from("timer:tick")
2650 .route_id("test-route")
2651 .throttle(10, std::time::Duration::from_secs(1))
2652 .to("mock:inner")
2653 .end_throttle()
2654 .to("mock:outer")
2655 .build()
2656 .unwrap();
2657
2658 assert_eq!(definition.steps().len(), 2);
2659 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2660 }
2661
2662 #[test]
2665 fn test_load_balance_builder_typestate() {
2666 let definition = RouteBuilder::from("timer:tick")
2667 .route_id("test-route")
2668 .load_balance()
2669 .round_robin()
2670 .to("mock:a")
2671 .to("mock:b")
2672 .end_load_balance()
2673 .build()
2674 .unwrap();
2675
2676 assert_eq!(definition.steps().len(), 1);
2677 assert!(matches!(
2678 &definition.steps()[0],
2679 BuilderStep::LoadBalance { .. }
2680 ));
2681 }
2682
2683 #[test]
2684 fn test_load_balance_builder_with_strategy() {
2685 let definition = RouteBuilder::from("timer:tick")
2686 .route_id("test-route")
2687 .load_balance()
2688 .random()
2689 .to("mock:result")
2690 .end_load_balance()
2691 .build()
2692 .unwrap();
2693
2694 if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2695 assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2696 } else {
2697 panic!("Expected LoadBalance step");
2698 }
2699 }
2700
2701 #[test]
2702 fn test_load_balance_builder_steps_collected() {
2703 let definition = RouteBuilder::from("timer:tick")
2704 .route_id("test-route")
2705 .load_balance()
2706 .set_header("lb", Value::Bool(true))
2707 .to("mock:a")
2708 .end_load_balance()
2709 .build()
2710 .unwrap();
2711
2712 match &definition.steps()[0] {
2713 BuilderStep::LoadBalance { steps, .. } => {
2714 assert_eq!(steps.len(), 2); }
2716 other => panic!("Expected LoadBalance, got {:?}", other),
2717 }
2718 }
2719
2720 #[test]
2721 fn test_load_balance_step_after_load_balance() {
2722 let definition = RouteBuilder::from("timer:tick")
2724 .route_id("test-route")
2725 .load_balance()
2726 .to("mock:inner")
2727 .end_load_balance()
2728 .to("mock:outer")
2729 .build()
2730 .unwrap();
2731
2732 assert_eq!(definition.steps().len(), 2);
2733 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2734 }
2735
2736 #[test]
2739 fn test_dynamic_router_builder() {
2740 let definition = RouteBuilder::from("timer:tick")
2741 .route_id("test-route")
2742 .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2743 .build()
2744 .unwrap();
2745
2746 assert_eq!(definition.steps().len(), 1);
2747 assert!(matches!(
2748 &definition.steps()[0],
2749 BuilderStep::DynamicRouter { .. }
2750 ));
2751 }
2752
2753 #[test]
2754 fn test_dynamic_router_builder_with_config() {
2755 let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2756 .max_iterations(100)
2757 .cache_size(500);
2758
2759 let definition = RouteBuilder::from("timer:tick")
2760 .route_id("test-route")
2761 .dynamic_router_with_config(config)
2762 .build()
2763 .unwrap();
2764
2765 assert_eq!(definition.steps().len(), 1);
2766 if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2767 assert_eq!(config.max_iterations, 100);
2768 assert_eq!(config.cache_size, 500);
2769 } else {
2770 panic!("Expected DynamicRouter step");
2771 }
2772 }
2773
2774 #[test]
2775 fn test_dynamic_router_step_after_router() {
2776 let definition = RouteBuilder::from("timer:tick")
2778 .route_id("test-route")
2779 .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2780 .to("mock:outer")
2781 .build()
2782 .unwrap();
2783
2784 assert_eq!(definition.steps().len(), 2);
2785 assert!(matches!(
2786 &definition.steps()[0],
2787 BuilderStep::DynamicRouter { .. }
2788 ));
2789 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2790 }
2791
2792 #[test]
2793 fn routing_slip_builder_creates_step() {
2794 use camel_api::RoutingSlipExpression;
2795
2796 let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2797
2798 let route = RouteBuilder::from("direct:start")
2799 .route_id("routing-slip-test")
2800 .routing_slip(expression)
2801 .build()
2802 .unwrap();
2803
2804 assert!(
2805 matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2806 "Expected RoutingSlip step"
2807 );
2808 }
2809
2810 #[test]
2811 fn routing_slip_with_config_builder_creates_step() {
2812 use camel_api::RoutingSlipConfig;
2813
2814 let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2815 .uri_delimiter("|")
2816 .cache_size(50)
2817 .ignore_invalid_endpoints(true);
2818
2819 let route = RouteBuilder::from("direct:start")
2820 .route_id("routing-slip-config-test")
2821 .routing_slip_with_config(config)
2822 .build()
2823 .unwrap();
2824
2825 if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2826 assert_eq!(config.uri_delimiter, "|");
2827 assert_eq!(config.cache_size, 50);
2828 assert!(config.ignore_invalid_endpoints);
2829 } else {
2830 panic!("Expected RoutingSlip step");
2831 }
2832 }
2833
2834 #[test]
2835 fn test_builder_marshal_adds_processor_step() {
2836 let definition = RouteBuilder::from("timer:tick")
2837 .route_id("test-route")
2838 .marshal("json")
2839 .unwrap()
2840 .build()
2841 .unwrap();
2842 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2843 }
2844
2845 #[test]
2846 fn test_builder_unmarshal_adds_processor_step() {
2847 let definition = RouteBuilder::from("timer:tick")
2848 .route_id("test-route")
2849 .unmarshal("json")
2850 .unwrap()
2851 .build()
2852 .unwrap();
2853 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2854 }
2855
2856 #[test]
2857 fn test_builder_stream_cache_adds_processor_step() {
2858 let definition = RouteBuilder::from("timer:tick")
2859 .route_id("test-route")
2860 .stream_cache(1024)
2861 .build()
2862 .unwrap();
2863 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2864 }
2865
2866 #[test]
2867 fn validate_adds_to_step_with_validator_uri() {
2868 let def = RouteBuilder::from("direct:in")
2869 .route_id("test")
2870 .validate("schemas/order.xsd")
2871 .build()
2872 .unwrap();
2873 let steps = def.steps();
2874 assert_eq!(steps.len(), 1);
2875 assert!(
2876 matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2877 "got: {:?}",
2878 steps[0]
2879 );
2880 }
2881
2882 #[test]
2883 fn test_builder_marshal_returns_err_for_unknown_format() {
2884 let result = RouteBuilder::from("timer:tick")
2885 .route_id("test-route")
2886 .marshal("csv");
2887 let err = match result {
2888 Err(e) => e,
2889 Ok(_) => panic!("marshal with unknown format should return Err"),
2890 };
2891 let msg = err.to_string();
2892 assert!(
2893 msg.contains("unknown data format"),
2894 "error should mention unknown format, got: {msg}"
2895 );
2896 assert!(
2897 msg.contains("csv"),
2898 "error should mention format name, got: {msg}"
2899 );
2900 }
2901
2902 #[test]
2903 fn test_builder_unmarshal_returns_err_for_unknown_format() {
2904 let result = RouteBuilder::from("timer:tick")
2905 .route_id("test-route")
2906 .unmarshal("csv");
2907 let err = match result {
2908 Err(e) => e,
2909 Ok(_) => panic!("unmarshal with unknown format should return Err"),
2910 };
2911 let msg = err.to_string();
2912 assert!(
2913 msg.contains("unknown data format"),
2914 "error should mention unknown format, got: {msg}"
2915 );
2916 assert!(
2917 msg.contains("csv"),
2918 "error should mention format name, got: {msg}"
2919 );
2920 }
2921
2922 #[test]
2923 fn test_builder_recipient_list_creates_step() {
2924 let route = RouteBuilder::from("direct:start")
2925 .route_id("recipient-list-test")
2926 .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
2927 .build()
2928 .unwrap();
2929
2930 assert!(matches!(
2931 &route.steps()[0],
2932 BuilderStep::RecipientList { .. }
2933 ));
2934 }
2935
2936 #[test]
2937 fn test_builder_recipient_list_with_config_creates_step() {
2938 let config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));
2939
2940 let route = RouteBuilder::from("direct:start")
2941 .route_id("recipient-list-config-test")
2942 .recipient_list_with_config(config)
2943 .build()
2944 .unwrap();
2945
2946 assert!(matches!(
2947 &route.steps()[0],
2948 BuilderStep::RecipientList { .. }
2949 ));
2950 }
2951
2952 #[test]
2953 fn test_builder_script_adds_script_step() {
2954 let route = RouteBuilder::from("direct:start")
2955 .route_id("script-test")
2956 .script("rhai", "headers[\"x\"] = \"y\"")
2957 .build()
2958 .unwrap();
2959
2960 assert!(matches!(
2961 &route.steps()[0],
2962 BuilderStep::Script { language, script }
2963 if language == "rhai" && script == "headers[\"x\"] = \"y\""
2964 ));
2965 }
2966
2967 #[test]
2968 fn test_builder_delay_and_delay_with_header_add_steps() {
2969 let route = RouteBuilder::from("direct:start")
2970 .route_id("delay-test")
2971 .delay(Duration::from_millis(250))
2972 .delay_with_header(Duration::from_millis(500), "x-delay")
2973 .build()
2974 .unwrap();
2975
2976 assert_eq!(route.steps().len(), 2);
2977 assert!(matches!(&route.steps()[0], BuilderStep::Delay { .. }));
2978 assert!(matches!(&route.steps()[1], BuilderStep::Delay { .. }));
2979 }
2980
2981 #[test]
2982 fn test_builder_log_and_stop_add_steps_in_order() {
2983 let route = RouteBuilder::from("direct:start")
2984 .route_id("log-stop-test")
2985 .log("hello", LogLevel::Info)
2986 .stop()
2987 .to("mock:after")
2988 .build()
2989 .unwrap();
2990
2991 assert_eq!(route.steps().len(), 3);
2992 assert!(matches!(
2993 &route.steps()[0],
2994 BuilderStep::Log { message, .. } if message == "hello"
2995 ));
2996 assert!(matches!(&route.steps()[1], BuilderStep::Stop));
2997 assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
2998 }
2999
3000 #[test]
3001 fn test_builder_stream_cache_default_adds_processor_step() {
3002 let route = RouteBuilder::from("direct:start")
3003 .route_id("stream-cache-default-test")
3004 .stream_cache_default()
3005 .build()
3006 .unwrap();
3007
3008 assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
3009 }
3010
3011 #[test]
3012 fn test_validate_preserves_existing_validator_prefix() {
3013 let route = RouteBuilder::from("direct:in")
3014 .route_id("validate-prefix-test")
3015 .validate("validator:schemas/order.xsd")
3016 .build()
3017 .unwrap();
3018
3019 assert!(matches!(
3020 &route.steps()[0],
3021 BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
3022 ));
3023 }
3024
3025 #[test]
3026 fn test_load_balance_builder_weighted_failover_parallel_config() {
3027 let route = RouteBuilder::from("direct:start")
3028 .route_id("lb-weighted-failover-parallel")
3029 .load_balance()
3030 .weighted(vec![
3031 ("direct:a".to_string(), 3),
3032 ("direct:b".to_string(), 1),
3033 ])
3034 .failover()
3035 .parallel(true)
3036 .to("mock:result")
3037 .end_load_balance()
3038 .build()
3039 .unwrap();
3040
3041 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3042 assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3043 assert!(config.parallel);
3044 } else {
3045 panic!("Expected LoadBalance step");
3046 }
3047 }
3048
3049 #[test]
3050 fn test_multicast_builder_all_config_setters() {
3051 let route = RouteBuilder::from("direct:start")
3052 .route_id("multicast-config-test")
3053 .multicast()
3054 .parallel(true)
3055 .parallel_limit(4)
3056 .stop_on_exception(true)
3057 .timeout(Duration::from_millis(300))
3058 .aggregation(MulticastStrategy::Original)
3059 .to("mock:a")
3060 .end_multicast()
3061 .build()
3062 .unwrap();
3063
3064 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3065 assert!(config.parallel);
3066 assert_eq!(config.parallel_limit, Some(4));
3067 assert!(config.stop_on_exception);
3068 assert_eq!(config.timeout, Some(Duration::from_millis(300)));
3069 assert!(matches!(config.aggregation, MulticastStrategy::Original));
3070 } else {
3071 panic!("Expected Multicast step");
3072 }
3073 }
3074
3075 #[test]
3076 fn test_build_canonical_rejects_unsupported_processor_step() {
3077 let err = RouteBuilder::from("direct:start")
3078 .route_id("canonical-reject")
3079 .set_header("k", Value::String("v".into()))
3080 .build_canonical()
3081 .unwrap_err();
3082
3083 assert!(format!("{err}").contains("does not support step `processor`"));
3084 }
3085
3086 #[test]
3089 fn test_load_balance_builder_weighted_strategy() {
3090 let route = RouteBuilder::from("direct:start")
3091 .route_id("lb-weighted")
3092 .load_balance()
3093 .weighted(vec![
3094 ("direct:a".to_string(), 5),
3095 ("direct:b".to_string(), 2),
3096 ("direct:c".to_string(), 1),
3097 ])
3098 .to("mock:result")
3099 .end_load_balance()
3100 .build()
3101 .unwrap();
3102
3103 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3104 assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
3105 } else {
3106 panic!("Expected LoadBalance step");
3107 }
3108 }
3109
3110 #[test]
3111 fn test_load_balance_builder_failover_strategy() {
3112 let route = RouteBuilder::from("direct:start")
3113 .route_id("lb-failover")
3114 .load_balance()
3115 .failover()
3116 .to("mock:primary")
3117 .end_load_balance()
3118 .build()
3119 .unwrap();
3120
3121 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3122 assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3123 assert!(!config.parallel);
3124 } else {
3125 panic!("Expected LoadBalance step");
3126 }
3127 }
3128
3129 #[test]
3130 fn test_load_balance_builder_parallel_false_explicit() {
3131 let route = RouteBuilder::from("direct:start")
3132 .route_id("lb-parallel-false")
3133 .load_balance()
3134 .round_robin()
3135 .parallel(false)
3136 .to("mock:result")
3137 .end_load_balance()
3138 .build()
3139 .unwrap();
3140
3141 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3142 assert!(!config.parallel);
3143 } else {
3144 panic!("Expected LoadBalance step");
3145 }
3146 }
3147
3148 #[test]
3151 fn test_filter_in_split_builder_typestate() {
3152 use camel_api::splitter::{SplitterConfig, split_body_lines};
3153
3154 let definition = RouteBuilder::from("timer:test")
3155 .route_id("filter-in-split")
3156 .split(SplitterConfig::new(split_body_lines()))
3157 .filter(|_ex| true)
3158 .to("mock:filtered")
3159 .end_filter()
3160 .end_split()
3161 .build()
3162 .unwrap();
3163
3164 assert_eq!(definition.steps().len(), 1);
3165 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3166 assert_eq!(steps.len(), 1);
3167 assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3168 } else {
3169 panic!("Expected Split step");
3170 }
3171 }
3172
3173 #[test]
3174 fn test_filter_in_split_builder_multiple_steps() {
3175 use camel_api::splitter::{SplitterConfig, split_body_lines};
3176
3177 let definition = RouteBuilder::from("timer:test")
3178 .route_id("filter-in-split-multi")
3179 .split(SplitterConfig::new(split_body_lines()))
3180 .to("mock:before-filter")
3181 .filter(|_ex| true)
3182 .to("mock:inside-filter")
3183 .end_filter()
3184 .to("mock:after-filter")
3185 .end_split()
3186 .build()
3187 .unwrap();
3188
3189 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3190 assert_eq!(steps.len(), 3);
3192 } else {
3193 panic!("Expected Split step");
3194 }
3195 }
3196
3197 #[test]
3200 fn test_build_canonical_with_circuit_breaker() {
3201 use camel_api::circuit_breaker::CircuitBreakerConfig;
3202
3203 let spec = RouteBuilder::from("direct:start")
3204 .route_id("canonical-cb")
3205 .circuit_breaker(CircuitBreakerConfig::new().failure_threshold(10))
3206 .to("mock:result")
3207 .build_canonical()
3208 .unwrap();
3209
3210 let cb = spec.circuit_breaker.expect("circuit breaker should be set");
3211 assert_eq!(cb.failure_threshold, 10);
3212 }
3213
3214 #[test]
3215 fn test_build_canonical_rejects_custom_split_aggregation() {
3216 use camel_api::splitter::{SplitterConfig, split_body_lines};
3217
3218 let err = RouteBuilder::from("direct:start")
3219 .route_id("canonical-custom-split")
3220 .split(SplitterConfig::new(split_body_lines()).aggregation(
3221 camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
3222 ))
3223 .to("mock:frag")
3224 .end_split()
3225 .build_canonical()
3226 .unwrap_err();
3227
3228 assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3230 }
3231
3232 #[test]
3233 fn test_build_canonical_rejects_custom_aggregate_strategy() {
3234 let err = RouteBuilder::from("direct:start")
3235 .route_id("canonical-custom-agg")
3236 .aggregate(
3237 AggregatorConfig::correlate_by("key")
3238 .complete_when_size(2)
3239 .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
3240 .build()
3241 .unwrap(),
3242 )
3243 .build_canonical()
3244 .unwrap_err();
3245
3246 assert!(format!("{err}").contains("custom aggregate strategy"));
3247 }
3248
3249 #[test]
3250 fn test_build_canonical_rejects_fn_correlation_strategy() {
3251 let err = RouteBuilder::from("direct:start")
3252 .route_id("canonical-fn-corr")
3253 .aggregate(AggregatorConfig {
3254 header_name: "key".to_string(),
3255 completion: CompletionMode::Single(CompletionCondition::Size(1)),
3256 correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
3257 strategy: AggregationStrategy::CollectAll,
3258 max_buckets: None,
3259 bucket_ttl: None,
3260 force_completion_on_stop: false,
3261 discard_on_timeout: false,
3262 })
3263 .build_canonical()
3264 .unwrap_err();
3265
3266 assert!(format!("{err}").contains("Fn correlation strategy"));
3267 }
3268
3269 #[test]
3270 fn test_build_canonical_rejects_predicate_completion() {
3271 let err = RouteBuilder::from("direct:start")
3272 .route_id("canonical-pred-completion")
3273 .aggregate(AggregatorConfig {
3274 header_name: "key".to_string(),
3275 completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
3276 |_| false,
3277 ))),
3278 correlation: CorrelationStrategy::HeaderName("key".to_string()),
3279 strategy: AggregationStrategy::CollectAll,
3280 max_buckets: None,
3281 bucket_ttl: None,
3282 force_completion_on_stop: false,
3283 discard_on_timeout: false,
3284 })
3285 .build_canonical()
3286 .unwrap_err();
3287
3288 assert!(format!("{err}").contains("predicate completion"));
3289 }
3290
3291 #[test]
3292 fn test_build_canonical_with_expression_correlation() {
3293 let spec = RouteBuilder::from("direct:start")
3294 .route_id("canonical-expr-corr")
3295 .aggregate(AggregatorConfig {
3296 header_name: "key".to_string(),
3297 completion: CompletionMode::Single(CompletionCondition::Size(1)),
3298 correlation: CorrelationStrategy::Expression {
3299 expr: "header.key".to_string(),
3300 language: "simple".to_string(),
3301 },
3302 strategy: AggregationStrategy::CollectAll,
3303 max_buckets: None,
3304 bucket_ttl: None,
3305 force_completion_on_stop: false,
3306 discard_on_timeout: false,
3307 })
3308 .build_canonical()
3309 .unwrap();
3310
3311 assert!(spec.steps.iter().any(|s| matches!(s, CanonicalStepSpec::Aggregate(a) if a.correlation_key == Some("header.key".to_string()))));
3312 }
3313
3314 #[test]
3315 fn test_build_canonical_split_rejected_with_closure_expression() {
3316 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
3317
3318 let err = RouteBuilder::from("direct:start")
3320 .route_id("canonical-split-last")
3321 .split(
3322 SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins),
3323 )
3324 .to("mock:frag")
3325 .end_split()
3326 .build_canonical()
3327 .unwrap_err();
3328
3329 assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3330 }
3331
3332 #[test]
3335 fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
3336 let definition = RouteBuilder::from("direct:start")
3337 .route_id("on-exception-full")
3338 .dead_letter_channel("log:dlc")
3339 .on_exception(|e| matches!(e, CamelError::Io(_)))
3340 .retry(5)
3341 .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
3342 .with_jitter(0.3)
3343 .handled_by("log:io-handler")
3344 .end_on_exception()
3345 .to("mock:out")
3346 .build()
3347 .unwrap();
3348
3349 let cfg = definition
3350 .error_handler_config()
3351 .expect("error handler should be set");
3352 assert_eq!(cfg.policies.len(), 1);
3353 let policy = &cfg.policies[0];
3354 let retry = policy.retry.as_ref().expect("retry should be set");
3355 assert_eq!(retry.max_attempts, 5);
3356 assert_eq!(retry.initial_delay, Duration::from_millis(10));
3357 assert_eq!(retry.multiplier, 2.0);
3358 assert_eq!(retry.max_delay, Duration::from_millis(500));
3359 assert!((retry.jitter_factor - 0.3).abs() < f64::EPSILON);
3360 assert_eq!(policy.handled_by.as_deref(), Some("log:io-handler"));
3361 }
3362
3363 #[test]
3364 fn test_on_exception_jitter_clamped_to_valid_range() {
3365 let definition = RouteBuilder::from("direct:start")
3366 .route_id("jitter-clamp")
3367 .on_exception(|_e| true)
3368 .retry(1)
3369 .with_jitter(5.0)
3370 .end_on_exception()
3371 .to("mock:out")
3372 .build()
3373 .unwrap();
3374
3375 let cfg = definition.error_handler_config().unwrap();
3376 let retry = cfg.policies[0].retry.as_ref().unwrap();
3377 assert!((retry.jitter_factor - 1.0).abs() < f64::EPSILON);
3378 }
3379
3380 #[test]
3383 fn test_builder_process_fn_adds_processor_step() {
3384 use camel_api::BoxProcessorExt;
3385 let processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
3386 let definition = RouteBuilder::from("timer:tick")
3387 .route_id("process-fn-test")
3388 .process_fn(processor)
3389 .build()
3390 .unwrap();
3391
3392 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3393 }
3394
3395 #[test]
3396 fn test_builder_convert_body_to_adds_processor_step() {
3397 let definition = RouteBuilder::from("timer:tick")
3398 .route_id("convert-body-test")
3399 .convert_body_to(BodyType::Json)
3400 .build()
3401 .unwrap();
3402
3403 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3404 }
3405
3406 #[test]
3407 fn test_builder_bean_adds_bean_step() {
3408 let definition = RouteBuilder::from("timer:tick")
3409 .route_id("bean-test")
3410 .bean("myBean", "process")
3411 .build()
3412 .unwrap();
3413
3414 assert!(
3415 matches!(&definition.steps()[0], BuilderStep::Bean { name, method }
3416 if name == "myBean" && method == "process")
3417 );
3418 }
3419
3420 #[test]
3423 fn test_throttle_builder_delay_strategy() {
3424 let definition = RouteBuilder::from("timer:tick")
3425 .route_id("throttle-delay")
3426 .throttle(10, Duration::from_secs(1))
3427 .strategy(ThrottleStrategy::Delay)
3428 .to("mock:result")
3429 .end_throttle()
3430 .build()
3431 .unwrap();
3432
3433 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3434 assert_eq!(config.strategy, ThrottleStrategy::Delay);
3435 } else {
3436 panic!("Expected Throttle step");
3437 }
3438 }
3439
3440 #[test]
3441 fn test_throttle_builder_drop_strategy() {
3442 let definition = RouteBuilder::from("timer:tick")
3443 .route_id("throttle-drop")
3444 .throttle(10, Duration::from_secs(1))
3445 .strategy(ThrottleStrategy::Drop)
3446 .to("mock:result")
3447 .end_throttle()
3448 .build()
3449 .unwrap();
3450
3451 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3452 assert_eq!(config.strategy, ThrottleStrategy::Drop);
3453 } else {
3454 panic!("Expected Throttle step");
3455 }
3456 }
3457
3458 #[test]
3461 fn test_nested_loop_while_builder() {
3462 use camel_api::loop_eip::LoopMode;
3463
3464 let def = RouteBuilder::from("direct:start")
3465 .route_id("nested-loop-while")
3466 .loop_count(2)
3467 .to("mock:outer")
3468 .loop_while(|_ex| true)
3469 .to("mock:inner")
3470 .end_loop()
3471 .end_loop()
3472 .build()
3473 .unwrap();
3474
3475 assert_eq!(def.steps().len(), 1);
3476 if let BuilderStep::Loop { steps, .. } = &def.steps()[0] {
3477 assert_eq!(steps.len(), 2);
3478 if let BuilderStep::Loop { config, .. } = &steps[1] {
3479 assert!(matches!(config.mode, LoopMode::While(_)));
3480 } else {
3481 panic!("Expected inner Loop step");
3482 }
3483 } else {
3484 panic!("Expected outer Loop step");
3485 }
3486 }
3487
3488 #[test]
3491 fn test_choice_builder_multiple_whens_with_otherwise() {
3492 let definition = RouteBuilder::from("timer:tick")
3493 .route_id("choice-multi-otherwise")
3494 .choice()
3495 .when(|ex: &Exchange| ex.input.header("a").is_some())
3496 .to("mock:a")
3497 .end_when()
3498 .when(|ex: &Exchange| ex.input.header("b").is_some())
3499 .to("mock:b")
3500 .end_when()
3501 .when(|ex: &Exchange| ex.input.header("c").is_some())
3502 .to("mock:c")
3503 .end_when()
3504 .otherwise()
3505 .to("mock:fallback")
3506 .end_otherwise()
3507 .end_choice()
3508 .build()
3509 .unwrap();
3510
3511 if let BuilderStep::Choice { whens, otherwise } = &definition.steps()[0] {
3512 assert_eq!(whens.len(), 3);
3513 assert!(otherwise.is_some());
3514 assert_eq!(otherwise.as_ref().unwrap().len(), 1);
3515 } else {
3516 panic!("Expected Choice step");
3517 }
3518 }
3519
3520 #[test]
3523 fn test_multicast_builder_parallel_only() {
3524 let route = RouteBuilder::from("direct:start")
3525 .route_id("multicast-parallel")
3526 .multicast()
3527 .parallel(true)
3528 .to("mock:a")
3529 .end_multicast()
3530 .build()
3531 .unwrap();
3532
3533 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3534 assert!(config.parallel);
3535 assert_eq!(config.parallel_limit, None);
3536 } else {
3537 panic!("Expected Multicast step");
3538 }
3539 }
3540
3541 #[test]
3542 fn test_multicast_builder_timeout_only() {
3543 let route = RouteBuilder::from("direct:start")
3544 .route_id("multicast-timeout")
3545 .multicast()
3546 .timeout(Duration::from_secs(5))
3547 .to("mock:a")
3548 .end_multicast()
3549 .build()
3550 .unwrap();
3551
3552 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3553 assert_eq!(config.timeout, Some(Duration::from_secs(5)));
3554 } else {
3555 panic!("Expected Multicast step");
3556 }
3557 }
3558
3559 #[test]
3560 fn test_multicast_builder_aggregation_collect_all() {
3561 let route = RouteBuilder::from("direct:start")
3562 .route_id("multicast-collect")
3563 .multicast()
3564 .aggregation(MulticastStrategy::CollectAll)
3565 .to("mock:a")
3566 .end_multicast()
3567 .build()
3568 .unwrap();
3569
3570 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3571 assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
3572 } else {
3573 panic!("Expected Multicast step");
3574 }
3575 }
3576
3577 #[test]
3580 fn test_build_canonical_aggregate_any_completion_mode() {
3581 let spec = RouteBuilder::from("direct:start")
3582 .route_id("canonical-any-completion")
3583 .aggregate(
3584 AggregatorConfig::correlate_by("key")
3585 .complete_on_size_or_timeout(10, Duration::from_secs(30))
3586 .build()
3587 .unwrap(),
3588 )
3589 .build_canonical()
3590 .unwrap();
3591
3592 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3593 assert_eq!(agg.completion_size, Some(10));
3594 assert_eq!(agg.completion_timeout_ms, Some(30_000));
3595 } else {
3596 panic!("Expected Aggregate step");
3597 }
3598 }
3599
3600 #[test]
3601 fn test_build_canonical_aggregate_timeout_completion() {
3602 let spec = RouteBuilder::from("direct:start")
3603 .route_id("canonical-timeout-completion")
3604 .aggregate(
3605 AggregatorConfig::correlate_by("key")
3606 .complete_on_timeout(Duration::from_millis(500))
3607 .build()
3608 .unwrap(),
3609 )
3610 .build_canonical()
3611 .unwrap();
3612
3613 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3614 assert_eq!(agg.completion_size, None);
3615 assert_eq!(agg.completion_timeout_ms, Some(500));
3616 } else {
3617 panic!("Expected Aggregate step");
3618 }
3619 }
3620
3621 #[test]
3624 fn test_build_canonical_aggregate_discard_on_timeout() {
3625 use camel_api::aggregator::AggregatorConfig;
3626
3627 let spec = RouteBuilder::from("direct:start")
3628 .route_id("canonical-discard-timeout")
3629 .aggregate(
3630 AggregatorConfig::correlate_by("key")
3631 .complete_when_size(1)
3632 .discard_on_timeout(true)
3633 .build()
3634 .unwrap(),
3635 )
3636 .build_canonical()
3637 .unwrap();
3638
3639 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3640 assert_eq!(agg.discard_on_timeout, Some(true));
3641 } else {
3642 panic!("Expected Aggregate step");
3643 }
3644 }
3645
3646 #[test]
3647 fn test_build_canonical_aggregate_force_completion_on_stop() {
3648 use camel_api::aggregator::AggregatorConfig;
3649
3650 let spec = RouteBuilder::from("direct:start")
3651 .route_id("canonical-force-stop")
3652 .aggregate(
3653 AggregatorConfig::correlate_by("key")
3654 .complete_when_size(1)
3655 .force_completion_on_stop(true)
3656 .build()
3657 .unwrap(),
3658 )
3659 .build_canonical()
3660 .unwrap();
3661
3662 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3663 assert_eq!(agg.force_completion_on_stop, Some(true));
3664 } else {
3665 panic!("Expected Aggregate step");
3666 }
3667 }
3668
#[test]
// A bucket limit of 100 and a 60 s bucket TTL must be carried into the
// canonical aggregate step, with the TTL expressed in milliseconds.
fn test_build_canonical_aggregate_max_buckets_and_ttl() {
    use camel_api::aggregator::AggregatorConfig;

    let config = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .max_buckets(100)
        .bucket_ttl(Duration::from_secs(60))
        .build()
        .unwrap();

    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-buckets-ttl")
        .aggregate(config)
        .build_canonical()
        .unwrap();

    match &spec.steps[0] {
        CanonicalStepSpec::Aggregate(agg) => {
            assert_eq!(agg.max_buckets, Some(100));
            assert_eq!(agg.bucket_ttl_ms, Some(60_000));
        }
        _ => panic!("Expected Aggregate step"),
    }
}
3695
#[test]
// A filter scope opened and closed inside a split must be recorded as the
// single nested step of the resulting `Split` step.
fn test_split_builder_with_filter_inside() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines());

    let definition = RouteBuilder::from("timer:test")
        .route_id("split-with-filter")
        .split(splitter)
        .filter(|_ex| true)
        .to("mock:filtered-frag")
        .end_filter()
        .end_split()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Split { steps, .. } => {
            assert_eq!(steps.len(), 1);
            assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
        }
        _ => panic!("Expected Split step"),
    }
}
3719
#[test]
// Two consecutive wire taps followed by a `to` must yield three steps, with
// the first two being wire taps to their respective URIs in call order.
fn test_wire_tap_multiple_taps() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("multi-wire-tap")
        .wire_tap("mock:tap1")
        .wire_tap("mock:tap2")
        .to("mock:result")
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 3);

    // Check each tap at the position where it was declared.
    for (index, expected) in ["mock:tap1", "mock:tap2"].iter().enumerate() {
        assert!(matches!(&steps[index], BuilderStep::WireTap { uri } if uri == *expected));
    }
}
3740
#[test]
// Combining the `dead_letter_channel` shorthand with an explicit
// `error_handler` must make `build()` fail with a "mixed error handler modes"
// error.
fn test_builder_shorthand_then_explicit_mixed_mode() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("mixed-mode-2")
        .dead_letter_channel("log:dlc")
        .error_handler(ErrorHandlerConfig::log_only())
        .to("mock:out");

    let err = builder.build().err().expect("mixed mode should fail");
    let message = err.to_string();
    assert!(message.contains("mixed error handler modes"));
}
3755
#[test]
// `build_canonical()` must return an error when the `from` URI is empty.
fn test_build_canonical_empty_from_uri_errors() {
    let builder = RouteBuilder::from("").route_id("test");
    let outcome = builder.build_canonical();
    assert!(outcome.is_err());
}
3763
#[test]
// `build_canonical()` without a route id must fail, and the error text must
// mention `route_id`.
fn test_build_canonical_missing_route_id_errors() {
    let outcome = RouteBuilder::from("direct:start").build_canonical();
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("route_id"));
}
3771
#[test]
// An aggregate declared inside a split must be recorded as the single nested
// step of the resulting `Split` step.
fn test_split_builder_with_aggregate_inside() {
    use camel_api::aggregator::AggregatorConfig;
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines());
    let aggregator = AggregatorConfig::correlate_by("frag-key")
        .complete_when_size(3)
        .build()
        .unwrap();

    let definition = RouteBuilder::from("timer:test")
        .route_id("split-agg")
        .split(splitter)
        .aggregate(aggregator)
        .end_split()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Split { steps, .. } => {
            assert_eq!(steps.len(), 1);
            assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
        }
        _ => panic!("Expected Split step"),
    }
}
3799
#[test]
// Steps added between `throttle(..)` and `end_throttle()` must be collected
// as the nested steps of the `Throttle` step (two here).
fn test_throttle_builder_with_steps_inside() {
    let window = Duration::from_secs(1);

    let definition = RouteBuilder::from("timer:tick")
        .route_id("throttle-steps")
        .throttle(10, window)
        .set_header("throttled", Value::Bool(true))
        .to("mock:throttled")
        .end_throttle()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::Throttle { steps, .. } => assert_eq!(steps.len(), 2),
        _ => panic!("Expected Throttle step"),
    }
}
3819
#[test]
// Steps added between `load_balance()` and `end_load_balance()` must be
// collected as the nested steps of the `LoadBalance` step (two here).
fn test_load_balance_builder_with_steps_inside() {
    let built = RouteBuilder::from("timer:tick")
        .route_id("lb-steps")
        .load_balance()
        .round_robin()
        .set_header("lb", Value::Bool(true))
        .to("mock:lb")
        .end_load_balance()
        .build();
    let definition = built.unwrap();

    match &definition.steps()[0] {
        BuilderStep::LoadBalance { steps, .. } => assert_eq!(steps.len(), 2),
        _ => panic!("Expected LoadBalance step"),
    }
}
3840
#[test]
// Steps added between `multicast()` and `end_multicast()` must be collected
// as the nested steps of the `Multicast` step (two here).
fn test_multicast_builder_with_steps_inside() {
    let built = RouteBuilder::from("timer:tick")
        .route_id("multicast-steps")
        .multicast()
        .set_header("mc", Value::Bool(true))
        .to("mock:multicast")
        .end_multicast()
        .build();
    let definition = built.unwrap();

    match &definition.steps()[0] {
        BuilderStep::Multicast { steps, .. } => assert_eq!(steps.len(), 2),
        _ => panic!("Expected Multicast step"),
    }
}
3860
#[test]
// Steps added between `loop_count(..)` and `end_loop()` must be collected as
// the nested steps of the `Loop` step (two here).
fn test_loop_builder_with_steps_inside() {
    let built = RouteBuilder::from("timer:tick")
        .route_id("loop-steps")
        .loop_count(3)
        .set_header("loop", Value::Bool(true))
        .to("mock:loop")
        .end_loop()
        .build();
    let definition = built.unwrap();

    match &definition.steps()[0] {
        BuilderStep::Loop { steps, .. } => assert_eq!(steps.len(), 2),
        _ => panic!("Expected Loop step"),
    }
}
3880
#[test]
// `build_canonical()` must fail for a route containing a loop, and the error
// must name the unsupported `loop` step.
fn test_build_canonical_rejects_loop_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-loop")
        .loop_count(3)
        .to("mock:loop")
        .end_loop()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `loop`"));
}
3895
#[test]
// `build_canonical()` must fail for a route containing a multicast, and the
// error must name the unsupported `multicast` step.
fn test_build_canonical_rejects_multicast_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-multicast")
        .multicast()
        .to("mock:a")
        .end_multicast()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `multicast`"));
}
3908
#[test]
// `build_canonical()` must fail for a route containing a throttle, and the
// error must name the unsupported `throttle` step.
fn test_build_canonical_rejects_throttle_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-throttle")
        .throttle(10, Duration::from_secs(1))
        .to("mock:result")
        .end_throttle()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `throttle`"));
}
3921
#[test]
// `build_canonical()` must fail for a route containing a load balancer, and
// the error must name the unsupported `load_balancer` step.
fn test_build_canonical_rejects_load_balancer_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-lb")
        .load_balance()
        .round_robin()
        .to("mock:result")
        .end_load_balance()
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `load_balancer`"));
}
3935
#[test]
// `build_canonical()` must fail for a route containing a bean call, and the
// error must name the unsupported `bean` step.
fn test_build_canonical_rejects_bean_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-bean")
        .bean("myBean", "process")
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `bean`"));
}
3946
#[test]
// `build_canonical()` must fail for a route containing a script, and the
// error must name the unsupported `script` step.
fn test_build_canonical_rejects_script_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-script")
        .script("rhai", "x = 1")
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `script`"));
}
3957
#[test]
// A 100 ms delay must be accepted by `build_canonical()` and appear among the
// canonical steps as a `Delay` with `delay_ms == 100`.
fn test_build_canonical_accepts_delay_step() {
    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-delay")
        .delay(Duration::from_millis(100))
        .build_canonical()
        .unwrap();

    let has_delay = spec
        .steps
        .iter()
        .any(|step| matches!(step, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100));
    assert!(has_delay);
}
3972
#[test]
// A wire tap must be accepted by `build_canonical()` and appear among the
// canonical steps as a `WireTap` with the same URI.
fn test_build_canonical_accepts_wire_tap_step() {
    let spec = RouteBuilder::from("direct:start")
        .route_id("canonical-wiretap")
        .wire_tap("mock:tap")
        .build_canonical()
        .unwrap();

    let has_tap = spec
        .steps
        .iter()
        .any(|step| matches!(step, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"));
    assert!(has_tap);
}
3987
#[test]
// `build_canonical()` must fail for a route containing a dynamic router, and
// the error must name the unsupported `dynamic_router` step.
fn test_build_canonical_rejects_dynamic_router_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-dyn-router")
        .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `dynamic_router`"));
}
3998
#[test]
// `build_canonical()` must fail for a route containing a routing slip, and
// the error must name the unsupported `routing_slip` step.
fn test_build_canonical_rejects_routing_slip_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-routing-slip")
        .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `routing_slip`"));
}
4009
#[test]
// `build_canonical()` must fail for a route containing a recipient list, and
// the error must name the unsupported `recipient_list` step.
fn test_build_canonical_rejects_recipient_list_step() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-recipient")
        .recipient_list(Arc::new(|_| "mock:a".to_string()))
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("does not support step `recipient_list`"));
}
4020
#[test]
// An `Any` completion mode that includes a predicate condition must make
// `build_canonical()` fail with an error mentioning "predicate completion".
fn test_build_canonical_rejects_any_mode_with_predicate() {
    // Built as a struct literal so the `Any` completion list can mix a size
    // condition with a closure-based predicate.
    let config = AggregatorConfig {
        header_name: "key".to_string(),
        completion: CompletionMode::Any(vec![
            CompletionCondition::Size(5),
            CompletionCondition::Predicate(Arc::new(|_| false)),
        ]),
        correlation: CorrelationStrategy::HeaderName("key".to_string()),
        strategy: AggregationStrategy::CollectAll,
        max_buckets: None,
        bucket_ttl: None,
        force_completion_on_stop: false,
        discard_on_timeout: false,
    };

    let outcome = RouteBuilder::from("direct:start")
        .route_id("canonical-any-pred")
        .aggregate(config)
        .build_canonical();

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("predicate completion"));
}
4045
#[test]
// `build()` must reject an empty `from` URI, and the error text must mention
// either `'from'` or `URI`.
fn test_builder_validation_missing_from_uri() {
    let builder = RouteBuilder::from("")
        .route_id("missing-uri-route")
        .to("log:info");
    let result = builder.build();
    assert!(result.is_err(), "empty from URI should fail validation");

    let err = result.err().unwrap().to_string();
    let mentions_source = ["'from'", "URI"].iter().any(|needle| err.contains(*needle));
    assert!(mentions_source, "error should mention from/URI, got: {err}");
}
4061
#[test]
// A step URI without a recognisable scheme must still be accepted by
// `build()`; the assertion message notes that resolution happens later.
fn test_builder_validation_invalid_step_uri_scheme() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("bad-step-route")
        .to("not-a-valid-uri");

    let result = builder.build();
    assert!(
        result.is_ok(),
        "builder should accept opaque step URIs; resolution happens later"
    );
}
4075
#[test]
// Two routes built from different `from` URIs but the same route id must both
// build successfully and report equal route ids.
fn test_builder_duplicate_route_ids_produce_identical_definitions() {
    let first = RouteBuilder::from("direct:a")
        .route_id("dup-route")
        .to("mock:out")
        .build();
    let second = RouteBuilder::from("direct:b")
        .route_id("dup-route")
        .to("mock:out")
        .build();

    assert!(first.is_ok());
    assert!(second.is_ok());

    assert_eq!(first.unwrap().route_id(), second.unwrap().route_id());
}
4095}