1use camel_api::DelayConfig;
7use camel_api::aggregator::{
8 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24 ProcessorFn, Value,
25 runtime::{
26 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28 CanonicalWhenSpec,
29 },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35 StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38pub trait StepAccumulator: Sized {
44 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
45
46 fn to(mut self, endpoint: impl Into<String>) -> Self {
47 self.steps_mut().push(BuilderStep::To(endpoint.into()));
48 self
49 }
50
51 fn process<F, Fut>(mut self, f: F) -> Self
52 where
53 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
54 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
55 {
56 let svc = ProcessorFn::new(f);
57 self.steps_mut()
58 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
59 self
60 }
61
62 fn process_fn(mut self, processor: BoxProcessor) -> Self {
63 self.steps_mut().push(BuilderStep::Processor(processor));
64 self
65 }
66
67 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
68 let svc = SetHeader::new(IdentityProcessor, key, value);
69 self.steps_mut()
70 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71 self
72 }
73
74 fn map_body<F>(mut self, mapper: F) -> Self
75 where
76 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
77 {
78 let svc = MapBody::new(IdentityProcessor, mapper);
79 self.steps_mut()
80 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
81 self
82 }
83
84 fn set_body<B>(mut self, body: B) -> Self
85 where
86 B: Into<Body> + Clone + Send + Sync + 'static,
87 {
88 let body: Body = body.into();
89 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
90 self.steps_mut()
91 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
92 self
93 }
94
95 fn transform<B>(self, body: B) -> Self
100 where
101 B: Into<Body> + Clone + Send + Sync + 'static,
102 {
103 self.set_body(body)
104 }
105
106 fn set_body_fn<F>(mut self, expr: F) -> Self
107 where
108 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
109 {
110 let svc = SetBody::new(IdentityProcessor, expr);
111 self.steps_mut()
112 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
113 self
114 }
115
116 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
117 where
118 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
119 {
120 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
121 self.steps_mut()
122 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
123 self
124 }
125
126 fn aggregate(mut self, config: AggregatorConfig) -> Self {
127 self.steps_mut().push(BuilderStep::Aggregate { config });
128 self
129 }
130
131 fn stop(mut self) -> Self {
137 self.steps_mut().push(BuilderStep::Stop);
138 self
139 }
140
141 fn delay(mut self, duration: std::time::Duration) -> Self {
142 self.steps_mut().push(BuilderStep::Delay {
143 config: DelayConfig::from_duration(duration),
144 });
145 self
146 }
147
148 fn delay_with_header(
149 mut self,
150 duration: std::time::Duration,
151 header: impl Into<String>,
152 ) -> Self {
153 self.steps_mut().push(BuilderStep::Delay {
154 config: DelayConfig::from_duration_with_header(duration, header),
155 });
156 self
157 }
158
159 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
163 self.steps_mut().push(BuilderStep::Log {
164 level,
165 message: message.into(),
166 });
167 self
168 }
169
170 fn convert_body_to(mut self, target: BodyType) -> Self {
182 let svc = ConvertBodyTo::new(IdentityProcessor, target);
183 self.steps_mut()
184 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
185 self
186 }
187
188 fn stream_cache(mut self, threshold: usize) -> Self {
189 let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
190 let svc = StreamCacheService::new(IdentityProcessor, config);
191 self.steps_mut()
192 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
193 self
194 }
195
196 fn stream_cache_default(self) -> Self {
200 self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
201 }
202
203 fn marshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
214 let name = format.into();
215 let df = builtin_data_format(&name)
216 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
217 let svc = MarshalService::new(IdentityProcessor, df);
218 self.steps_mut()
219 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
220 Ok(self)
221 }
222
223 fn unmarshal(mut self, format: impl Into<String>) -> Result<Self, CamelError> {
234 let name = format.into();
235 let df = builtin_data_format(&name)
236 .ok_or_else(|| CamelError::Config(format!("unknown data format: '{name}'")))?;
237 let svc = UnmarshalService::new(IdentityProcessor, df);
238 self.steps_mut()
239 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
240 Ok(self)
241 }
242
243 fn validate(mut self, schema_path: impl Into<String>) -> Self {
252 let path = schema_path.into();
253 let uri = if path.starts_with("validator:") {
254 path
255 } else {
256 format!("validator:{path}")
257 };
258 self.steps_mut().push(BuilderStep::To(uri));
259 self
260 }
261
262 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
273 self.steps_mut().push(BuilderStep::Script {
274 language: language.into(),
275 script: script.into(),
276 });
277 self
278 }
279
280 fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
281 self.steps_mut().push(BuilderStep::Bean {
282 name: name.into(),
283 method: method.into(),
284 });
285 self
286 }
287}
288
289pub struct RouteBuilder {
304 from_uri: String,
305 steps: Vec<BuilderStep>,
306 error_handler: Option<ErrorHandlerConfig>,
307 error_handler_mode: ErrorHandlerMode,
308 circuit_breaker_config: Option<CircuitBreakerConfig>,
309 concurrency: Option<ConcurrencyModel>,
310 route_id: Option<String>,
311 auto_startup: Option<bool>,
312 startup_order: Option<i32>,
313}
314
315#[derive(Default)]
316enum ErrorHandlerMode {
317 #[default]
318 None,
319 ExplicitConfig,
320 Shorthand {
321 dlc_uri: Option<String>,
322 specs: Vec<OnExceptionSpec>,
323 },
324 Mixed,
325}
326
327#[derive(Clone)]
328struct OnExceptionSpec {
329 matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
330 retry: Option<RedeliveryPolicy>,
331 handled_by: Option<String>,
332}
333
334impl RouteBuilder {
335 pub fn from(endpoint: &str) -> Self {
337 Self {
338 from_uri: endpoint.to_string(),
339 steps: Vec::new(),
340 error_handler: None,
341 error_handler_mode: ErrorHandlerMode::None,
342 circuit_breaker_config: None,
343 concurrency: None,
344 route_id: None,
345 auto_startup: None,
346 startup_order: None,
347 }
348 }
349
350 pub fn filter<F>(self, predicate: F) -> FilterBuilder
354 where
355 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
356 {
357 FilterBuilder {
358 parent: self,
359 predicate: std::sync::Arc::new(predicate),
360 steps: vec![],
361 }
362 }
363
364 pub fn choice(self) -> ChoiceBuilder {
370 ChoiceBuilder {
371 parent: self,
372 whens: vec![],
373 _otherwise: None,
374 }
375 }
376
377 pub fn wire_tap(mut self, endpoint: &str) -> Self {
381 self.steps.push(BuilderStep::WireTap {
382 uri: endpoint.to_string(),
383 });
384 self
385 }
386
387 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
389 self.error_handler_mode = match self.error_handler_mode {
390 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
391 ErrorHandlerMode::ExplicitConfig
392 }
393 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
394 };
395 self.error_handler = Some(config);
396 self
397 }
398
399 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
401 let uri = uri.into();
402 self.error_handler_mode = match self.error_handler_mode {
403 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
404 dlc_uri: Some(uri),
405 specs: Vec::new(),
406 },
407 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
408 dlc_uri: Some(uri),
409 specs,
410 },
411 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
412 };
413 self
414 }
415
416 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
418 where
419 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
420 {
421 self.error_handler_mode = match self.error_handler_mode {
422 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
423 dlc_uri: None,
424 specs: Vec::new(),
425 },
426 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
427 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
428 };
429
430 OnExceptionBuilder {
431 parent: self,
432 policy: OnExceptionSpec {
433 matches: std::sync::Arc::new(matches),
434 retry: None,
435 handled_by: None,
436 },
437 }
438 }
439
440 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
442 self.circuit_breaker_config = Some(config);
443 self
444 }
445
446 pub fn concurrent(mut self, max: usize) -> Self {
460 let max = if max == 0 { None } else { Some(max) };
461 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
462 self
463 }
464
465 pub fn sequential(mut self) -> Self {
470 self.concurrency = Some(ConcurrencyModel::Sequential);
471 self
472 }
473
474 pub fn route_id(mut self, id: impl Into<String>) -> Self {
478 self.route_id = Some(id.into());
479 self
480 }
481
482 pub fn auto_startup(mut self, auto: bool) -> Self {
486 self.auto_startup = Some(auto);
487 self
488 }
489
490 pub fn startup_order(mut self, order: i32) -> Self {
494 self.startup_order = Some(order);
495 self
496 }
497
498 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
504 SplitBuilder {
505 parent: self,
506 config,
507 steps: Vec::new(),
508 }
509 }
510
511 pub fn multicast(self) -> MulticastBuilder {
517 MulticastBuilder {
518 parent: self,
519 steps: Vec::new(),
520 config: MulticastConfig::new(),
521 }
522 }
523
524 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
531 ThrottleBuilder {
532 parent: self,
533 config: ThrottlerConfig::new(max_requests, period),
534 steps: Vec::new(),
535 }
536 }
537
538 pub fn loop_count(self, count: usize) -> LoopBuilder {
540 LoopBuilder {
541 parent: self,
542 config: LoopConfig {
543 mode: LoopMode::Count(count),
544 },
545 steps: vec![],
546 }
547 }
548
549 pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
551 where
552 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
553 {
554 LoopBuilder {
555 parent: self,
556 config: LoopConfig {
557 mode: LoopMode::While(std::sync::Arc::new(predicate)),
558 },
559 steps: vec![],
560 }
561 }
562
563 pub fn load_balance(self) -> LoadBalancerBuilder {
569 LoadBalancerBuilder {
570 parent: self,
571 config: LoadBalancerConfig::round_robin(),
572 steps: Vec::new(),
573 }
574 }
575
576 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
592 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
593 }
594
595 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
599 self.steps.push(BuilderStep::DynamicRouter { config });
600 self
601 }
602
603 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
604 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
605 }
606
607 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
608 self.steps.push(BuilderStep::RoutingSlip { config });
609 self
610 }
611
612 pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
613 self.recipient_list_with_config(RecipientListConfig::new(expression))
614 }
615
616 pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
617 self.steps.push(BuilderStep::RecipientList { config });
618 self
619 }
620
621 pub fn build(self) -> Result<RouteDefinition, CamelError> {
627 validate_uri(&self.from_uri)?;
628 let route_id = self
629 .route_id
630 .filter(|s| !s.trim().is_empty())
631 .ok_or_else(|| {
632 CamelError::RouteError(
633 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
634 .to_string(),
635 )
636 })?;
637 let resolved_error_handler = match self.error_handler_mode {
638 ErrorHandlerMode::None => self.error_handler,
639 ErrorHandlerMode::ExplicitConfig => self.error_handler,
640 ErrorHandlerMode::Mixed => {
641 return Err(CamelError::RouteError(
642 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
643 ));
644 }
645 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
646 let mut config = if let Some(uri) = dlc_uri {
647 ErrorHandlerConfig::dead_letter_channel(uri)
648 } else {
649 ErrorHandlerConfig::log_only()
650 };
651
652 for spec in specs {
653 let matcher = spec.matches.clone();
654 let mut builder = config.on_exception(move |e| matcher(e));
655
656 if let Some(retry) = spec.retry {
657 builder = builder.retry(retry.max_attempts).with_backoff(
658 retry.initial_delay,
659 retry.multiplier,
660 retry.max_delay,
661 );
662 if retry.jitter_factor > 0.0 {
663 builder = builder.with_jitter(retry.jitter_factor);
664 }
665 }
666
667 if let Some(uri) = spec.handled_by {
668 builder = builder.handled_by(uri);
669 }
670
671 config = builder.build();
672 }
673
674 Some(config)
675 }
676 };
677
678 let definition = RouteDefinition::new(self.from_uri, self.steps);
679 let definition = if let Some(eh) = resolved_error_handler {
680 definition.with_error_handler(eh)
681 } else {
682 definition
683 };
684 let definition = if let Some(cb) = self.circuit_breaker_config {
685 definition.with_circuit_breaker(cb)
686 } else {
687 definition
688 };
689 let definition = if let Some(concurrency) = self.concurrency {
690 definition.with_concurrency(concurrency)
691 } else {
692 definition
693 };
694 let definition = definition.with_route_id(route_id);
695 let definition = if let Some(auto) = self.auto_startup {
696 definition.with_auto_startup(auto)
697 } else {
698 definition
699 };
700 let definition = if let Some(order) = self.startup_order {
701 definition.with_startup_order(order)
702 } else {
703 definition
704 };
705 Ok(definition)
706 }
707
708 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
710 validate_uri(&self.from_uri)?;
711 let route_id = self
712 .route_id
713 .filter(|s| !s.trim().is_empty())
714 .ok_or_else(|| {
715 CamelError::RouteError(
716 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
717 .to_string(),
718 )
719 })?;
720
721 let steps = canonicalize_steps(self.steps)?;
722 let circuit_breaker = self
723 .circuit_breaker_config
724 .map(canonicalize_circuit_breaker);
725
726 let spec = CanonicalRouteSpec {
727 route_id,
728 from: self.from_uri,
729 steps,
730 circuit_breaker,
731 version: camel_api::CANONICAL_CONTRACT_VERSION,
732 };
733 spec.validate_contract()?;
734 Ok(spec)
735 }
736}
737
738pub struct OnExceptionBuilder {
739 parent: RouteBuilder,
740 policy: OnExceptionSpec,
741}
742
743impl OnExceptionBuilder {
744 pub fn retry(mut self, max_attempts: u32) -> Self {
745 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
746 self
747 }
748
749 pub fn with_backoff(
750 mut self,
751 initial: std::time::Duration,
752 multiplier: f64,
753 max: std::time::Duration,
754 ) -> Self {
755 if let Some(ref mut retry) = self.policy.retry {
756 retry.initial_delay = initial;
757 retry.multiplier = multiplier;
758 retry.max_delay = max;
759 } else {
760 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
761 }
762 self
763 }
764
765 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
766 if let Some(ref mut retry) = self.policy.retry {
767 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
768 } else {
769 tracing::warn!("backoff/jitter configuration has no effect when retry_count is 0");
770 }
771 self
772 }
773
774 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
775 self.policy.handled_by = Some(uri.into());
776 self
777 }
778
779 pub fn end_on_exception(mut self) -> RouteBuilder {
780 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
781 specs.push(self.policy);
782 }
783 self.parent
784 }
785}
786
787fn validate_uri(uri: &str) -> Result<(), CamelError> {
789 let trimmed = uri.trim();
790 if trimmed.is_empty() {
791 return Err(CamelError::RouteError(
792 "route must have a 'from' URI".to_string(),
793 ));
794 }
795 if !trimmed.contains(':') {
796 return Err(CamelError::RouteError(
797 "URI must have a scheme (e.g. 'timer:tick')".to_string(),
798 ));
799 }
800 let scheme = trimmed.split(':').next().unwrap_or("");
801 if scheme.trim().is_empty() {
802 return Err(CamelError::RouteError(
803 "URI scheme must not be empty".to_string(),
804 ));
805 }
806 Ok(())
807}
808
809fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
810 let mut canonical = Vec::with_capacity(steps.len());
811 for step in steps {
812 canonical.push(canonicalize_step(step)?);
813 }
814 Ok(canonical)
815}
816
817fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
818 match step {
819 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
820 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
821 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
822 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
823 BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
824 delay_ms: config.delay_ms,
825 dynamic_header: config.dynamic_header,
826 }),
827 BuilderStep::DeclarativeScript { expression } => {
828 Ok(CanonicalStepSpec::Script { expression })
829 }
830 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
831 predicate,
832 steps: canonicalize_steps(steps)?,
833 }),
834 BuilderStep::DeclarativeChoice { whens, otherwise } => {
835 let mut canonical_whens = Vec::with_capacity(whens.len());
836 for DeclarativeWhenStep { predicate, steps } in whens {
837 canonical_whens.push(CanonicalWhenSpec {
838 predicate,
839 steps: canonicalize_steps(steps)?,
840 });
841 }
842 let otherwise = match otherwise {
843 Some(steps) => Some(canonicalize_steps(steps)?),
844 None => None,
845 };
846 Ok(CanonicalStepSpec::Choice {
847 whens: canonical_whens,
848 otherwise,
849 })
850 }
851 BuilderStep::DeclarativeSplit {
852 expression,
853 aggregation,
854 parallel,
855 parallel_limit,
856 stop_on_exception,
857 steps,
858 } => Ok(CanonicalStepSpec::Split {
859 expression: CanonicalSplitExpressionSpec::Language(expression),
860 aggregation: canonicalize_split_aggregation(aggregation)?,
861 parallel,
862 parallel_limit,
863 stop_on_exception,
864 steps: canonicalize_steps(steps)?,
865 }),
866 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
867 canonicalize_aggregate(config)?,
868 )),
869 other => {
870 let step_name = canonical_step_name(&other);
871 let detail = camel_api::canonical_contract_rejection_reason(step_name)
872 .unwrap_or("not included in canonical v1");
873 Err(CamelError::RouteError(format!(
874 "canonical v1 does not support step `{step_name}`: {detail}"
875 )))
876 }
877 }
878}
879
880fn canonicalize_split_aggregation(
881 strategy: camel_api::splitter::AggregationStrategy,
882) -> Result<CanonicalSplitAggregationSpec, CamelError> {
883 match strategy {
884 camel_api::splitter::AggregationStrategy::LastWins => {
885 Ok(CanonicalSplitAggregationSpec::LastWins)
886 }
887 camel_api::splitter::AggregationStrategy::CollectAll => {
888 Ok(CanonicalSplitAggregationSpec::CollectAll)
889 }
890 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
891 "canonical v1 does not support custom split aggregation".to_string(),
892 )),
893 camel_api::splitter::AggregationStrategy::Original => {
894 Ok(CanonicalSplitAggregationSpec::Original)
895 }
896 }
897}
898
899fn extract_completion_fields(
900 mode: &CompletionMode,
901) -> Result<(Option<usize>, Option<u64>), CamelError> {
902 match mode {
903 CompletionMode::Single(cond) => match cond {
904 CompletionCondition::Size(n) => Ok((Some(*n), None)),
905 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
906 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
907 "canonical v1 does not support aggregate predicate completion".to_string(),
908 )),
909 },
910 CompletionMode::Any(conds) => {
911 let mut size = None;
912 let mut timeout_ms = None;
913 for cond in conds {
914 match cond {
915 CompletionCondition::Size(n) => size = Some(*n),
916 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
917 CompletionCondition::Predicate(_) => {
918 return Err(CamelError::RouteError(
919 "canonical v1 does not support aggregate predicate completion"
920 .to_string(),
921 ));
922 }
923 }
924 }
925 Ok((size, timeout_ms))
926 }
927 }
928}
929
930fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
931 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
932
933 let header = match &config.correlation {
934 CorrelationStrategy::HeaderName(h) => h.clone(),
935 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
936 CorrelationStrategy::Fn(_) => {
937 return Err(CamelError::RouteError(
938 "canonical v1 does not support Fn correlation strategy".to_string(),
939 ));
940 }
941 };
942
943 let correlation_key = match &config.correlation {
944 CorrelationStrategy::HeaderName(_) => None,
945 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
946 CorrelationStrategy::Fn(_) => unreachable!(),
947 };
948
949 let strategy = match config.strategy {
950 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
951 AggregationStrategy::Custom(_) => {
952 return Err(CamelError::RouteError(
953 "canonical v1 does not support custom aggregate strategy".to_string(),
954 ));
955 }
956 };
957 let bucket_ttl_ms = config
958 .bucket_ttl
959 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
960
961 Ok(CanonicalAggregateSpec {
962 header,
963 completion_size,
964 completion_timeout_ms,
965 correlation_key,
966 force_completion_on_stop: if config.force_completion_on_stop {
967 Some(true)
968 } else {
969 None
970 },
971 discard_on_timeout: if config.discard_on_timeout {
972 Some(true)
973 } else {
974 None
975 },
976 strategy,
977 max_buckets: config.max_buckets,
978 bucket_ttl_ms,
979 })
980}
981
982fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
983 CanonicalCircuitBreakerSpec {
984 failure_threshold: config.failure_threshold,
985 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
986 }
987}
988
989fn canonical_step_name(step: &BuilderStep) -> &'static str {
990 match step {
991 BuilderStep::Processor(_) => "processor",
992 BuilderStep::To(_) => "to",
993 BuilderStep::Stop => "stop",
994 BuilderStep::Log { .. } => "log",
995 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
996 BuilderStep::DeclarativeSetBody { .. } => "set_body",
997 BuilderStep::DeclarativeFilter { .. } => "filter",
998 BuilderStep::DeclarativeChoice { .. } => "choice",
999 BuilderStep::DeclarativeScript { .. } => "script",
1000 BuilderStep::DeclarativeFunction { .. } => "function",
1001 BuilderStep::DeclarativeSplit { .. } => "split",
1002 BuilderStep::Split { .. } => "split",
1003 BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
1004 BuilderStep::Aggregate { .. } => "aggregate",
1005 BuilderStep::Filter { .. } => "filter",
1006 BuilderStep::Choice { .. } => "choice",
1007 BuilderStep::WireTap { .. } => "wire_tap",
1008 BuilderStep::Delay { .. } => "delay",
1009 BuilderStep::Multicast { .. } => "multicast",
1010 BuilderStep::DeclarativeLog { .. } => "log",
1011 BuilderStep::Bean { .. } => "bean",
1012 BuilderStep::Script { .. } => "script",
1013 BuilderStep::Throttle { .. } => "throttle",
1014 BuilderStep::LoadBalance { .. } => "load_balancer",
1015 BuilderStep::DynamicRouter { .. } => "dynamic_router",
1016 BuilderStep::RoutingSlip { .. } => "routing_slip",
1017 BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
1018 BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
1019 BuilderStep::RecipientList { .. } => "recipient_list",
1020 BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
1021 BuilderStep::DeclarativeSetProperty { .. } => "set_property",
1022 }
1023}
1024
1025impl StepAccumulator for RouteBuilder {
1026 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1027 &mut self.steps
1028 }
1029}
1030
1031pub struct SplitBuilder {
1039 parent: RouteBuilder,
1040 config: SplitterConfig,
1041 steps: Vec<BuilderStep>,
1042}
1043
1044impl SplitBuilder {
1045 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1047 where
1048 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1049 {
1050 FilterInSplitBuilder {
1051 parent: self,
1052 predicate: std::sync::Arc::new(predicate),
1053 steps: vec![],
1054 }
1055 }
1056
1057 pub fn end_split(mut self) -> RouteBuilder {
1060 let split_step = BuilderStep::Split {
1061 config: self.config,
1062 steps: self.steps,
1063 };
1064 self.parent.steps.push(split_step);
1065 self.parent
1066 }
1067}
1068
1069impl StepAccumulator for SplitBuilder {
1070 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1071 &mut self.steps
1072 }
1073}
1074
1075pub struct FilterBuilder {
1077 parent: RouteBuilder,
1078 predicate: FilterPredicate,
1079 steps: Vec<BuilderStep>,
1080}
1081
1082impl FilterBuilder {
1083 pub fn end_filter(mut self) -> RouteBuilder {
1086 let step = BuilderStep::Filter {
1087 predicate: self.predicate,
1088 steps: self.steps,
1089 };
1090 self.parent.steps.push(step);
1091 self.parent
1092 }
1093}
1094
1095impl StepAccumulator for FilterBuilder {
1096 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1097 &mut self.steps
1098 }
1099}
1100
1101pub struct FilterInSplitBuilder {
1103 parent: SplitBuilder,
1104 predicate: FilterPredicate,
1105 steps: Vec<BuilderStep>,
1106}
1107
1108impl FilterInSplitBuilder {
1109 pub fn end_filter(mut self) -> SplitBuilder {
1111 let step = BuilderStep::Filter {
1112 predicate: self.predicate,
1113 steps: self.steps,
1114 };
1115 self.parent.steps.push(step);
1116 self.parent
1117 }
1118}
1119
1120impl StepAccumulator for FilterInSplitBuilder {
1121 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1122 &mut self.steps
1123 }
1124}
1125
1126pub struct ChoiceBuilder {
1133 parent: RouteBuilder,
1134 whens: Vec<WhenStep>,
1135 _otherwise: Option<Vec<BuilderStep>>,
1136}
1137
1138impl ChoiceBuilder {
1139 pub fn when<F>(self, predicate: F) -> WhenBuilder
1142 where
1143 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1144 {
1145 WhenBuilder {
1146 parent: self,
1147 predicate: std::sync::Arc::new(predicate),
1148 steps: vec![],
1149 }
1150 }
1151
1152 pub fn otherwise(self) -> OtherwiseBuilder {
1156 OtherwiseBuilder {
1157 parent: self,
1158 steps: vec![],
1159 }
1160 }
1161
1162 pub fn end_choice(mut self) -> RouteBuilder {
1166 let step = BuilderStep::Choice {
1167 whens: self.whens,
1168 otherwise: self._otherwise,
1169 };
1170 self.parent.steps.push(step);
1171 self.parent
1172 }
1173}
1174
1175pub struct WhenBuilder {
1177 parent: ChoiceBuilder,
1178 predicate: camel_api::FilterPredicate,
1179 steps: Vec<BuilderStep>,
1180}
1181
1182impl WhenBuilder {
1183 pub fn end_when(mut self) -> ChoiceBuilder {
1186 self.parent.whens.push(WhenStep {
1187 predicate: self.predicate,
1188 steps: self.steps,
1189 });
1190 self.parent
1191 }
1192}
1193
1194impl StepAccumulator for WhenBuilder {
1195 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1196 &mut self.steps
1197 }
1198}
1199
1200pub struct OtherwiseBuilder {
1202 parent: ChoiceBuilder,
1203 steps: Vec<BuilderStep>,
1204}
1205
1206impl OtherwiseBuilder {
1207 pub fn end_otherwise(self) -> ChoiceBuilder {
1209 let OtherwiseBuilder { mut parent, steps } = self;
1210 parent._otherwise = Some(steps);
1211 parent
1212 }
1213}
1214
1215impl StepAccumulator for OtherwiseBuilder {
1216 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1217 &mut self.steps
1218 }
1219}
1220
1221pub struct MulticastBuilder {
1229 parent: RouteBuilder,
1230 steps: Vec<BuilderStep>,
1231 config: MulticastConfig,
1232}
1233
1234impl MulticastBuilder {
1235 pub fn parallel(mut self, parallel: bool) -> Self {
1236 self.config = self.config.parallel(parallel);
1237 self
1238 }
1239
1240 pub fn parallel_limit(mut self, limit: usize) -> Self {
1241 self.config = self.config.parallel_limit(limit);
1242 self
1243 }
1244
1245 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1246 self.config = self.config.stop_on_exception(stop);
1247 self
1248 }
1249
1250 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1251 self.config = self.config.timeout(duration);
1252 self
1253 }
1254
1255 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1256 self.config = self.config.aggregation(strategy);
1257 self
1258 }
1259
1260 pub fn end_multicast(mut self) -> RouteBuilder {
1261 let step = BuilderStep::Multicast {
1262 steps: self.steps,
1263 config: self.config,
1264 };
1265 self.parent.steps.push(step);
1266 self.parent
1267 }
1268}
1269
1270impl StepAccumulator for MulticastBuilder {
1271 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1272 &mut self.steps
1273 }
1274}
1275
1276pub struct ThrottleBuilder {
1284 parent: RouteBuilder,
1285 config: ThrottlerConfig,
1286 steps: Vec<BuilderStep>,
1287}
1288
1289impl ThrottleBuilder {
1290 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1296 self.config = self.config.strategy(strategy);
1297 self
1298 }
1299
1300 pub fn end_throttle(mut self) -> RouteBuilder {
1303 let step = BuilderStep::Throttle {
1304 config: self.config,
1305 steps: self.steps,
1306 };
1307 self.parent.steps.push(step);
1308 self.parent
1309 }
1310}
1311
1312impl StepAccumulator for ThrottleBuilder {
1313 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1314 &mut self.steps
1315 }
1316}
1317
1318pub struct LoopBuilder {
1320 parent: RouteBuilder,
1321 config: LoopConfig,
1322 steps: Vec<BuilderStep>,
1323}
1324
1325impl LoopBuilder {
1326 pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1327 LoopInLoopBuilder {
1328 parent: self,
1329 config: LoopConfig {
1330 mode: LoopMode::Count(count),
1331 },
1332 steps: vec![],
1333 }
1334 }
1335
1336 pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1337 where
1338 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1339 {
1340 LoopInLoopBuilder {
1341 parent: self,
1342 config: LoopConfig {
1343 mode: LoopMode::While(std::sync::Arc::new(predicate)),
1344 },
1345 steps: vec![],
1346 }
1347 }
1348
1349 pub fn end_loop(mut self) -> RouteBuilder {
1350 let step = BuilderStep::Loop {
1351 config: self.config,
1352 steps: self.steps,
1353 };
1354 self.parent.steps.push(step);
1355 self.parent
1356 }
1357}
1358
1359impl StepAccumulator for LoopBuilder {
1360 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1361 &mut self.steps
1362 }
1363}
1364
1365pub struct LoopInLoopBuilder {
1366 parent: LoopBuilder,
1367 config: LoopConfig,
1368 steps: Vec<BuilderStep>,
1369}
1370
1371impl LoopInLoopBuilder {
1372 pub fn end_loop(mut self) -> LoopBuilder {
1373 let step = BuilderStep::Loop {
1374 config: self.config,
1375 steps: self.steps,
1376 };
1377 self.parent.steps.push(step);
1378 self.parent
1379 }
1380}
1381
1382impl StepAccumulator for LoopInLoopBuilder {
1383 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1384 &mut self.steps
1385 }
1386}
1387
1388pub struct LoadBalancerBuilder {
1396 parent: RouteBuilder,
1397 config: LoadBalancerConfig,
1398 steps: Vec<BuilderStep>,
1399}
1400
1401impl LoadBalancerBuilder {
1402 pub fn round_robin(mut self) -> Self {
1404 self.config = LoadBalancerConfig::round_robin();
1405 self
1406 }
1407
1408 pub fn random(mut self) -> Self {
1410 self.config = LoadBalancerConfig::random();
1411 self
1412 }
1413
1414 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1419 self.config = LoadBalancerConfig::weighted(weights);
1420 self
1421 }
1422
1423 pub fn failover(mut self) -> Self {
1428 self.config = LoadBalancerConfig::failover();
1429 self
1430 }
1431
1432 pub fn parallel(mut self, parallel: bool) -> Self {
1437 self.config = self.config.parallel(parallel);
1438 self
1439 }
1440
1441 pub fn end_load_balance(mut self) -> RouteBuilder {
1444 let step = BuilderStep::LoadBalance {
1445 config: self.config,
1446 steps: self.steps,
1447 };
1448 self.parent.steps.push(step);
1449 self.parent
1450 }
1451}
1452
1453impl StepAccumulator for LoadBalancerBuilder {
1454 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1455 &mut self.steps
1456 }
1457}
1458
1459#[cfg(test)]
1464mod tests {
1465 use super::*;
1466 use camel_api::error_handler::ErrorHandlerConfig;
1467 use camel_api::load_balancer::LoadBalanceStrategy;
1468 use camel_api::{Exchange, Message};
1469 use camel_core::route::BuilderStep;
1470 use std::sync::Arc;
1471 use std::time::Duration;
1472 use tower::{Service, ServiceExt};
1473
1474 #[test]
1475 fn test_builder_from_creates_definition() {
1476 let definition = RouteBuilder::from("timer:tick")
1477 .route_id("test-route")
1478 .build()
1479 .unwrap();
1480 assert_eq!(definition.from_uri(), "timer:tick");
1481 }
1482
1483 #[test]
1484 fn test_builder_empty_from_uri_errors() {
1485 let result = RouteBuilder::from("").route_id("test-route").build();
1486 assert!(result.is_err());
1487 }
1488
1489 #[test]
1490 fn test_build_rejects_schemeless_uri() {
1491 let result = RouteBuilder::from("no-scheme-here")
1492 .route_id("test-route")
1493 .build();
1494 match result {
1495 Err(err) => {
1496 let err_msg = format!("{err}");
1497 assert!(
1498 err_msg.contains("scheme"),
1499 "expected scheme-related error, got: {err_msg}"
1500 );
1501 }
1502 Ok(_) => panic!("schemeless URI should fail"),
1503 }
1504 }
1505
1506 #[test]
1507 fn test_build_rejects_empty_scheme_uri() {
1508 let result = RouteBuilder::from(":missing-scheme")
1509 .route_id("test-route")
1510 .build();
1511 match result {
1512 Err(err) => {
1513 let err_msg = format!("{err}");
1514 assert!(
1515 err_msg.contains("scheme"),
1516 "expected scheme-related error, got: {err_msg}"
1517 );
1518 }
1519 Ok(_) => panic!("empty-scheme URI should fail"),
1520 }
1521 }
1522
1523 #[test]
1524 fn test_build_accepts_valid_uri() {
1525 let result = RouteBuilder::from("timer:tick")
1526 .route_id("test-route")
1527 .build();
1528 assert!(result.is_ok());
1529 }
1530
1531 #[test]
1532 fn test_build_canonical_rejects_schemeless_uri() {
1533 let result = RouteBuilder::from("no-scheme-here")
1534 .route_id("test-route")
1535 .build_canonical();
1536 assert!(result.is_err());
1537 }
1538
1539 #[test]
1540 fn test_builder_to_adds_step() {
1541 let definition = RouteBuilder::from("timer:tick")
1542 .route_id("test-route")
1543 .to("log:info")
1544 .build()
1545 .unwrap();
1546
1547 assert_eq!(definition.from_uri(), "timer:tick");
1548 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1550 }
1551
1552 #[test]
1553 fn test_builder_filter_adds_filter_step() {
1554 let definition = RouteBuilder::from("timer:tick")
1555 .route_id("test-route")
1556 .filter(|_ex| true)
1557 .to("mock:result")
1558 .end_filter()
1559 .build()
1560 .unwrap();
1561
1562 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1563 }
1564
1565 #[test]
1566 fn test_builder_set_header_adds_processor_step() {
1567 let definition = RouteBuilder::from("timer:tick")
1568 .route_id("test-route")
1569 .set_header("key", Value::String("value".into()))
1570 .build()
1571 .unwrap();
1572
1573 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1574 }
1575
1576 #[test]
1577 fn test_builder_map_body_adds_processor_step() {
1578 let definition = RouteBuilder::from("timer:tick")
1579 .route_id("test-route")
1580 .map_body(|body| body)
1581 .build()
1582 .unwrap();
1583
1584 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1585 }
1586
1587 #[test]
1588 fn test_builder_process_adds_processor_step() {
1589 let definition = RouteBuilder::from("timer:tick")
1590 .route_id("test-route")
1591 .process(|ex| async move { Ok(ex) })
1592 .build()
1593 .unwrap();
1594
1595 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1596 }
1597
1598 #[test]
1599 fn test_builder_chain_multiple_steps() {
1600 let definition = RouteBuilder::from("timer:tick")
1601 .route_id("test-route")
1602 .set_header("source", Value::String("timer".into()))
1603 .filter(|ex| ex.input.header("source").is_some())
1604 .to("log:info")
1605 .end_filter()
1606 .to("mock:result")
1607 .build()
1608 .unwrap();
1609
1610 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1614 }
1615
1616 #[test]
1617 fn test_loop_count_builder() {
1618 use camel_api::loop_eip::LoopMode;
1619
1620 let def = RouteBuilder::from("direct:start")
1621 .route_id("loop-test")
1622 .loop_count(3)
1623 .to("mock:inside")
1624 .end_loop()
1625 .to("mock:after")
1626 .build()
1627 .unwrap();
1628
1629 assert_eq!(def.steps().len(), 2);
1630 match &def.steps()[0] {
1631 BuilderStep::Loop { config, steps } => {
1632 assert!(matches!(config.mode, LoopMode::Count(3)));
1633 assert_eq!(steps.len(), 1);
1634 }
1635 other => panic!("Expected Loop, got {:?}", other),
1636 }
1637 assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1638 }
1639
1640 #[test]
1641 fn test_loop_while_builder() {
1642 use camel_api::loop_eip::LoopMode;
1643
1644 let def = RouteBuilder::from("direct:start")
1645 .route_id("loop-while-test")
1646 .loop_while(|_ex| true)
1647 .to("mock:retry")
1648 .end_loop()
1649 .build()
1650 .unwrap();
1651
1652 assert_eq!(def.steps().len(), 1);
1653 match &def.steps()[0] {
1654 BuilderStep::Loop { config, steps } => {
1655 assert!(matches!(config.mode, LoopMode::While(_)));
1656 assert_eq!(steps.len(), 1);
1657 }
1658 other => panic!("Expected Loop, got {:?}", other),
1659 }
1660 }
1661
1662 #[test]
1663 fn test_nested_loop_builder() {
1664 use camel_api::loop_eip::LoopMode;
1665
1666 let def = RouteBuilder::from("direct:start")
1667 .route_id("nested-loop-test")
1668 .loop_count(2)
1669 .to("mock:outer")
1670 .loop_count(3)
1671 .to("mock:inner")
1672 .end_loop()
1673 .end_loop()
1674 .to("mock:after")
1675 .build()
1676 .unwrap();
1677
1678 assert_eq!(def.steps().len(), 2);
1679 match &def.steps()[0] {
1680 BuilderStep::Loop { steps, .. } => {
1681 assert_eq!(steps.len(), 2);
1682 match &steps[1] {
1683 BuilderStep::Loop {
1684 config,
1685 steps: inner_steps,
1686 } => {
1687 assert!(matches!(config.mode, LoopMode::Count(3)));
1688 assert_eq!(inner_steps.len(), 1);
1689 }
1690 other => panic!("Expected nested Loop, got {:?}", other),
1691 }
1692 }
1693 other => panic!("Expected outer Loop, got {:?}", other),
1694 }
1695 }
1696
1697 #[tokio::test]
1702 async fn test_set_header_processor_works() {
1703 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1704 let exchange = Exchange::new(Message::new("test"));
1705 let result = svc.call(exchange).await.unwrap();
1706 assert_eq!(
1707 result.input.header("greeting"),
1708 Some(&Value::String("hello".into()))
1709 );
1710 }
1711
1712 #[tokio::test]
1713 async fn test_filter_processor_passes() {
1714 use camel_api::BoxProcessorExt;
1715 use camel_processor::FilterService;
1716
1717 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1718 let mut svc =
1719 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1720 let exchange = Exchange::new(Message::new("pass"));
1721 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1722 assert_eq!(result.input.body.as_text(), Some("pass"));
1723 }
1724
1725 #[tokio::test]
1726 async fn test_filter_processor_blocks() {
1727 use camel_api::BoxProcessorExt;
1728 use camel_processor::FilterService;
1729
1730 let sub = BoxProcessor::from_fn(|_ex| {
1731 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1732 });
1733 let mut svc =
1734 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1735 let exchange = Exchange::new(Message::new("reject"));
1736 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1737 assert_eq!(result.input.body.as_text(), Some("reject"));
1738 }
1739
1740 #[tokio::test]
1741 async fn test_map_body_processor_works() {
1742 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1743 if let Some(text) = body.as_text() {
1744 Body::Text(text.to_uppercase())
1745 } else {
1746 body
1747 }
1748 });
1749 let exchange = Exchange::new(Message::new("hello"));
1750 let result = mapper.oneshot(exchange).await.unwrap();
1751 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1752 }
1753
1754 #[tokio::test]
1755 async fn test_process_custom_processor_works() {
1756 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1757 ex.set_property("custom", Value::Bool(true));
1758 Ok(ex)
1759 });
1760 let exchange = Exchange::new(Message::default());
1761 let result = processor.oneshot(exchange).await.unwrap();
1762 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1763 }
1764
1765 #[tokio::test]
1770 async fn test_compose_pipeline_runs_steps_in_order() {
1771 use camel_core::route::compose_pipeline;
1772
1773 let processors = vec![
1774 BoxProcessor::new(SetHeader::new(
1775 IdentityProcessor,
1776 "step",
1777 Value::String("one".into()),
1778 )),
1779 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1780 if let Some(text) = body.as_text() {
1781 Body::Text(format!("{}-processed", text))
1782 } else {
1783 body
1784 }
1785 })),
1786 ];
1787
1788 let pipeline = compose_pipeline(processors);
1789 let exchange = Exchange::new(Message::new("hello"));
1790 let result = pipeline.oneshot(exchange).await.unwrap();
1791
1792 assert_eq!(
1793 result.input.header("step"),
1794 Some(&Value::String("one".into()))
1795 );
1796 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1797 }
1798
1799 #[tokio::test]
1800 async fn test_compose_pipeline_empty_is_identity() {
1801 use camel_core::route::compose_pipeline;
1802
1803 let pipeline = compose_pipeline(vec![]);
1804 let exchange = Exchange::new(Message::new("unchanged"));
1805 let result = pipeline.oneshot(exchange).await.unwrap();
1806 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1807 }
1808
1809 #[test]
1814 fn test_builder_circuit_breaker_sets_config() {
1815 use camel_api::circuit_breaker::CircuitBreakerConfig;
1816
1817 let config = CircuitBreakerConfig::new().failure_threshold(5);
1818 let definition = RouteBuilder::from("timer:tick")
1819 .route_id("test-route")
1820 .circuit_breaker(config)
1821 .build()
1822 .unwrap();
1823
1824 let cb = definition
1825 .circuit_breaker_config()
1826 .expect("circuit breaker should be set");
1827 assert_eq!(cb.failure_threshold, 5);
1828 }
1829
1830 #[test]
1831 fn test_builder_circuit_breaker_with_error_handler() {
1832 use camel_api::circuit_breaker::CircuitBreakerConfig;
1833 use camel_api::error_handler::ErrorHandlerConfig;
1834
1835 let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1836 let eh_config = ErrorHandlerConfig::log_only();
1837
1838 let definition = RouteBuilder::from("timer:tick")
1839 .route_id("test-route")
1840 .to("log:info")
1841 .circuit_breaker(cb_config)
1842 .error_handler(eh_config)
1843 .build()
1844 .unwrap();
1845
1846 assert!(
1847 definition.circuit_breaker_config().is_some(),
1848 "circuit breaker config should be set"
1849 );
1850 }
1852
1853 #[test]
1854 fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1855 let definition = RouteBuilder::from("direct:start")
1856 .route_id("test-route")
1857 .dead_letter_channel("log:dlc")
1858 .on_exception(|e| matches!(e, CamelError::Io(_)))
1859 .retry(3)
1860 .handled_by("log:io")
1861 .end_on_exception()
1862 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1863 .retry(1)
1864 .end_on_exception()
1865 .to("mock:out")
1866 .build()
1867 .expect("route should build");
1868
1869 let cfg = definition
1870 .error_handler_config()
1871 .expect("error handler should be set");
1872 assert_eq!(cfg.policies.len(), 2);
1873 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1874 assert_eq!(
1875 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1876 Some(3)
1877 );
1878 assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1879 assert_eq!(
1880 cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1881 Some(1)
1882 );
1883 }
1884
1885 #[test]
1886 fn test_builder_on_exception_mixed_mode_rejected() {
1887 let result = RouteBuilder::from("direct:start")
1888 .route_id("test-route")
1889 .error_handler(ErrorHandlerConfig::log_only())
1890 .on_exception(|_e| true)
1891 .end_on_exception()
1892 .to("mock:out")
1893 .build();
1894
1895 let err = result.err().expect("mixed mode should fail with an error");
1896
1897 assert!(
1898 format!("{err}").contains("mixed error handler modes"),
1899 "unexpected error: {err}"
1900 );
1901 }
1902
1903 #[test]
1904 fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1905 let definition = RouteBuilder::from("direct:start")
1906 .route_id("test-route")
1907 .on_exception(|_e| true)
1908 .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1909 .with_jitter(0.5)
1910 .end_on_exception()
1911 .to("mock:out")
1912 .build()
1913 .expect("route should build");
1914
1915 let cfg = definition
1916 .error_handler_config()
1917 .expect("error handler should be set");
1918 assert_eq!(cfg.policies.len(), 1);
1919 assert!(cfg.policies[0].retry.is_none());
1920 }
1921
1922 #[test]
1923 fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1924 let definition = RouteBuilder::from("direct:start")
1925 .route_id("test-route")
1926 .dead_letter_channel("log:dlc")
1927 .to("mock:out")
1928 .build()
1929 .expect("route should build");
1930
1931 let cfg = definition
1932 .error_handler_config()
1933 .expect("error handler should be set");
1934 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1935 assert!(cfg.policies.is_empty());
1936 }
1937
1938 #[test]
1939 fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1940 let definition = RouteBuilder::from("direct:start")
1941 .route_id("test-route")
1942 .dead_letter_channel("log:first")
1943 .on_exception(|e| matches!(e, CamelError::Io(_)))
1944 .retry(2)
1945 .end_on_exception()
1946 .dead_letter_channel("log:second")
1947 .to("mock:out")
1948 .build()
1949 .expect("route should build");
1950
1951 let cfg = definition
1952 .error_handler_config()
1953 .expect("error handler should be set");
1954 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1955 assert_eq!(cfg.policies.len(), 1);
1956 assert_eq!(
1957 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1958 Some(2)
1959 );
1960 }
1961
1962 #[test]
1963 fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1964 let definition = RouteBuilder::from("direct:start")
1965 .route_id("test-route")
1966 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1967 .retry(1)
1968 .end_on_exception()
1969 .to("mock:out")
1970 .build()
1971 .expect("route should build");
1972
1973 let cfg = definition
1974 .error_handler_config()
1975 .expect("error handler should be set");
1976 assert!(cfg.dlc_uri.is_none());
1977 assert_eq!(cfg.policies.len(), 1);
1978 }
1979
1980 #[test]
1981 fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1982 let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1983 let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1984
1985 let definition = RouteBuilder::from("direct:start")
1986 .route_id("test-route")
1987 .error_handler(first)
1988 .error_handler(second)
1989 .to("mock:out")
1990 .build()
1991 .expect("route should build");
1992
1993 let cfg = definition
1994 .error_handler_config()
1995 .expect("error handler should be set");
1996 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1997 }
1998
1999 #[test]
2002 fn test_split_builder_typestate() {
2003 use camel_api::splitter::{SplitterConfig, split_body_lines};
2004
2005 let definition = RouteBuilder::from("timer:test?period=1000")
2007 .route_id("test-route")
2008 .split(SplitterConfig::new(split_body_lines()))
2009 .to("mock:per-fragment")
2010 .end_split()
2011 .to("mock:final")
2012 .build()
2013 .unwrap();
2014
2015 assert_eq!(definition.steps().len(), 2);
2017 }
2018
2019 #[test]
2020 fn test_split_builder_steps_collected() {
2021 use camel_api::splitter::{SplitterConfig, split_body_lines};
2022
2023 let definition = RouteBuilder::from("timer:test?period=1000")
2024 .route_id("test-route")
2025 .split(SplitterConfig::new(split_body_lines()))
2026 .set_header("fragment", Value::String("yes".into()))
2027 .to("mock:per-fragment")
2028 .end_split()
2029 .build()
2030 .unwrap();
2031
2032 assert_eq!(definition.steps().len(), 1);
2034 match &definition.steps()[0] {
2035 BuilderStep::Split { steps, .. } => {
2036 assert_eq!(steps.len(), 2); }
2038 other => panic!("Expected Split, got {:?}", other),
2039 }
2040 }
2041
2042 #[test]
2043 fn test_split_builder_config_propagated() {
2044 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
2045
2046 let definition = RouteBuilder::from("timer:test?period=1000")
2047 .route_id("test-route")
2048 .split(
2049 SplitterConfig::new(split_body_lines())
2050 .parallel(true)
2051 .parallel_limit(4)
2052 .aggregation(AggregationStrategy::CollectAll),
2053 )
2054 .to("mock:per-fragment")
2055 .end_split()
2056 .build()
2057 .unwrap();
2058
2059 match &definition.steps()[0] {
2060 BuilderStep::Split { config, .. } => {
2061 assert!(config.parallel);
2062 assert_eq!(config.parallel_limit, Some(4));
2063 assert!(matches!(
2064 config.aggregation,
2065 AggregationStrategy::CollectAll
2066 ));
2067 }
2068 other => panic!("Expected Split, got {:?}", other),
2069 }
2070 }
2071
2072 #[test]
2073 fn test_aggregate_builder_adds_step() {
2074 use camel_api::aggregator::AggregatorConfig;
2075 use camel_core::route::BuilderStep;
2076
2077 let definition = RouteBuilder::from("timer:tick")
2078 .route_id("test-route")
2079 .aggregate(
2080 AggregatorConfig::correlate_by("key")
2081 .complete_when_size(2)
2082 .build()
2083 .unwrap(),
2084 )
2085 .build()
2086 .unwrap();
2087
2088 assert_eq!(definition.steps().len(), 1);
2089 assert!(matches!(
2090 definition.steps()[0],
2091 BuilderStep::Aggregate { .. }
2092 ));
2093 }
2094
2095 #[test]
2096 fn test_aggregate_in_split_builder() {
2097 use camel_api::aggregator::AggregatorConfig;
2098 use camel_api::splitter::{SplitterConfig, split_body_lines};
2099 use camel_core::route::BuilderStep;
2100
2101 let definition = RouteBuilder::from("timer:tick")
2102 .route_id("test-route")
2103 .split(SplitterConfig::new(split_body_lines()))
2104 .aggregate(
2105 AggregatorConfig::correlate_by("key")
2106 .complete_when_size(1)
2107 .build()
2108 .unwrap(),
2109 )
2110 .end_split()
2111 .build()
2112 .unwrap();
2113
2114 assert_eq!(definition.steps().len(), 1);
2115 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2116 assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2117 } else {
2118 panic!("expected Split step");
2119 }
2120 }
2121
2122 #[test]
2125 fn test_builder_set_body_static_adds_processor() {
2126 let definition = RouteBuilder::from("timer:tick")
2127 .route_id("test-route")
2128 .set_body("fixed")
2129 .build()
2130 .unwrap();
2131 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2132 }
2133
2134 #[test]
2135 fn test_builder_set_body_fn_adds_processor() {
2136 let definition = RouteBuilder::from("timer:tick")
2137 .route_id("test-route")
2138 .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2139 .build()
2140 .unwrap();
2141 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2142 }
2143
2144 #[test]
2145 fn transform_alias_produces_same_as_set_body() {
2146 let route_transform = RouteBuilder::from("timer:tick")
2147 .route_id("test-route")
2148 .transform("hello")
2149 .build()
2150 .unwrap();
2151
2152 let route_set_body = RouteBuilder::from("timer:tick")
2153 .route_id("test-route")
2154 .set_body("hello")
2155 .build()
2156 .unwrap();
2157
2158 assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2159 }
2160
2161 #[test]
2162 fn test_builder_set_header_fn_adds_processor() {
2163 let definition = RouteBuilder::from("timer:tick")
2164 .route_id("test-route")
2165 .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2166 .build()
2167 .unwrap();
2168 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2169 }
2170
2171 #[tokio::test]
2172 async fn test_set_body_static_processor_works() {
2173 use camel_core::route::compose_pipeline;
2174 let def = RouteBuilder::from("t:t")
2175 .route_id("test-route")
2176 .set_body("replaced")
2177 .build()
2178 .unwrap();
2179 let pipeline = compose_pipeline(
2180 def.steps()
2181 .iter()
2182 .filter_map(|s| {
2183 if let BuilderStep::Processor(p) = s {
2184 Some(p.clone())
2185 } else {
2186 None
2187 }
2188 })
2189 .collect(),
2190 );
2191 let exchange = Exchange::new(Message::new("original"));
2192 let result = pipeline.oneshot(exchange).await.unwrap();
2193 assert_eq!(result.input.body.as_text(), Some("replaced"));
2194 }
2195
2196 #[tokio::test]
2197 async fn test_set_body_fn_processor_works() {
2198 use camel_core::route::compose_pipeline;
2199 let def = RouteBuilder::from("t:t")
2200 .route_id("test-route")
2201 .set_body_fn(|ex: &Exchange| {
2202 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2203 })
2204 .build()
2205 .unwrap();
2206 let pipeline = compose_pipeline(
2207 def.steps()
2208 .iter()
2209 .filter_map(|s| {
2210 if let BuilderStep::Processor(p) = s {
2211 Some(p.clone())
2212 } else {
2213 None
2214 }
2215 })
2216 .collect(),
2217 );
2218 let exchange = Exchange::new(Message::new("hello"));
2219 let result = pipeline.oneshot(exchange).await.unwrap();
2220 assert_eq!(result.input.body.as_text(), Some("HELLO"));
2221 }
2222
2223 #[tokio::test]
2224 async fn test_set_header_fn_processor_works() {
2225 use camel_core::route::compose_pipeline;
2226 let def = RouteBuilder::from("t:t")
2227 .route_id("test-route")
2228 .set_header_fn("echo", |ex: &Exchange| {
2229 ex.input
2230 .body
2231 .as_text()
2232 .map(|t| Value::String(t.into()))
2233 .unwrap_or(Value::Null)
2234 })
2235 .build()
2236 .unwrap();
2237 let pipeline = compose_pipeline(
2238 def.steps()
2239 .iter()
2240 .filter_map(|s| {
2241 if let BuilderStep::Processor(p) = s {
2242 Some(p.clone())
2243 } else {
2244 None
2245 }
2246 })
2247 .collect(),
2248 );
2249 let exchange = Exchange::new(Message::new("ping"));
2250 let result = pipeline.oneshot(exchange).await.unwrap();
2251 assert_eq!(
2252 result.input.header("echo"),
2253 Some(&Value::String("ping".into()))
2254 );
2255 }
2256
2257 #[test]
2260 fn test_filter_builder_typestate() {
2261 let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2262 .route_id("test-route")
2263 .filter(|_ex| true)
2264 .to("mock:inner")
2265 .end_filter()
2266 .to("mock:outer")
2267 .build();
2268 assert!(result.is_ok());
2269 }
2270
2271 #[test]
2272 fn test_filter_builder_steps_collected() {
2273 let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2274 .route_id("test-route")
2275 .filter(|_ex| true)
2276 .to("mock:inner")
2277 .end_filter()
2278 .build()
2279 .unwrap();
2280
2281 assert_eq!(definition.steps().len(), 1);
2282 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2283 }
2284
2285 #[test]
2286 fn test_wire_tap_builder_adds_step() {
2287 let definition = RouteBuilder::from("timer:tick")
2288 .route_id("test-route")
2289 .wire_tap("mock:tap")
2290 .to("mock:result")
2291 .build()
2292 .unwrap();
2293
2294 assert_eq!(definition.steps().len(), 2);
2295 assert!(
2296 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2297 );
2298 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2299 }
2300
2301 #[test]
2304 fn test_multicast_builder_typestate() {
2305 let definition = RouteBuilder::from("timer:tick")
2306 .route_id("test-route")
2307 .multicast()
2308 .to("direct:a")
2309 .to("direct:b")
2310 .end_multicast()
2311 .to("mock:result")
2312 .build()
2313 .unwrap();
2314
2315 assert_eq!(definition.steps().len(), 2); }
2317
2318 #[test]
2319 fn test_multicast_builder_steps_collected() {
2320 let definition = RouteBuilder::from("timer:tick")
2321 .route_id("test-route")
2322 .multicast()
2323 .to("direct:a")
2324 .to("direct:b")
2325 .end_multicast()
2326 .build()
2327 .unwrap();
2328
2329 match &definition.steps()[0] {
2330 BuilderStep::Multicast { steps, .. } => {
2331 assert_eq!(steps.len(), 2);
2332 }
2333 other => panic!("Expected Multicast, got {:?}", other),
2334 }
2335 }
2336
2337 #[test]
2340 fn test_builder_concurrent_sets_concurrency() {
2341 use camel_component_api::ConcurrencyModel;
2342
2343 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2344 .route_id("test-route")
2345 .concurrent(16)
2346 .to("log:info")
2347 .build()
2348 .unwrap();
2349
2350 assert_eq!(
2351 definition.concurrency_override(),
2352 Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2353 );
2354 }
2355
2356 #[test]
2357 fn test_builder_concurrent_zero_means_unbounded() {
2358 use camel_component_api::ConcurrencyModel;
2359
2360 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2361 .route_id("test-route")
2362 .concurrent(0)
2363 .to("log:info")
2364 .build()
2365 .unwrap();
2366
2367 assert_eq!(
2368 definition.concurrency_override(),
2369 Some(&ConcurrencyModel::Concurrent { max: None })
2370 );
2371 }
2372
2373 #[test]
2374 fn test_builder_sequential_sets_concurrency() {
2375 use camel_component_api::ConcurrencyModel;
2376
2377 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2378 .route_id("test-route")
2379 .sequential()
2380 .to("log:info")
2381 .build()
2382 .unwrap();
2383
2384 assert_eq!(
2385 definition.concurrency_override(),
2386 Some(&ConcurrencyModel::Sequential)
2387 );
2388 }
2389
2390 #[test]
2391 fn test_builder_default_concurrency_is_none() {
2392 let definition = RouteBuilder::from("timer:tick")
2393 .route_id("test-route")
2394 .to("log:info")
2395 .build()
2396 .unwrap();
2397
2398 assert_eq!(definition.concurrency_override(), None);
2399 }
2400
2401 #[test]
2404 fn test_builder_route_id_sets_id() {
2405 let definition = RouteBuilder::from("timer:tick")
2406 .route_id("my-route")
2407 .build()
2408 .unwrap();
2409
2410 assert_eq!(definition.route_id(), "my-route");
2411 }
2412
2413 #[test]
2414 fn test_build_without_route_id_fails() {
2415 let result = RouteBuilder::from("timer:tick?period=1000")
2416 .to("log:info")
2417 .build();
2418 let err = match result {
2419 Err(e) => e.to_string(),
2420 Ok(_) => panic!("build() should fail without route_id"),
2421 };
2422 assert!(
2423 err.contains("route_id"),
2424 "error should mention route_id, got: {}",
2425 err
2426 );
2427 }
2428
2429 #[test]
2430 fn test_builder_empty_route_id_rejected() {
2431 let result = RouteBuilder::from("timer:tick").route_id("").build();
2432 let err = result.err().expect("empty route_id should be rejected");
2433 assert!(matches!(err, CamelError::RouteError(_)));
2434 }
2435
2436 #[test]
2437 fn test_builder_whitespace_route_id_rejected() {
2438 let result = RouteBuilder::from("timer:tick").route_id(" ").build();
2439 assert!(result.is_err());
2440 }
2441
2442 #[test]
2443 fn test_builder_auto_startup_false() {
2444 let definition = RouteBuilder::from("timer:tick")
2445 .route_id("test-route")
2446 .auto_startup(false)
2447 .build()
2448 .unwrap();
2449
2450 assert!(!definition.auto_startup());
2451 }
2452
2453 #[test]
2454 fn test_builder_startup_order_custom() {
2455 let definition = RouteBuilder::from("timer:tick")
2456 .route_id("test-route")
2457 .startup_order(50)
2458 .build()
2459 .unwrap();
2460
2461 assert_eq!(definition.startup_order(), 50);
2462 }
2463
2464 #[test]
2465 fn test_builder_defaults() {
2466 let definition = RouteBuilder::from("timer:tick")
2467 .route_id("test-route")
2468 .build()
2469 .unwrap();
2470
2471 assert_eq!(definition.route_id(), "test-route");
2472 assert!(definition.auto_startup());
2473 assert_eq!(definition.startup_order(), 1000);
2474 }
2475
2476 #[test]
2479 fn test_choice_builder_single_when() {
2480 let definition = RouteBuilder::from("timer:tick")
2481 .route_id("test-route")
2482 .choice()
2483 .when(|ex: &Exchange| ex.input.header("type").is_some())
2484 .to("mock:typed")
2485 .end_when()
2486 .end_choice()
2487 .build()
2488 .unwrap();
2489 assert_eq!(definition.steps().len(), 1);
2490 assert!(
2491 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2492 if whens.len() == 1 && otherwise.is_none())
2493 );
2494 }
2495
2496 #[test]
2497 fn test_choice_builder_when_otherwise() {
2498 let definition = RouteBuilder::from("timer:tick")
2499 .route_id("test-route")
2500 .choice()
2501 .when(|ex: &Exchange| ex.input.header("a").is_some())
2502 .to("mock:a")
2503 .end_when()
2504 .otherwise()
2505 .to("mock:fallback")
2506 .end_otherwise()
2507 .end_choice()
2508 .build()
2509 .unwrap();
2510 assert!(
2511 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2512 if whens.len() == 1 && otherwise.is_some())
2513 );
2514 }
2515
2516 #[test]
2517 fn test_choice_builder_multiple_whens() {
2518 let definition = RouteBuilder::from("timer:tick")
2519 .route_id("test-route")
2520 .choice()
2521 .when(|ex: &Exchange| ex.input.header("a").is_some())
2522 .to("mock:a")
2523 .end_when()
2524 .when(|ex: &Exchange| ex.input.header("b").is_some())
2525 .to("mock:b")
2526 .end_when()
2527 .end_choice()
2528 .build()
2529 .unwrap();
2530 assert!(
2531 matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2532 if whens.len() == 2)
2533 );
2534 }
2535
2536 #[test]
2537 fn test_choice_step_after_choice() {
2538 let definition = RouteBuilder::from("timer:tick")
2540 .route_id("test-route")
2541 .choice()
2542 .when(|_ex: &Exchange| true)
2543 .to("mock:inner")
2544 .end_when()
2545 .end_choice()
2546 .to("mock:outer") .build()
2548 .unwrap();
2549 assert_eq!(definition.steps().len(), 2);
2550 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2551 }
2552
2553 #[test]
2556 fn test_throttle_builder_typestate() {
2557 let definition = RouteBuilder::from("timer:tick")
2558 .route_id("test-route")
2559 .throttle(10, std::time::Duration::from_secs(1))
2560 .to("mock:result")
2561 .end_throttle()
2562 .build()
2563 .unwrap();
2564
2565 assert_eq!(definition.steps().len(), 1);
2566 assert!(matches!(
2567 &definition.steps()[0],
2568 BuilderStep::Throttle { .. }
2569 ));
2570 }
2571
2572 #[test]
2573 fn test_throttle_builder_with_strategy() {
2574 let definition = RouteBuilder::from("timer:tick")
2575 .route_id("test-route")
2576 .throttle(10, std::time::Duration::from_secs(1))
2577 .strategy(ThrottleStrategy::Reject)
2578 .to("mock:result")
2579 .end_throttle()
2580 .build()
2581 .unwrap();
2582
2583 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2584 assert_eq!(config.strategy, ThrottleStrategy::Reject);
2585 } else {
2586 panic!("Expected Throttle step");
2587 }
2588 }
2589
2590 #[test]
2591 fn test_throttle_builder_steps_collected() {
2592 let definition = RouteBuilder::from("timer:tick")
2593 .route_id("test-route")
2594 .throttle(5, std::time::Duration::from_secs(1))
2595 .set_header("throttled", Value::Bool(true))
2596 .to("mock:throttled")
2597 .end_throttle()
2598 .build()
2599 .unwrap();
2600
2601 match &definition.steps()[0] {
2602 BuilderStep::Throttle { steps, .. } => {
2603 assert_eq!(steps.len(), 2); }
2605 other => panic!("Expected Throttle, got {:?}", other),
2606 }
2607 }
2608
2609 #[test]
2610 fn test_throttle_step_after_throttle() {
2611 let definition = RouteBuilder::from("timer:tick")
2613 .route_id("test-route")
2614 .throttle(10, std::time::Duration::from_secs(1))
2615 .to("mock:inner")
2616 .end_throttle()
2617 .to("mock:outer")
2618 .build()
2619 .unwrap();
2620
2621 assert_eq!(definition.steps().len(), 2);
2622 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2623 }
2624
2625 #[test]
2628 fn test_load_balance_builder_typestate() {
2629 let definition = RouteBuilder::from("timer:tick")
2630 .route_id("test-route")
2631 .load_balance()
2632 .round_robin()
2633 .to("mock:a")
2634 .to("mock:b")
2635 .end_load_balance()
2636 .build()
2637 .unwrap();
2638
2639 assert_eq!(definition.steps().len(), 1);
2640 assert!(matches!(
2641 &definition.steps()[0],
2642 BuilderStep::LoadBalance { .. }
2643 ));
2644 }
2645
2646 #[test]
2647 fn test_load_balance_builder_with_strategy() {
2648 let definition = RouteBuilder::from("timer:tick")
2649 .route_id("test-route")
2650 .load_balance()
2651 .random()
2652 .to("mock:result")
2653 .end_load_balance()
2654 .build()
2655 .unwrap();
2656
2657 if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2658 assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2659 } else {
2660 panic!("Expected LoadBalance step");
2661 }
2662 }
2663
2664 #[test]
2665 fn test_load_balance_builder_steps_collected() {
2666 let definition = RouteBuilder::from("timer:tick")
2667 .route_id("test-route")
2668 .load_balance()
2669 .set_header("lb", Value::Bool(true))
2670 .to("mock:a")
2671 .end_load_balance()
2672 .build()
2673 .unwrap();
2674
2675 match &definition.steps()[0] {
2676 BuilderStep::LoadBalance { steps, .. } => {
2677 assert_eq!(steps.len(), 2); }
2679 other => panic!("Expected LoadBalance, got {:?}", other),
2680 }
2681 }
2682
2683 #[test]
2684 fn test_load_balance_step_after_load_balance() {
2685 let definition = RouteBuilder::from("timer:tick")
2687 .route_id("test-route")
2688 .load_balance()
2689 .to("mock:inner")
2690 .end_load_balance()
2691 .to("mock:outer")
2692 .build()
2693 .unwrap();
2694
2695 assert_eq!(definition.steps().len(), 2);
2696 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2697 }
2698
2699 #[test]
2702 fn test_dynamic_router_builder() {
2703 let definition = RouteBuilder::from("timer:tick")
2704 .route_id("test-route")
2705 .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2706 .build()
2707 .unwrap();
2708
2709 assert_eq!(definition.steps().len(), 1);
2710 assert!(matches!(
2711 &definition.steps()[0],
2712 BuilderStep::DynamicRouter { .. }
2713 ));
2714 }
2715
2716 #[test]
2717 fn test_dynamic_router_builder_with_config() {
2718 let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2719 .max_iterations(100)
2720 .cache_size(500);
2721
2722 let definition = RouteBuilder::from("timer:tick")
2723 .route_id("test-route")
2724 .dynamic_router_with_config(config)
2725 .build()
2726 .unwrap();
2727
2728 assert_eq!(definition.steps().len(), 1);
2729 if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2730 assert_eq!(config.max_iterations, 100);
2731 assert_eq!(config.cache_size, 500);
2732 } else {
2733 panic!("Expected DynamicRouter step");
2734 }
2735 }
2736
2737 #[test]
2738 fn test_dynamic_router_step_after_router() {
2739 let definition = RouteBuilder::from("timer:tick")
2741 .route_id("test-route")
2742 .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2743 .to("mock:outer")
2744 .build()
2745 .unwrap();
2746
2747 assert_eq!(definition.steps().len(), 2);
2748 assert!(matches!(
2749 &definition.steps()[0],
2750 BuilderStep::DynamicRouter { .. }
2751 ));
2752 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2753 }
2754
2755 #[test]
2756 fn routing_slip_builder_creates_step() {
2757 use camel_api::RoutingSlipExpression;
2758
2759 let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2760
2761 let route = RouteBuilder::from("direct:start")
2762 .route_id("routing-slip-test")
2763 .routing_slip(expression)
2764 .build()
2765 .unwrap();
2766
2767 assert!(
2768 matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2769 "Expected RoutingSlip step"
2770 );
2771 }
2772
2773 #[test]
2774 fn routing_slip_with_config_builder_creates_step() {
2775 use camel_api::RoutingSlipConfig;
2776
2777 let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2778 .uri_delimiter("|")
2779 .cache_size(50)
2780 .ignore_invalid_endpoints(true);
2781
2782 let route = RouteBuilder::from("direct:start")
2783 .route_id("routing-slip-config-test")
2784 .routing_slip_with_config(config)
2785 .build()
2786 .unwrap();
2787
2788 if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2789 assert_eq!(config.uri_delimiter, "|");
2790 assert_eq!(config.cache_size, 50);
2791 assert!(config.ignore_invalid_endpoints);
2792 } else {
2793 panic!("Expected RoutingSlip step");
2794 }
2795 }
2796
2797 #[test]
2798 fn test_builder_marshal_adds_processor_step() {
2799 let definition = RouteBuilder::from("timer:tick")
2800 .route_id("test-route")
2801 .marshal("json")
2802 .unwrap()
2803 .build()
2804 .unwrap();
2805 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2806 }
2807
2808 #[test]
2809 fn test_builder_unmarshal_adds_processor_step() {
2810 let definition = RouteBuilder::from("timer:tick")
2811 .route_id("test-route")
2812 .unmarshal("json")
2813 .unwrap()
2814 .build()
2815 .unwrap();
2816 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2817 }
2818
2819 #[test]
2820 fn test_builder_stream_cache_adds_processor_step() {
2821 let definition = RouteBuilder::from("timer:tick")
2822 .route_id("test-route")
2823 .stream_cache(1024)
2824 .build()
2825 .unwrap();
2826 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2827 }
2828
2829 #[test]
2830 fn validate_adds_to_step_with_validator_uri() {
2831 let def = RouteBuilder::from("direct:in")
2832 .route_id("test")
2833 .validate("schemas/order.xsd")
2834 .build()
2835 .unwrap();
2836 let steps = def.steps();
2837 assert_eq!(steps.len(), 1);
2838 assert!(
2839 matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2840 "got: {:?}",
2841 steps[0]
2842 );
2843 }
2844
2845 #[test]
2846 fn test_builder_marshal_returns_err_for_unknown_format() {
2847 let result = RouteBuilder::from("timer:tick")
2848 .route_id("test-route")
2849 .marshal("csv");
2850 let err = match result {
2851 Err(e) => e,
2852 Ok(_) => panic!("marshal with unknown format should return Err"),
2853 };
2854 let msg = err.to_string();
2855 assert!(
2856 msg.contains("unknown data format"),
2857 "error should mention unknown format, got: {msg}"
2858 );
2859 assert!(
2860 msg.contains("csv"),
2861 "error should mention format name, got: {msg}"
2862 );
2863 }
2864
2865 #[test]
2866 fn test_builder_unmarshal_returns_err_for_unknown_format() {
2867 let result = RouteBuilder::from("timer:tick")
2868 .route_id("test-route")
2869 .unmarshal("csv");
2870 let err = match result {
2871 Err(e) => e,
2872 Ok(_) => panic!("unmarshal with unknown format should return Err"),
2873 };
2874 let msg = err.to_string();
2875 assert!(
2876 msg.contains("unknown data format"),
2877 "error should mention unknown format, got: {msg}"
2878 );
2879 assert!(
2880 msg.contains("csv"),
2881 "error should mention format name, got: {msg}"
2882 );
2883 }
2884
2885 #[test]
2886 fn test_builder_recipient_list_creates_step() {
2887 let route = RouteBuilder::from("direct:start")
2888 .route_id("recipient-list-test")
2889 .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
2890 .build()
2891 .unwrap();
2892
2893 assert!(matches!(
2894 &route.steps()[0],
2895 BuilderStep::RecipientList { .. }
2896 ));
2897 }
2898
2899 #[test]
2900 fn test_builder_recipient_list_with_config_creates_step() {
2901 let config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));
2902
2903 let route = RouteBuilder::from("direct:start")
2904 .route_id("recipient-list-config-test")
2905 .recipient_list_with_config(config)
2906 .build()
2907 .unwrap();
2908
2909 assert!(matches!(
2910 &route.steps()[0],
2911 BuilderStep::RecipientList { .. }
2912 ));
2913 }
2914
2915 #[test]
2916 fn test_builder_script_adds_script_step() {
2917 let route = RouteBuilder::from("direct:start")
2918 .route_id("script-test")
2919 .script("rhai", "headers[\"x\"] = \"y\"")
2920 .build()
2921 .unwrap();
2922
2923 assert!(matches!(
2924 &route.steps()[0],
2925 BuilderStep::Script { language, script }
2926 if language == "rhai" && script == "headers[\"x\"] = \"y\""
2927 ));
2928 }
2929
2930 #[test]
2931 fn test_builder_delay_and_delay_with_header_add_steps() {
2932 let route = RouteBuilder::from("direct:start")
2933 .route_id("delay-test")
2934 .delay(Duration::from_millis(250))
2935 .delay_with_header(Duration::from_millis(500), "x-delay")
2936 .build()
2937 .unwrap();
2938
2939 assert_eq!(route.steps().len(), 2);
2940 assert!(matches!(&route.steps()[0], BuilderStep::Delay { .. }));
2941 assert!(matches!(&route.steps()[1], BuilderStep::Delay { .. }));
2942 }
2943
2944 #[test]
2945 fn test_builder_log_and_stop_add_steps_in_order() {
2946 let route = RouteBuilder::from("direct:start")
2947 .route_id("log-stop-test")
2948 .log("hello", LogLevel::Info)
2949 .stop()
2950 .to("mock:after")
2951 .build()
2952 .unwrap();
2953
2954 assert_eq!(route.steps().len(), 3);
2955 assert!(matches!(
2956 &route.steps()[0],
2957 BuilderStep::Log { message, .. } if message == "hello"
2958 ));
2959 assert!(matches!(&route.steps()[1], BuilderStep::Stop));
2960 assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
2961 }
2962
2963 #[test]
2964 fn test_builder_stream_cache_default_adds_processor_step() {
2965 let route = RouteBuilder::from("direct:start")
2966 .route_id("stream-cache-default-test")
2967 .stream_cache_default()
2968 .build()
2969 .unwrap();
2970
2971 assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
2972 }
2973
2974 #[test]
2975 fn test_validate_preserves_existing_validator_prefix() {
2976 let route = RouteBuilder::from("direct:in")
2977 .route_id("validate-prefix-test")
2978 .validate("validator:schemas/order.xsd")
2979 .build()
2980 .unwrap();
2981
2982 assert!(matches!(
2983 &route.steps()[0],
2984 BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
2985 ));
2986 }
2987
2988 #[test]
2989 fn test_load_balance_builder_weighted_failover_parallel_config() {
2990 let route = RouteBuilder::from("direct:start")
2991 .route_id("lb-weighted-failover-parallel")
2992 .load_balance()
2993 .weighted(vec![
2994 ("direct:a".to_string(), 3),
2995 ("direct:b".to_string(), 1),
2996 ])
2997 .failover()
2998 .parallel(true)
2999 .to("mock:result")
3000 .end_load_balance()
3001 .build()
3002 .unwrap();
3003
3004 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3005 assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3006 assert!(config.parallel);
3007 } else {
3008 panic!("Expected LoadBalance step");
3009 }
3010 }
3011
3012 #[test]
3013 fn test_multicast_builder_all_config_setters() {
3014 let route = RouteBuilder::from("direct:start")
3015 .route_id("multicast-config-test")
3016 .multicast()
3017 .parallel(true)
3018 .parallel_limit(4)
3019 .stop_on_exception(true)
3020 .timeout(Duration::from_millis(300))
3021 .aggregation(MulticastStrategy::Original)
3022 .to("mock:a")
3023 .end_multicast()
3024 .build()
3025 .unwrap();
3026
3027 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3028 assert!(config.parallel);
3029 assert_eq!(config.parallel_limit, Some(4));
3030 assert!(config.stop_on_exception);
3031 assert_eq!(config.timeout, Some(Duration::from_millis(300)));
3032 assert!(matches!(config.aggregation, MulticastStrategy::Original));
3033 } else {
3034 panic!("Expected Multicast step");
3035 }
3036 }
3037
3038 #[test]
3039 fn test_build_canonical_rejects_unsupported_processor_step() {
3040 let err = RouteBuilder::from("direct:start")
3041 .route_id("canonical-reject")
3042 .set_header("k", Value::String("v".into()))
3043 .build_canonical()
3044 .unwrap_err();
3045
3046 assert!(format!("{err}").contains("does not support step `processor`"));
3047 }
3048
3049 #[test]
3052 fn test_load_balance_builder_weighted_strategy() {
3053 let route = RouteBuilder::from("direct:start")
3054 .route_id("lb-weighted")
3055 .load_balance()
3056 .weighted(vec![
3057 ("direct:a".to_string(), 5),
3058 ("direct:b".to_string(), 2),
3059 ("direct:c".to_string(), 1),
3060 ])
3061 .to("mock:result")
3062 .end_load_balance()
3063 .build()
3064 .unwrap();
3065
3066 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3067 assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
3068 } else {
3069 panic!("Expected LoadBalance step");
3070 }
3071 }
3072
3073 #[test]
3074 fn test_load_balance_builder_failover_strategy() {
3075 let route = RouteBuilder::from("direct:start")
3076 .route_id("lb-failover")
3077 .load_balance()
3078 .failover()
3079 .to("mock:primary")
3080 .end_load_balance()
3081 .build()
3082 .unwrap();
3083
3084 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3085 assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
3086 assert!(!config.parallel);
3087 } else {
3088 panic!("Expected LoadBalance step");
3089 }
3090 }
3091
3092 #[test]
3093 fn test_load_balance_builder_parallel_false_explicit() {
3094 let route = RouteBuilder::from("direct:start")
3095 .route_id("lb-parallel-false")
3096 .load_balance()
3097 .round_robin()
3098 .parallel(false)
3099 .to("mock:result")
3100 .end_load_balance()
3101 .build()
3102 .unwrap();
3103
3104 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3105 assert!(!config.parallel);
3106 } else {
3107 panic!("Expected LoadBalance step");
3108 }
3109 }
3110
3111 #[test]
3114 fn test_filter_in_split_builder_typestate() {
3115 use camel_api::splitter::{SplitterConfig, split_body_lines};
3116
3117 let definition = RouteBuilder::from("timer:test")
3118 .route_id("filter-in-split")
3119 .split(SplitterConfig::new(split_body_lines()))
3120 .filter(|_ex| true)
3121 .to("mock:filtered")
3122 .end_filter()
3123 .end_split()
3124 .build()
3125 .unwrap();
3126
3127 assert_eq!(definition.steps().len(), 1);
3128 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3129 assert_eq!(steps.len(), 1);
3130 assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3131 } else {
3132 panic!("Expected Split step");
3133 }
3134 }
3135
3136 #[test]
3137 fn test_filter_in_split_builder_multiple_steps() {
3138 use camel_api::splitter::{SplitterConfig, split_body_lines};
3139
3140 let definition = RouteBuilder::from("timer:test")
3141 .route_id("filter-in-split-multi")
3142 .split(SplitterConfig::new(split_body_lines()))
3143 .to("mock:before-filter")
3144 .filter(|_ex| true)
3145 .to("mock:inside-filter")
3146 .end_filter()
3147 .to("mock:after-filter")
3148 .end_split()
3149 .build()
3150 .unwrap();
3151
3152 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3153 assert_eq!(steps.len(), 3);
3155 } else {
3156 panic!("Expected Split step");
3157 }
3158 }
3159
3160 #[test]
3163 fn test_build_canonical_with_circuit_breaker() {
3164 use camel_api::circuit_breaker::CircuitBreakerConfig;
3165
3166 let spec = RouteBuilder::from("direct:start")
3167 .route_id("canonical-cb")
3168 .circuit_breaker(CircuitBreakerConfig::new().failure_threshold(10))
3169 .to("mock:result")
3170 .build_canonical()
3171 .unwrap();
3172
3173 let cb = spec.circuit_breaker.expect("circuit breaker should be set");
3174 assert_eq!(cb.failure_threshold, 10);
3175 }
3176
3177 #[test]
3178 fn test_build_canonical_rejects_custom_split_aggregation() {
3179 use camel_api::splitter::{SplitterConfig, split_body_lines};
3180
3181 let err = RouteBuilder::from("direct:start")
3182 .route_id("canonical-custom-split")
3183 .split(SplitterConfig::new(split_body_lines()).aggregation(
3184 camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
3185 ))
3186 .to("mock:frag")
3187 .end_split()
3188 .build_canonical()
3189 .unwrap_err();
3190
3191 assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3193 }
3194
3195 #[test]
3196 fn test_build_canonical_rejects_custom_aggregate_strategy() {
3197 let err = RouteBuilder::from("direct:start")
3198 .route_id("canonical-custom-agg")
3199 .aggregate(
3200 AggregatorConfig::correlate_by("key")
3201 .complete_when_size(2)
3202 .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
3203 .build()
3204 .unwrap(),
3205 )
3206 .build_canonical()
3207 .unwrap_err();
3208
3209 assert!(format!("{err}").contains("custom aggregate strategy"));
3210 }
3211
3212 #[test]
3213 fn test_build_canonical_rejects_fn_correlation_strategy() {
3214 let err = RouteBuilder::from("direct:start")
3215 .route_id("canonical-fn-corr")
3216 .aggregate(AggregatorConfig {
3217 header_name: "key".to_string(),
3218 completion: CompletionMode::Single(CompletionCondition::Size(1)),
3219 correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
3220 strategy: AggregationStrategy::CollectAll,
3221 max_buckets: None,
3222 bucket_ttl: None,
3223 force_completion_on_stop: false,
3224 discard_on_timeout: false,
3225 })
3226 .build_canonical()
3227 .unwrap_err();
3228
3229 assert!(format!("{err}").contains("Fn correlation strategy"));
3230 }
3231
3232 #[test]
3233 fn test_build_canonical_rejects_predicate_completion() {
3234 let err = RouteBuilder::from("direct:start")
3235 .route_id("canonical-pred-completion")
3236 .aggregate(AggregatorConfig {
3237 header_name: "key".to_string(),
3238 completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
3239 |_| false,
3240 ))),
3241 correlation: CorrelationStrategy::HeaderName("key".to_string()),
3242 strategy: AggregationStrategy::CollectAll,
3243 max_buckets: None,
3244 bucket_ttl: None,
3245 force_completion_on_stop: false,
3246 discard_on_timeout: false,
3247 })
3248 .build_canonical()
3249 .unwrap_err();
3250
3251 assert!(format!("{err}").contains("predicate completion"));
3252 }
3253
3254 #[test]
3255 fn test_build_canonical_with_expression_correlation() {
3256 let spec = RouteBuilder::from("direct:start")
3257 .route_id("canonical-expr-corr")
3258 .aggregate(AggregatorConfig {
3259 header_name: "key".to_string(),
3260 completion: CompletionMode::Single(CompletionCondition::Size(1)),
3261 correlation: CorrelationStrategy::Expression {
3262 expr: "header.key".to_string(),
3263 language: "simple".to_string(),
3264 },
3265 strategy: AggregationStrategy::CollectAll,
3266 max_buckets: None,
3267 bucket_ttl: None,
3268 force_completion_on_stop: false,
3269 discard_on_timeout: false,
3270 })
3271 .build_canonical()
3272 .unwrap();
3273
3274 assert!(spec.steps.iter().any(|s| matches!(s, CanonicalStepSpec::Aggregate(a) if a.correlation_key == Some("header.key".to_string()))));
3275 }
3276
3277 #[test]
3278 fn test_build_canonical_split_rejected_with_closure_expression() {
3279 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
3280
3281 let err = RouteBuilder::from("direct:start")
3283 .route_id("canonical-split-last")
3284 .split(
3285 SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins),
3286 )
3287 .to("mock:frag")
3288 .end_split()
3289 .build_canonical()
3290 .unwrap_err();
3291
3292 assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3293 }
3294
3295 #[test]
3298 fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
3299 let definition = RouteBuilder::from("direct:start")
3300 .route_id("on-exception-full")
3301 .dead_letter_channel("log:dlc")
3302 .on_exception(|e| matches!(e, CamelError::Io(_)))
3303 .retry(5)
3304 .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
3305 .with_jitter(0.3)
3306 .handled_by("log:io-handler")
3307 .end_on_exception()
3308 .to("mock:out")
3309 .build()
3310 .unwrap();
3311
3312 let cfg = definition
3313 .error_handler_config()
3314 .expect("error handler should be set");
3315 assert_eq!(cfg.policies.len(), 1);
3316 let policy = &cfg.policies[0];
3317 let retry = policy.retry.as_ref().expect("retry should be set");
3318 assert_eq!(retry.max_attempts, 5);
3319 assert_eq!(retry.initial_delay, Duration::from_millis(10));
3320 assert_eq!(retry.multiplier, 2.0);
3321 assert_eq!(retry.max_delay, Duration::from_millis(500));
3322 assert!((retry.jitter_factor - 0.3).abs() < f64::EPSILON);
3323 assert_eq!(policy.handled_by.as_deref(), Some("log:io-handler"));
3324 }
3325
3326 #[test]
3327 fn test_on_exception_jitter_clamped_to_valid_range() {
3328 let definition = RouteBuilder::from("direct:start")
3329 .route_id("jitter-clamp")
3330 .on_exception(|_e| true)
3331 .retry(1)
3332 .with_jitter(5.0)
3333 .end_on_exception()
3334 .to("mock:out")
3335 .build()
3336 .unwrap();
3337
3338 let cfg = definition.error_handler_config().unwrap();
3339 let retry = cfg.policies[0].retry.as_ref().unwrap();
3340 assert!((retry.jitter_factor - 1.0).abs() < f64::EPSILON);
3341 }
3342
3343 #[test]
3346 fn test_builder_process_fn_adds_processor_step() {
3347 use camel_api::BoxProcessorExt;
3348 let processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
3349 let definition = RouteBuilder::from("timer:tick")
3350 .route_id("process-fn-test")
3351 .process_fn(processor)
3352 .build()
3353 .unwrap();
3354
3355 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3356 }
3357
3358 #[test]
3359 fn test_builder_convert_body_to_adds_processor_step() {
3360 let definition = RouteBuilder::from("timer:tick")
3361 .route_id("convert-body-test")
3362 .convert_body_to(BodyType::Json)
3363 .build()
3364 .unwrap();
3365
3366 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3367 }
3368
3369 #[test]
3370 fn test_builder_bean_adds_bean_step() {
3371 let definition = RouteBuilder::from("timer:tick")
3372 .route_id("bean-test")
3373 .bean("myBean", "process")
3374 .build()
3375 .unwrap();
3376
3377 assert!(
3378 matches!(&definition.steps()[0], BuilderStep::Bean { name, method }
3379 if name == "myBean" && method == "process")
3380 );
3381 }
3382
3383 #[test]
3386 fn test_throttle_builder_delay_strategy() {
3387 let definition = RouteBuilder::from("timer:tick")
3388 .route_id("throttle-delay")
3389 .throttle(10, Duration::from_secs(1))
3390 .strategy(ThrottleStrategy::Delay)
3391 .to("mock:result")
3392 .end_throttle()
3393 .build()
3394 .unwrap();
3395
3396 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3397 assert_eq!(config.strategy, ThrottleStrategy::Delay);
3398 } else {
3399 panic!("Expected Throttle step");
3400 }
3401 }
3402
3403 #[test]
3404 fn test_throttle_builder_drop_strategy() {
3405 let definition = RouteBuilder::from("timer:tick")
3406 .route_id("throttle-drop")
3407 .throttle(10, Duration::from_secs(1))
3408 .strategy(ThrottleStrategy::Drop)
3409 .to("mock:result")
3410 .end_throttle()
3411 .build()
3412 .unwrap();
3413
3414 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3415 assert_eq!(config.strategy, ThrottleStrategy::Drop);
3416 } else {
3417 panic!("Expected Throttle step");
3418 }
3419 }
3420
3421 #[test]
3424 fn test_nested_loop_while_builder() {
3425 use camel_api::loop_eip::LoopMode;
3426
3427 let def = RouteBuilder::from("direct:start")
3428 .route_id("nested-loop-while")
3429 .loop_count(2)
3430 .to("mock:outer")
3431 .loop_while(|_ex| true)
3432 .to("mock:inner")
3433 .end_loop()
3434 .end_loop()
3435 .build()
3436 .unwrap();
3437
3438 assert_eq!(def.steps().len(), 1);
3439 if let BuilderStep::Loop { steps, .. } = &def.steps()[0] {
3440 assert_eq!(steps.len(), 2);
3441 if let BuilderStep::Loop { config, .. } = &steps[1] {
3442 assert!(matches!(config.mode, LoopMode::While(_)));
3443 } else {
3444 panic!("Expected inner Loop step");
3445 }
3446 } else {
3447 panic!("Expected outer Loop step");
3448 }
3449 }
3450
3451 #[test]
3454 fn test_choice_builder_multiple_whens_with_otherwise() {
3455 let definition = RouteBuilder::from("timer:tick")
3456 .route_id("choice-multi-otherwise")
3457 .choice()
3458 .when(|ex: &Exchange| ex.input.header("a").is_some())
3459 .to("mock:a")
3460 .end_when()
3461 .when(|ex: &Exchange| ex.input.header("b").is_some())
3462 .to("mock:b")
3463 .end_when()
3464 .when(|ex: &Exchange| ex.input.header("c").is_some())
3465 .to("mock:c")
3466 .end_when()
3467 .otherwise()
3468 .to("mock:fallback")
3469 .end_otherwise()
3470 .end_choice()
3471 .build()
3472 .unwrap();
3473
3474 if let BuilderStep::Choice { whens, otherwise } = &definition.steps()[0] {
3475 assert_eq!(whens.len(), 3);
3476 assert!(otherwise.is_some());
3477 assert_eq!(otherwise.as_ref().unwrap().len(), 1);
3478 } else {
3479 panic!("Expected Choice step");
3480 }
3481 }
3482
3483 #[test]
3486 fn test_multicast_builder_parallel_only() {
3487 let route = RouteBuilder::from("direct:start")
3488 .route_id("multicast-parallel")
3489 .multicast()
3490 .parallel(true)
3491 .to("mock:a")
3492 .end_multicast()
3493 .build()
3494 .unwrap();
3495
3496 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3497 assert!(config.parallel);
3498 assert_eq!(config.parallel_limit, None);
3499 } else {
3500 panic!("Expected Multicast step");
3501 }
3502 }
3503
3504 #[test]
3505 fn test_multicast_builder_timeout_only() {
3506 let route = RouteBuilder::from("direct:start")
3507 .route_id("multicast-timeout")
3508 .multicast()
3509 .timeout(Duration::from_secs(5))
3510 .to("mock:a")
3511 .end_multicast()
3512 .build()
3513 .unwrap();
3514
3515 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3516 assert_eq!(config.timeout, Some(Duration::from_secs(5)));
3517 } else {
3518 panic!("Expected Multicast step");
3519 }
3520 }
3521
3522 #[test]
3523 fn test_multicast_builder_aggregation_collect_all() {
3524 let route = RouteBuilder::from("direct:start")
3525 .route_id("multicast-collect")
3526 .multicast()
3527 .aggregation(MulticastStrategy::CollectAll)
3528 .to("mock:a")
3529 .end_multicast()
3530 .build()
3531 .unwrap();
3532
3533 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3534 assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
3535 } else {
3536 panic!("Expected Multicast step");
3537 }
3538 }
3539
3540 #[test]
3543 fn test_build_canonical_aggregate_any_completion_mode() {
3544 let spec = RouteBuilder::from("direct:start")
3545 .route_id("canonical-any-completion")
3546 .aggregate(
3547 AggregatorConfig::correlate_by("key")
3548 .complete_on_size_or_timeout(10, Duration::from_secs(30))
3549 .build()
3550 .unwrap(),
3551 )
3552 .build_canonical()
3553 .unwrap();
3554
3555 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3556 assert_eq!(agg.completion_size, Some(10));
3557 assert_eq!(agg.completion_timeout_ms, Some(30_000));
3558 } else {
3559 panic!("Expected Aggregate step");
3560 }
3561 }
3562
3563 #[test]
3564 fn test_build_canonical_aggregate_timeout_completion() {
3565 let spec = RouteBuilder::from("direct:start")
3566 .route_id("canonical-timeout-completion")
3567 .aggregate(
3568 AggregatorConfig::correlate_by("key")
3569 .complete_on_timeout(Duration::from_millis(500))
3570 .build()
3571 .unwrap(),
3572 )
3573 .build_canonical()
3574 .unwrap();
3575
3576 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3577 assert_eq!(agg.completion_size, None);
3578 assert_eq!(agg.completion_timeout_ms, Some(500));
3579 } else {
3580 panic!("Expected Aggregate step");
3581 }
3582 }
3583
3584 #[test]
3587 fn test_build_canonical_aggregate_discard_on_timeout() {
3588 use camel_api::aggregator::AggregatorConfig;
3589
3590 let spec = RouteBuilder::from("direct:start")
3591 .route_id("canonical-discard-timeout")
3592 .aggregate(
3593 AggregatorConfig::correlate_by("key")
3594 .complete_when_size(1)
3595 .discard_on_timeout(true)
3596 .build()
3597 .unwrap(),
3598 )
3599 .build_canonical()
3600 .unwrap();
3601
3602 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3603 assert_eq!(agg.discard_on_timeout, Some(true));
3604 } else {
3605 panic!("Expected Aggregate step");
3606 }
3607 }
3608
3609 #[test]
3610 fn test_build_canonical_aggregate_force_completion_on_stop() {
3611 use camel_api::aggregator::AggregatorConfig;
3612
3613 let spec = RouteBuilder::from("direct:start")
3614 .route_id("canonical-force-stop")
3615 .aggregate(
3616 AggregatorConfig::correlate_by("key")
3617 .complete_when_size(1)
3618 .force_completion_on_stop(true)
3619 .build()
3620 .unwrap(),
3621 )
3622 .build_canonical()
3623 .unwrap();
3624
3625 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3626 assert_eq!(agg.force_completion_on_stop, Some(true));
3627 } else {
3628 panic!("Expected Aggregate step");
3629 }
3630 }
3631
3632 #[test]
3635 fn test_build_canonical_aggregate_max_buckets_and_ttl() {
3636 use camel_api::aggregator::AggregatorConfig;
3637
3638 let spec = RouteBuilder::from("direct:start")
3639 .route_id("canonical-buckets-ttl")
3640 .aggregate(
3641 AggregatorConfig::correlate_by("key")
3642 .complete_when_size(1)
3643 .max_buckets(100)
3644 .bucket_ttl(Duration::from_secs(60))
3645 .build()
3646 .unwrap(),
3647 )
3648 .build_canonical()
3649 .unwrap();
3650
3651 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3652 assert_eq!(agg.max_buckets, Some(100));
3653 assert_eq!(agg.bucket_ttl_ms, Some(60_000));
3654 } else {
3655 panic!("Expected Aggregate step");
3656 }
3657 }
3658
3659 #[test]
3662 fn test_split_builder_with_filter_inside() {
3663 use camel_api::splitter::{SplitterConfig, split_body_lines};
3664
3665 let definition = RouteBuilder::from("timer:test")
3666 .route_id("split-with-filter")
3667 .split(SplitterConfig::new(split_body_lines()))
3668 .filter(|_ex| true)
3669 .to("mock:filtered-frag")
3670 .end_filter()
3671 .end_split()
3672 .build()
3673 .unwrap();
3674
3675 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3676 assert_eq!(steps.len(), 1);
3677 assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3678 } else {
3679 panic!("Expected Split step");
3680 }
3681 }
3682
3683 #[test]
3686 fn test_wire_tap_multiple_taps() {
3687 let definition = RouteBuilder::from("timer:tick")
3688 .route_id("multi-wire-tap")
3689 .wire_tap("mock:tap1")
3690 .wire_tap("mock:tap2")
3691 .to("mock:result")
3692 .build()
3693 .unwrap();
3694
3695 assert_eq!(definition.steps().len(), 3);
3696 assert!(
3697 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap1")
3698 );
3699 assert!(
3700 matches!(&definition.steps()[1], BuilderStep::WireTap { uri } if uri == "mock:tap2")
3701 );
3702 }
3703
3704 #[test]
3707 fn test_builder_shorthand_then_explicit_mixed_mode() {
3708 let result = RouteBuilder::from("direct:start")
3709 .route_id("mixed-mode-2")
3710 .dead_letter_channel("log:dlc")
3711 .error_handler(ErrorHandlerConfig::log_only())
3712 .to("mock:out")
3713 .build();
3714
3715 let err = result.err().expect("mixed mode should fail");
3716 assert!(format!("{err}").contains("mixed error handler modes"));
3717 }
3718
3719 #[test]
3722 fn test_build_canonical_empty_from_uri_errors() {
3723 let result = RouteBuilder::from("").route_id("test").build_canonical();
3724 assert!(result.is_err());
3725 }
3726
3727 #[test]
3728 fn test_build_canonical_missing_route_id_errors() {
3729 let result = RouteBuilder::from("direct:start").build_canonical();
3730 assert!(result.is_err());
3731 let err = result.unwrap_err().to_string();
3732 assert!(err.contains("route_id"));
3733 }
3734
3735 #[test]
3738 fn test_split_builder_with_aggregate_inside() {
3739 use camel_api::aggregator::AggregatorConfig;
3740 use camel_api::splitter::{SplitterConfig, split_body_lines};
3741
3742 let definition = RouteBuilder::from("timer:test")
3743 .route_id("split-agg")
3744 .split(SplitterConfig::new(split_body_lines()))
3745 .aggregate(
3746 AggregatorConfig::correlate_by("frag-key")
3747 .complete_when_size(3)
3748 .build()
3749 .unwrap(),
3750 )
3751 .end_split()
3752 .build()
3753 .unwrap();
3754
3755 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3756 assert_eq!(steps.len(), 1);
3757 assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
3758 } else {
3759 panic!("Expected Split step");
3760 }
3761 }
3762
3763 #[test]
3766 fn test_throttle_builder_with_steps_inside() {
3767 let definition = RouteBuilder::from("timer:tick")
3768 .route_id("throttle-steps")
3769 .throttle(10, Duration::from_secs(1))
3770 .set_header("throttled", Value::Bool(true))
3771 .to("mock:throttled")
3772 .end_throttle()
3773 .build()
3774 .unwrap();
3775
3776 if let BuilderStep::Throttle { steps, .. } = &definition.steps()[0] {
3777 assert_eq!(steps.len(), 2);
3778 } else {
3779 panic!("Expected Throttle step");
3780 }
3781 }
3782
3783 #[test]
3786 fn test_load_balance_builder_with_steps_inside() {
3787 let definition = RouteBuilder::from("timer:tick")
3788 .route_id("lb-steps")
3789 .load_balance()
3790 .round_robin()
3791 .set_header("lb", Value::Bool(true))
3792 .to("mock:lb")
3793 .end_load_balance()
3794 .build()
3795 .unwrap();
3796
3797 if let BuilderStep::LoadBalance { steps, .. } = &definition.steps()[0] {
3798 assert_eq!(steps.len(), 2);
3799 } else {
3800 panic!("Expected LoadBalance step");
3801 }
3802 }
3803
3804 #[test]
3807 fn test_multicast_builder_with_steps_inside() {
3808 let definition = RouteBuilder::from("timer:tick")
3809 .route_id("multicast-steps")
3810 .multicast()
3811 .set_header("mc", Value::Bool(true))
3812 .to("mock:multicast")
3813 .end_multicast()
3814 .build()
3815 .unwrap();
3816
3817 if let BuilderStep::Multicast { steps, .. } = &definition.steps()[0] {
3818 assert_eq!(steps.len(), 2);
3819 } else {
3820 panic!("Expected Multicast step");
3821 }
3822 }
3823
3824 #[test]
3827 fn test_loop_builder_with_steps_inside() {
3828 let definition = RouteBuilder::from("timer:tick")
3829 .route_id("loop-steps")
3830 .loop_count(3)
3831 .set_header("loop", Value::Bool(true))
3832 .to("mock:loop")
3833 .end_loop()
3834 .build()
3835 .unwrap();
3836
3837 if let BuilderStep::Loop { steps, .. } = &definition.steps()[0] {
3838 assert_eq!(steps.len(), 2);
3839 } else {
3840 panic!("Expected Loop step");
3841 }
3842 }
3843
3844 #[test]
3847 fn test_build_canonical_rejects_loop_step() {
3848 let err = RouteBuilder::from("direct:start")
3849 .route_id("canonical-loop")
3850 .loop_count(3)
3851 .to("mock:loop")
3852 .end_loop()
3853 .build_canonical()
3854 .unwrap_err();
3855
3856 assert!(format!("{err}").contains("does not support step `loop`"));
3857 }
3858
3859 #[test]
3860 fn test_build_canonical_rejects_multicast_step() {
3861 let err = RouteBuilder::from("direct:start")
3862 .route_id("canonical-multicast")
3863 .multicast()
3864 .to("mock:a")
3865 .end_multicast()
3866 .build_canonical()
3867 .unwrap_err();
3868
3869 assert!(format!("{err}").contains("does not support step `multicast`"));
3870 }
3871
3872 #[test]
3873 fn test_build_canonical_rejects_throttle_step() {
3874 let err = RouteBuilder::from("direct:start")
3875 .route_id("canonical-throttle")
3876 .throttle(10, Duration::from_secs(1))
3877 .to("mock:result")
3878 .end_throttle()
3879 .build_canonical()
3880 .unwrap_err();
3881
3882 assert!(format!("{err}").contains("does not support step `throttle`"));
3883 }
3884
3885 #[test]
3886 fn test_build_canonical_rejects_load_balancer_step() {
3887 let err = RouteBuilder::from("direct:start")
3888 .route_id("canonical-lb")
3889 .load_balance()
3890 .round_robin()
3891 .to("mock:result")
3892 .end_load_balance()
3893 .build_canonical()
3894 .unwrap_err();
3895
3896 assert!(format!("{err}").contains("does not support step `load_balancer`"));
3897 }
3898
3899 #[test]
3900 fn test_build_canonical_rejects_bean_step() {
3901 let err = RouteBuilder::from("direct:start")
3902 .route_id("canonical-bean")
3903 .bean("myBean", "process")
3904 .build_canonical()
3905 .unwrap_err();
3906
3907 assert!(format!("{err}").contains("does not support step `bean`"));
3908 }
3909
3910 #[test]
3911 fn test_build_canonical_rejects_script_step() {
3912 let err = RouteBuilder::from("direct:start")
3913 .route_id("canonical-script")
3914 .script("rhai", "x = 1")
3915 .build_canonical()
3916 .unwrap_err();
3917
3918 assert!(format!("{err}").contains("does not support step `script`"));
3919 }
3920
3921 #[test]
3922 fn test_build_canonical_accepts_delay_step() {
3923 let spec = RouteBuilder::from("direct:start")
3924 .route_id("canonical-delay")
3925 .delay(Duration::from_millis(100))
3926 .build_canonical()
3927 .unwrap();
3928
3929 assert!(
3930 spec.steps.iter().any(
3931 |s| matches!(s, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100)
3932 )
3933 );
3934 }
3935
3936 #[test]
3937 fn test_build_canonical_accepts_wire_tap_step() {
3938 let spec = RouteBuilder::from("direct:start")
3939 .route_id("canonical-wiretap")
3940 .wire_tap("mock:tap")
3941 .build_canonical()
3942 .unwrap();
3943
3944 assert!(
3945 spec.steps
3946 .iter()
3947 .any(|s| matches!(s, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"))
3948 );
3949 }
3950
3951 #[test]
3952 fn test_build_canonical_rejects_dynamic_router_step() {
3953 let err = RouteBuilder::from("direct:start")
3954 .route_id("canonical-dyn-router")
3955 .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
3956 .build_canonical()
3957 .unwrap_err();
3958
3959 assert!(format!("{err}").contains("does not support step `dynamic_router`"));
3960 }
3961
3962 #[test]
3963 fn test_build_canonical_rejects_routing_slip_step() {
3964 let err = RouteBuilder::from("direct:start")
3965 .route_id("canonical-routing-slip")
3966 .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
3967 .build_canonical()
3968 .unwrap_err();
3969
3970 assert!(format!("{err}").contains("does not support step `routing_slip`"));
3971 }
3972
3973 #[test]
3974 fn test_build_canonical_rejects_recipient_list_step() {
3975 let err = RouteBuilder::from("direct:start")
3976 .route_id("canonical-recipient")
3977 .recipient_list(Arc::new(|_| "mock:a".to_string()))
3978 .build_canonical()
3979 .unwrap_err();
3980
3981 assert!(format!("{err}").contains("does not support step `recipient_list`"));
3982 }
3983
3984 #[test]
3987 fn test_build_canonical_rejects_any_mode_with_predicate() {
3988 let err = RouteBuilder::from("direct:start")
3989 .route_id("canonical-any-pred")
3990 .aggregate(AggregatorConfig {
3991 header_name: "key".to_string(),
3992 completion: CompletionMode::Any(vec![
3993 CompletionCondition::Size(5),
3994 CompletionCondition::Predicate(Arc::new(|_| false)),
3995 ]),
3996 correlation: CorrelationStrategy::HeaderName("key".to_string()),
3997 strategy: AggregationStrategy::CollectAll,
3998 max_buckets: None,
3999 bucket_ttl: None,
4000 force_completion_on_stop: false,
4001 discard_on_timeout: false,
4002 })
4003 .build_canonical()
4004 .unwrap_err();
4005
4006 assert!(format!("{err}").contains("predicate completion"));
4007 }
4008
4009 #[test]
4012 fn test_builder_validation_missing_from_uri() {
4013 let result = RouteBuilder::from("")
4014 .route_id("missing-uri-route")
4015 .to("log:info")
4016 .build();
4017 assert!(result.is_err(), "empty from URI should fail validation");
4018 let err = result.err().unwrap().to_string();
4019 assert!(
4020 err.contains("'from'") || err.contains("URI"),
4021 "error should mention from/URI, got: {err}"
4022 );
4023 }
4024
4025 #[test]
4026 fn test_builder_validation_invalid_step_uri_scheme() {
4027 let result = RouteBuilder::from("timer:tick")
4028 .route_id("bad-step-route")
4029 .to("not-a-valid-uri") .build();
4031 assert!(
4034 result.is_ok(),
4035 "builder should accept opaque step URIs; resolution happens later"
4036 );
4037 }
4038
4039 #[test]
4042 fn test_builder_duplicate_route_ids_produce_identical_definitions() {
4043 let route1 = RouteBuilder::from("direct:a")
4046 .route_id("dup-route")
4047 .to("mock:out")
4048 .build();
4049 let route2 = RouteBuilder::from("direct:b")
4050 .route_id("dup-route")
4051 .to("mock:out")
4052 .build();
4053
4054 assert!(route1.is_ok());
4055 assert!(route2.is_ok());
4056 assert_eq!(route1.unwrap().route_id(), route2.unwrap().route_id());
4057 }
4058}