1use camel_api::DelayConfig;
7use camel_api::aggregator::{
8 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
9};
10use camel_api::body::Body;
11use camel_api::body_converter::BodyType;
12use camel_api::circuit_breaker::CircuitBreakerConfig;
13use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
14use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
15use camel_api::load_balancer::LoadBalancerConfig;
16use camel_api::loop_eip::{LoopConfig, LoopMode};
17use camel_api::multicast::{MulticastConfig, MulticastStrategy};
18use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
19use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
20use camel_api::splitter::SplitterConfig;
21use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
22use camel_api::{
23 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
24 ProcessorFn, Value,
25 runtime::{
26 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
27 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
28 CanonicalWhenSpec,
29 },
30};
31use camel_component_api::ConcurrencyModel;
32use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
33use camel_processor::{
34 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
35 StreamCacheService, UnmarshalService, builtin_data_format,
36};
37
38pub trait StepAccumulator: Sized {
44 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
45
46 fn to(mut self, endpoint: impl Into<String>) -> Self {
47 self.steps_mut().push(BuilderStep::To(endpoint.into()));
48 self
49 }
50
51 fn process<F, Fut>(mut self, f: F) -> Self
52 where
53 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
54 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
55 {
56 let svc = ProcessorFn::new(f);
57 self.steps_mut()
58 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
59 self
60 }
61
62 fn process_fn(mut self, processor: BoxProcessor) -> Self {
63 self.steps_mut().push(BuilderStep::Processor(processor));
64 self
65 }
66
67 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
68 let svc = SetHeader::new(IdentityProcessor, key, value);
69 self.steps_mut()
70 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71 self
72 }
73
74 fn map_body<F>(mut self, mapper: F) -> Self
75 where
76 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
77 {
78 let svc = MapBody::new(IdentityProcessor, mapper);
79 self.steps_mut()
80 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
81 self
82 }
83
84 fn set_body<B>(mut self, body: B) -> Self
85 where
86 B: Into<Body> + Clone + Send + Sync + 'static,
87 {
88 let body: Body = body.into();
89 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
90 self.steps_mut()
91 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
92 self
93 }
94
95 fn transform<B>(self, body: B) -> Self
100 where
101 B: Into<Body> + Clone + Send + Sync + 'static,
102 {
103 self.set_body(body)
104 }
105
106 fn set_body_fn<F>(mut self, expr: F) -> Self
107 where
108 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
109 {
110 let svc = SetBody::new(IdentityProcessor, expr);
111 self.steps_mut()
112 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
113 self
114 }
115
116 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
117 where
118 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
119 {
120 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
121 self.steps_mut()
122 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
123 self
124 }
125
126 fn aggregate(mut self, config: AggregatorConfig) -> Self {
127 self.steps_mut().push(BuilderStep::Aggregate { config });
128 self
129 }
130
131 fn stop(mut self) -> Self {
137 self.steps_mut().push(BuilderStep::Stop);
138 self
139 }
140
141 fn delay(mut self, duration: std::time::Duration) -> Self {
142 self.steps_mut().push(BuilderStep::Delay {
143 config: DelayConfig::from_duration(duration),
144 });
145 self
146 }
147
148 fn delay_with_header(
149 mut self,
150 duration: std::time::Duration,
151 header: impl Into<String>,
152 ) -> Self {
153 self.steps_mut().push(BuilderStep::Delay {
154 config: DelayConfig::from_duration_with_header(duration, header),
155 });
156 self
157 }
158
159 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
163 self.steps_mut().push(BuilderStep::Log {
164 level,
165 message: message.into(),
166 });
167 self
168 }
169
170 fn convert_body_to(mut self, target: BodyType) -> Self {
182 let svc = ConvertBodyTo::new(IdentityProcessor, target);
183 self.steps_mut()
184 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
185 self
186 }
187
188 fn stream_cache(mut self, threshold: usize) -> Self {
189 let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
190 let svc = StreamCacheService::new(IdentityProcessor, config);
191 self.steps_mut()
192 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
193 self
194 }
195
196 fn stream_cache_default(self) -> Self {
200 self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
201 }
202
203 fn marshal(mut self, format: impl Into<String>) -> Self {
213 let name = format.into();
214 let df =
215 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
216 let svc = MarshalService::new(IdentityProcessor, df);
217 self.steps_mut()
218 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
219 self
220 }
221
222 fn unmarshal(mut self, format: impl Into<String>) -> Self {
232 let name = format.into();
233 let df =
234 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
235 let svc = UnmarshalService::new(IdentityProcessor, df);
236 self.steps_mut()
237 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
238 self
239 }
240
241 fn validate(mut self, schema_path: impl Into<String>) -> Self {
250 let path = schema_path.into();
251 let uri = if path.starts_with("validator:") {
252 path
253 } else {
254 format!("validator:{path}")
255 };
256 self.steps_mut().push(BuilderStep::To(uri));
257 self
258 }
259
260 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
271 self.steps_mut().push(BuilderStep::Script {
272 language: language.into(),
273 script: script.into(),
274 });
275 self
276 }
277
278 fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
279 self.steps_mut().push(BuilderStep::Bean {
280 name: name.into(),
281 method: method.into(),
282 });
283 self
284 }
285}
286
287pub struct RouteBuilder {
299 from_uri: String,
300 steps: Vec<BuilderStep>,
301 error_handler: Option<ErrorHandlerConfig>,
302 error_handler_mode: ErrorHandlerMode,
303 circuit_breaker_config: Option<CircuitBreakerConfig>,
304 concurrency: Option<ConcurrencyModel>,
305 route_id: Option<String>,
306 auto_startup: Option<bool>,
307 startup_order: Option<i32>,
308}
309
310#[derive(Default)]
311enum ErrorHandlerMode {
312 #[default]
313 None,
314 ExplicitConfig,
315 Shorthand {
316 dlc_uri: Option<String>,
317 specs: Vec<OnExceptionSpec>,
318 },
319 Mixed,
320}
321
322#[derive(Clone)]
323struct OnExceptionSpec {
324 matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
325 retry: Option<RedeliveryPolicy>,
326 handled_by: Option<String>,
327}
328
329impl RouteBuilder {
330 pub fn from(endpoint: &str) -> Self {
332 Self {
333 from_uri: endpoint.to_string(),
334 steps: Vec::new(),
335 error_handler: None,
336 error_handler_mode: ErrorHandlerMode::None,
337 circuit_breaker_config: None,
338 concurrency: None,
339 route_id: None,
340 auto_startup: None,
341 startup_order: None,
342 }
343 }
344
345 pub fn filter<F>(self, predicate: F) -> FilterBuilder
349 where
350 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
351 {
352 FilterBuilder {
353 parent: self,
354 predicate: std::sync::Arc::new(predicate),
355 steps: vec![],
356 }
357 }
358
359 pub fn choice(self) -> ChoiceBuilder {
365 ChoiceBuilder {
366 parent: self,
367 whens: vec![],
368 _otherwise: None,
369 }
370 }
371
372 pub fn wire_tap(mut self, endpoint: &str) -> Self {
376 self.steps.push(BuilderStep::WireTap {
377 uri: endpoint.to_string(),
378 });
379 self
380 }
381
382 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
384 self.error_handler_mode = match self.error_handler_mode {
385 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
386 ErrorHandlerMode::ExplicitConfig
387 }
388 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
389 };
390 self.error_handler = Some(config);
391 self
392 }
393
394 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
396 let uri = uri.into();
397 self.error_handler_mode = match self.error_handler_mode {
398 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
399 dlc_uri: Some(uri),
400 specs: Vec::new(),
401 },
402 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
403 dlc_uri: Some(uri),
404 specs,
405 },
406 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
407 };
408 self
409 }
410
411 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
413 where
414 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
415 {
416 self.error_handler_mode = match self.error_handler_mode {
417 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
418 dlc_uri: None,
419 specs: Vec::new(),
420 },
421 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
422 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
423 };
424
425 OnExceptionBuilder {
426 parent: self,
427 policy: OnExceptionSpec {
428 matches: std::sync::Arc::new(matches),
429 retry: None,
430 handled_by: None,
431 },
432 }
433 }
434
435 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
437 self.circuit_breaker_config = Some(config);
438 self
439 }
440
441 pub fn concurrent(mut self, max: usize) -> Self {
455 let max = if max == 0 { None } else { Some(max) };
456 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
457 self
458 }
459
460 pub fn sequential(mut self) -> Self {
465 self.concurrency = Some(ConcurrencyModel::Sequential);
466 self
467 }
468
469 pub fn route_id(mut self, id: impl Into<String>) -> Self {
473 self.route_id = Some(id.into());
474 self
475 }
476
477 pub fn auto_startup(mut self, auto: bool) -> Self {
481 self.auto_startup = Some(auto);
482 self
483 }
484
485 pub fn startup_order(mut self, order: i32) -> Self {
489 self.startup_order = Some(order);
490 self
491 }
492
493 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
499 SplitBuilder {
500 parent: self,
501 config,
502 steps: Vec::new(),
503 }
504 }
505
506 pub fn multicast(self) -> MulticastBuilder {
512 MulticastBuilder {
513 parent: self,
514 steps: Vec::new(),
515 config: MulticastConfig::new(),
516 }
517 }
518
519 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
526 ThrottleBuilder {
527 parent: self,
528 config: ThrottlerConfig::new(max_requests, period),
529 steps: Vec::new(),
530 }
531 }
532
533 pub fn loop_count(self, count: usize) -> LoopBuilder {
535 LoopBuilder {
536 parent: self,
537 config: LoopConfig {
538 mode: LoopMode::Count(count),
539 },
540 steps: vec![],
541 }
542 }
543
544 pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
546 where
547 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
548 {
549 LoopBuilder {
550 parent: self,
551 config: LoopConfig {
552 mode: LoopMode::While(std::sync::Arc::new(predicate)),
553 },
554 steps: vec![],
555 }
556 }
557
558 pub fn load_balance(self) -> LoadBalancerBuilder {
564 LoadBalancerBuilder {
565 parent: self,
566 config: LoadBalancerConfig::round_robin(),
567 steps: Vec::new(),
568 }
569 }
570
571 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
587 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
588 }
589
590 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
594 self.steps.push(BuilderStep::DynamicRouter { config });
595 self
596 }
597
598 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
599 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
600 }
601
602 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
603 self.steps.push(BuilderStep::RoutingSlip { config });
604 self
605 }
606
607 pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
608 self.recipient_list_with_config(RecipientListConfig::new(expression))
609 }
610
611 pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
612 self.steps.push(BuilderStep::RecipientList { config });
613 self
614 }
615
616 pub fn build(self) -> Result<RouteDefinition, CamelError> {
618 if self.from_uri.is_empty() {
619 return Err(CamelError::RouteError(
620 "route must have a 'from' URI".to_string(),
621 ));
622 }
623 let route_id = self
624 .route_id
625 .filter(|s| !s.trim().is_empty())
626 .ok_or_else(|| {
627 CamelError::RouteError(
628 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
629 .to_string(),
630 )
631 })?;
632 let resolved_error_handler = match self.error_handler_mode {
633 ErrorHandlerMode::None => self.error_handler,
634 ErrorHandlerMode::ExplicitConfig => self.error_handler,
635 ErrorHandlerMode::Mixed => {
636 return Err(CamelError::RouteError(
637 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
638 ));
639 }
640 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
641 let mut config = if let Some(uri) = dlc_uri {
642 ErrorHandlerConfig::dead_letter_channel(uri)
643 } else {
644 ErrorHandlerConfig::log_only()
645 };
646
647 for spec in specs {
648 let matcher = spec.matches.clone();
649 let mut builder = config.on_exception(move |e| matcher(e));
650
651 if let Some(retry) = spec.retry {
652 builder = builder.retry(retry.max_attempts).with_backoff(
653 retry.initial_delay,
654 retry.multiplier,
655 retry.max_delay,
656 );
657 if retry.jitter_factor > 0.0 {
658 builder = builder.with_jitter(retry.jitter_factor);
659 }
660 }
661
662 if let Some(uri) = spec.handled_by {
663 builder = builder.handled_by(uri);
664 }
665
666 config = builder.build();
667 }
668
669 Some(config)
670 }
671 };
672
673 let definition = RouteDefinition::new(self.from_uri, self.steps);
674 let definition = if let Some(eh) = resolved_error_handler {
675 definition.with_error_handler(eh)
676 } else {
677 definition
678 };
679 let definition = if let Some(cb) = self.circuit_breaker_config {
680 definition.with_circuit_breaker(cb)
681 } else {
682 definition
683 };
684 let definition = if let Some(concurrency) = self.concurrency {
685 definition.with_concurrency(concurrency)
686 } else {
687 definition
688 };
689 let definition = definition.with_route_id(route_id);
690 let definition = if let Some(auto) = self.auto_startup {
691 definition.with_auto_startup(auto)
692 } else {
693 definition
694 };
695 let definition = if let Some(order) = self.startup_order {
696 definition.with_startup_order(order)
697 } else {
698 definition
699 };
700 Ok(definition)
701 }
702
703 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
705 if self.from_uri.is_empty() {
706 return Err(CamelError::RouteError(
707 "route must have a 'from' URI".to_string(),
708 ));
709 }
710 let route_id = self
711 .route_id
712 .filter(|s| !s.trim().is_empty())
713 .ok_or_else(|| {
714 CamelError::RouteError(
715 "route must have a non-empty 'route_id' — call .route_id(\"name\") on the builder"
716 .to_string(),
717 )
718 })?;
719
720 let steps = canonicalize_steps(self.steps)?;
721 let circuit_breaker = self
722 .circuit_breaker_config
723 .map(canonicalize_circuit_breaker);
724
725 let spec = CanonicalRouteSpec {
726 route_id,
727 from: self.from_uri,
728 steps,
729 circuit_breaker,
730 version: camel_api::CANONICAL_CONTRACT_VERSION,
731 };
732 spec.validate_contract()?;
733 Ok(spec)
734 }
735}
736
737pub struct OnExceptionBuilder {
738 parent: RouteBuilder,
739 policy: OnExceptionSpec,
740}
741
742impl OnExceptionBuilder {
743 pub fn retry(mut self, max_attempts: u32) -> Self {
744 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
745 self
746 }
747
748 pub fn with_backoff(
749 mut self,
750 initial: std::time::Duration,
751 multiplier: f64,
752 max: std::time::Duration,
753 ) -> Self {
754 if let Some(ref mut retry) = self.policy.retry {
755 retry.initial_delay = initial;
756 retry.multiplier = multiplier;
757 retry.max_delay = max;
758 }
759 self
760 }
761
762 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
763 if let Some(ref mut retry) = self.policy.retry {
764 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
765 }
766 self
767 }
768
769 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
770 self.policy.handled_by = Some(uri.into());
771 self
772 }
773
774 pub fn end_on_exception(mut self) -> RouteBuilder {
775 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
776 specs.push(self.policy);
777 }
778 self.parent
779 }
780}
781
782fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
783 let mut canonical = Vec::with_capacity(steps.len());
784 for step in steps {
785 canonical.push(canonicalize_step(step)?);
786 }
787 Ok(canonical)
788}
789
790fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
791 match step {
792 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
793 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
794 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
795 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
796 BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
797 delay_ms: config.delay_ms,
798 dynamic_header: config.dynamic_header,
799 }),
800 BuilderStep::DeclarativeScript { expression } => {
801 Ok(CanonicalStepSpec::Script { expression })
802 }
803 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
804 predicate,
805 steps: canonicalize_steps(steps)?,
806 }),
807 BuilderStep::DeclarativeChoice { whens, otherwise } => {
808 let mut canonical_whens = Vec::with_capacity(whens.len());
809 for DeclarativeWhenStep { predicate, steps } in whens {
810 canonical_whens.push(CanonicalWhenSpec {
811 predicate,
812 steps: canonicalize_steps(steps)?,
813 });
814 }
815 let otherwise = match otherwise {
816 Some(steps) => Some(canonicalize_steps(steps)?),
817 None => None,
818 };
819 Ok(CanonicalStepSpec::Choice {
820 whens: canonical_whens,
821 otherwise,
822 })
823 }
824 BuilderStep::DeclarativeSplit {
825 expression,
826 aggregation,
827 parallel,
828 parallel_limit,
829 stop_on_exception,
830 steps,
831 } => Ok(CanonicalStepSpec::Split {
832 expression: CanonicalSplitExpressionSpec::Language(expression),
833 aggregation: canonicalize_split_aggregation(aggregation)?,
834 parallel,
835 parallel_limit,
836 stop_on_exception,
837 steps: canonicalize_steps(steps)?,
838 }),
839 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
840 canonicalize_aggregate(config)?,
841 )),
842 other => {
843 let step_name = canonical_step_name(&other);
844 let detail = camel_api::canonical_contract_rejection_reason(step_name)
845 .unwrap_or("not included in canonical v1");
846 Err(CamelError::RouteError(format!(
847 "canonical v1 does not support step `{step_name}`: {detail}"
848 )))
849 }
850 }
851}
852
853fn canonicalize_split_aggregation(
854 strategy: camel_api::splitter::AggregationStrategy,
855) -> Result<CanonicalSplitAggregationSpec, CamelError> {
856 match strategy {
857 camel_api::splitter::AggregationStrategy::LastWins => {
858 Ok(CanonicalSplitAggregationSpec::LastWins)
859 }
860 camel_api::splitter::AggregationStrategy::CollectAll => {
861 Ok(CanonicalSplitAggregationSpec::CollectAll)
862 }
863 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
864 "canonical v1 does not support custom split aggregation".to_string(),
865 )),
866 camel_api::splitter::AggregationStrategy::Original => {
867 Ok(CanonicalSplitAggregationSpec::Original)
868 }
869 }
870}
871
872fn extract_completion_fields(
873 mode: &CompletionMode,
874) -> Result<(Option<usize>, Option<u64>), CamelError> {
875 match mode {
876 CompletionMode::Single(cond) => match cond {
877 CompletionCondition::Size(n) => Ok((Some(*n), None)),
878 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
879 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
880 "canonical v1 does not support aggregate predicate completion".to_string(),
881 )),
882 },
883 CompletionMode::Any(conds) => {
884 let mut size = None;
885 let mut timeout_ms = None;
886 for cond in conds {
887 match cond {
888 CompletionCondition::Size(n) => size = Some(*n),
889 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
890 CompletionCondition::Predicate(_) => {
891 return Err(CamelError::RouteError(
892 "canonical v1 does not support aggregate predicate completion"
893 .to_string(),
894 ));
895 }
896 }
897 }
898 Ok((size, timeout_ms))
899 }
900 }
901}
902
903fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
904 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
905
906 let header = match &config.correlation {
907 CorrelationStrategy::HeaderName(h) => h.clone(),
908 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
909 CorrelationStrategy::Fn(_) => {
910 return Err(CamelError::RouteError(
911 "canonical v1 does not support Fn correlation strategy".to_string(),
912 ));
913 }
914 };
915
916 let correlation_key = match &config.correlation {
917 CorrelationStrategy::HeaderName(_) => None,
918 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
919 CorrelationStrategy::Fn(_) => unreachable!(),
920 };
921
922 let strategy = match config.strategy {
923 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
924 AggregationStrategy::Custom(_) => {
925 return Err(CamelError::RouteError(
926 "canonical v1 does not support custom aggregate strategy".to_string(),
927 ));
928 }
929 };
930 let bucket_ttl_ms = config
931 .bucket_ttl
932 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
933
934 Ok(CanonicalAggregateSpec {
935 header,
936 completion_size,
937 completion_timeout_ms,
938 correlation_key,
939 force_completion_on_stop: if config.force_completion_on_stop {
940 Some(true)
941 } else {
942 None
943 },
944 discard_on_timeout: if config.discard_on_timeout {
945 Some(true)
946 } else {
947 None
948 },
949 strategy,
950 max_buckets: config.max_buckets,
951 bucket_ttl_ms,
952 })
953}
954
955fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
956 CanonicalCircuitBreakerSpec {
957 failure_threshold: config.failure_threshold,
958 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
959 }
960}
961
962fn canonical_step_name(step: &BuilderStep) -> &'static str {
963 match step {
964 BuilderStep::Processor(_) => "processor",
965 BuilderStep::To(_) => "to",
966 BuilderStep::Stop => "stop",
967 BuilderStep::Log { .. } => "log",
968 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
969 BuilderStep::DeclarativeSetBody { .. } => "set_body",
970 BuilderStep::DeclarativeFilter { .. } => "filter",
971 BuilderStep::DeclarativeChoice { .. } => "choice",
972 BuilderStep::DeclarativeScript { .. } => "script",
973 BuilderStep::DeclarativeFunction { .. } => "function",
974 BuilderStep::DeclarativeSplit { .. } => "split",
975 BuilderStep::Split { .. } => "split",
976 BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
977 BuilderStep::Aggregate { .. } => "aggregate",
978 BuilderStep::Filter { .. } => "filter",
979 BuilderStep::Choice { .. } => "choice",
980 BuilderStep::WireTap { .. } => "wire_tap",
981 BuilderStep::Delay { .. } => "delay",
982 BuilderStep::Multicast { .. } => "multicast",
983 BuilderStep::DeclarativeLog { .. } => "log",
984 BuilderStep::Bean { .. } => "bean",
985 BuilderStep::Script { .. } => "script",
986 BuilderStep::Throttle { .. } => "throttle",
987 BuilderStep::LoadBalance { .. } => "load_balancer",
988 BuilderStep::DynamicRouter { .. } => "dynamic_router",
989 BuilderStep::RoutingSlip { .. } => "routing_slip",
990 BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
991 BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
992 BuilderStep::RecipientList { .. } => "recipient_list",
993 BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
994 BuilderStep::DeclarativeSetProperty { .. } => "set_property",
995 }
996}
997
998impl StepAccumulator for RouteBuilder {
999 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1000 &mut self.steps
1001 }
1002}
1003
1004pub struct SplitBuilder {
1012 parent: RouteBuilder,
1013 config: SplitterConfig,
1014 steps: Vec<BuilderStep>,
1015}
1016
1017impl SplitBuilder {
1018 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1020 where
1021 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1022 {
1023 FilterInSplitBuilder {
1024 parent: self,
1025 predicate: std::sync::Arc::new(predicate),
1026 steps: vec![],
1027 }
1028 }
1029
1030 pub fn end_split(mut self) -> RouteBuilder {
1033 let split_step = BuilderStep::Split {
1034 config: self.config,
1035 steps: self.steps,
1036 };
1037 self.parent.steps.push(split_step);
1038 self.parent
1039 }
1040}
1041
1042impl StepAccumulator for SplitBuilder {
1043 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1044 &mut self.steps
1045 }
1046}
1047
1048pub struct FilterBuilder {
1050 parent: RouteBuilder,
1051 predicate: FilterPredicate,
1052 steps: Vec<BuilderStep>,
1053}
1054
1055impl FilterBuilder {
1056 pub fn end_filter(mut self) -> RouteBuilder {
1059 let step = BuilderStep::Filter {
1060 predicate: self.predicate,
1061 steps: self.steps,
1062 };
1063 self.parent.steps.push(step);
1064 self.parent
1065 }
1066}
1067
1068impl StepAccumulator for FilterBuilder {
1069 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1070 &mut self.steps
1071 }
1072}
1073
1074pub struct FilterInSplitBuilder {
1076 parent: SplitBuilder,
1077 predicate: FilterPredicate,
1078 steps: Vec<BuilderStep>,
1079}
1080
1081impl FilterInSplitBuilder {
1082 pub fn end_filter(mut self) -> SplitBuilder {
1084 let step = BuilderStep::Filter {
1085 predicate: self.predicate,
1086 steps: self.steps,
1087 };
1088 self.parent.steps.push(step);
1089 self.parent
1090 }
1091}
1092
1093impl StepAccumulator for FilterInSplitBuilder {
1094 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1095 &mut self.steps
1096 }
1097}
1098
1099pub struct ChoiceBuilder {
1106 parent: RouteBuilder,
1107 whens: Vec<WhenStep>,
1108 _otherwise: Option<Vec<BuilderStep>>,
1109}
1110
1111impl ChoiceBuilder {
1112 pub fn when<F>(self, predicate: F) -> WhenBuilder
1115 where
1116 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1117 {
1118 WhenBuilder {
1119 parent: self,
1120 predicate: std::sync::Arc::new(predicate),
1121 steps: vec![],
1122 }
1123 }
1124
1125 pub fn otherwise(self) -> OtherwiseBuilder {
1129 OtherwiseBuilder {
1130 parent: self,
1131 steps: vec![],
1132 }
1133 }
1134
1135 pub fn end_choice(mut self) -> RouteBuilder {
1139 let step = BuilderStep::Choice {
1140 whens: self.whens,
1141 otherwise: self._otherwise,
1142 };
1143 self.parent.steps.push(step);
1144 self.parent
1145 }
1146}
1147
1148pub struct WhenBuilder {
1150 parent: ChoiceBuilder,
1151 predicate: camel_api::FilterPredicate,
1152 steps: Vec<BuilderStep>,
1153}
1154
1155impl WhenBuilder {
1156 pub fn end_when(mut self) -> ChoiceBuilder {
1159 self.parent.whens.push(WhenStep {
1160 predicate: self.predicate,
1161 steps: self.steps,
1162 });
1163 self.parent
1164 }
1165}
1166
1167impl StepAccumulator for WhenBuilder {
1168 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1169 &mut self.steps
1170 }
1171}
1172
1173pub struct OtherwiseBuilder {
1175 parent: ChoiceBuilder,
1176 steps: Vec<BuilderStep>,
1177}
1178
1179impl OtherwiseBuilder {
1180 pub fn end_otherwise(self) -> ChoiceBuilder {
1182 let OtherwiseBuilder { mut parent, steps } = self;
1183 parent._otherwise = Some(steps);
1184 parent
1185 }
1186}
1187
1188impl StepAccumulator for OtherwiseBuilder {
1189 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1190 &mut self.steps
1191 }
1192}
1193
1194pub struct MulticastBuilder {
1202 parent: RouteBuilder,
1203 steps: Vec<BuilderStep>,
1204 config: MulticastConfig,
1205}
1206
1207impl MulticastBuilder {
1208 pub fn parallel(mut self, parallel: bool) -> Self {
1209 self.config = self.config.parallel(parallel);
1210 self
1211 }
1212
1213 pub fn parallel_limit(mut self, limit: usize) -> Self {
1214 self.config = self.config.parallel_limit(limit);
1215 self
1216 }
1217
1218 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1219 self.config = self.config.stop_on_exception(stop);
1220 self
1221 }
1222
1223 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1224 self.config = self.config.timeout(duration);
1225 self
1226 }
1227
1228 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1229 self.config = self.config.aggregation(strategy);
1230 self
1231 }
1232
1233 pub fn end_multicast(mut self) -> RouteBuilder {
1234 let step = BuilderStep::Multicast {
1235 steps: self.steps,
1236 config: self.config,
1237 };
1238 self.parent.steps.push(step);
1239 self.parent
1240 }
1241}
1242
1243impl StepAccumulator for MulticastBuilder {
1244 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1245 &mut self.steps
1246 }
1247}
1248
1249pub struct ThrottleBuilder {
1257 parent: RouteBuilder,
1258 config: ThrottlerConfig,
1259 steps: Vec<BuilderStep>,
1260}
1261
1262impl ThrottleBuilder {
1263 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1269 self.config = self.config.strategy(strategy);
1270 self
1271 }
1272
1273 pub fn end_throttle(mut self) -> RouteBuilder {
1276 let step = BuilderStep::Throttle {
1277 config: self.config,
1278 steps: self.steps,
1279 };
1280 self.parent.steps.push(step);
1281 self.parent
1282 }
1283}
1284
1285impl StepAccumulator for ThrottleBuilder {
1286 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1287 &mut self.steps
1288 }
1289}
1290
1291pub struct LoopBuilder {
1293 parent: RouteBuilder,
1294 config: LoopConfig,
1295 steps: Vec<BuilderStep>,
1296}
1297
1298impl LoopBuilder {
1299 pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1300 LoopInLoopBuilder {
1301 parent: self,
1302 config: LoopConfig {
1303 mode: LoopMode::Count(count),
1304 },
1305 steps: vec![],
1306 }
1307 }
1308
1309 pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1310 where
1311 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1312 {
1313 LoopInLoopBuilder {
1314 parent: self,
1315 config: LoopConfig {
1316 mode: LoopMode::While(std::sync::Arc::new(predicate)),
1317 },
1318 steps: vec![],
1319 }
1320 }
1321
1322 pub fn end_loop(mut self) -> RouteBuilder {
1323 let step = BuilderStep::Loop {
1324 config: self.config,
1325 steps: self.steps,
1326 };
1327 self.parent.steps.push(step);
1328 self.parent
1329 }
1330}
1331
1332impl StepAccumulator for LoopBuilder {
1333 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1334 &mut self.steps
1335 }
1336}
1337
1338pub struct LoopInLoopBuilder {
1339 parent: LoopBuilder,
1340 config: LoopConfig,
1341 steps: Vec<BuilderStep>,
1342}
1343
1344impl LoopInLoopBuilder {
1345 pub fn end_loop(mut self) -> LoopBuilder {
1346 let step = BuilderStep::Loop {
1347 config: self.config,
1348 steps: self.steps,
1349 };
1350 self.parent.steps.push(step);
1351 self.parent
1352 }
1353}
1354
1355impl StepAccumulator for LoopInLoopBuilder {
1356 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1357 &mut self.steps
1358 }
1359}
1360
1361pub struct LoadBalancerBuilder {
1369 parent: RouteBuilder,
1370 config: LoadBalancerConfig,
1371 steps: Vec<BuilderStep>,
1372}
1373
1374impl LoadBalancerBuilder {
1375 pub fn round_robin(mut self) -> Self {
1377 self.config = LoadBalancerConfig::round_robin();
1378 self
1379 }
1380
1381 pub fn random(mut self) -> Self {
1383 self.config = LoadBalancerConfig::random();
1384 self
1385 }
1386
1387 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1392 self.config = LoadBalancerConfig::weighted(weights);
1393 self
1394 }
1395
1396 pub fn failover(mut self) -> Self {
1401 self.config = LoadBalancerConfig::failover();
1402 self
1403 }
1404
1405 pub fn parallel(mut self, parallel: bool) -> Self {
1410 self.config = self.config.parallel(parallel);
1411 self
1412 }
1413
1414 pub fn end_load_balance(mut self) -> RouteBuilder {
1417 let step = BuilderStep::LoadBalance {
1418 config: self.config,
1419 steps: self.steps,
1420 };
1421 self.parent.steps.push(step);
1422 self.parent
1423 }
1424}
1425
1426impl StepAccumulator for LoadBalancerBuilder {
1427 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1428 &mut self.steps
1429 }
1430}
1431
1432#[cfg(test)]
1437mod tests {
1438 use super::*;
1439 use camel_api::error_handler::ErrorHandlerConfig;
1440 use camel_api::load_balancer::LoadBalanceStrategy;
1441 use camel_api::{Exchange, Message};
1442 use camel_core::route::BuilderStep;
1443 use std::sync::Arc;
1444 use std::time::Duration;
1445 use tower::{Service, ServiceExt};
1446
1447 #[test]
1448 fn test_builder_from_creates_definition() {
1449 let definition = RouteBuilder::from("timer:tick")
1450 .route_id("test-route")
1451 .build()
1452 .unwrap();
1453 assert_eq!(definition.from_uri(), "timer:tick");
1454 }
1455
1456 #[test]
1457 fn test_builder_empty_from_uri_errors() {
1458 let result = RouteBuilder::from("").route_id("test-route").build();
1459 assert!(result.is_err());
1460 }
1461
1462 #[test]
1463 fn test_builder_to_adds_step() {
1464 let definition = RouteBuilder::from("timer:tick")
1465 .route_id("test-route")
1466 .to("log:info")
1467 .build()
1468 .unwrap();
1469
1470 assert_eq!(definition.from_uri(), "timer:tick");
1471 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1473 }
1474
1475 #[test]
1476 fn test_builder_filter_adds_filter_step() {
1477 let definition = RouteBuilder::from("timer:tick")
1478 .route_id("test-route")
1479 .filter(|_ex| true)
1480 .to("mock:result")
1481 .end_filter()
1482 .build()
1483 .unwrap();
1484
1485 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1486 }
1487
1488 #[test]
1489 fn test_builder_set_header_adds_processor_step() {
1490 let definition = RouteBuilder::from("timer:tick")
1491 .route_id("test-route")
1492 .set_header("key", Value::String("value".into()))
1493 .build()
1494 .unwrap();
1495
1496 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1497 }
1498
1499 #[test]
1500 fn test_builder_map_body_adds_processor_step() {
1501 let definition = RouteBuilder::from("timer:tick")
1502 .route_id("test-route")
1503 .map_body(|body| body)
1504 .build()
1505 .unwrap();
1506
1507 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1508 }
1509
1510 #[test]
1511 fn test_builder_process_adds_processor_step() {
1512 let definition = RouteBuilder::from("timer:tick")
1513 .route_id("test-route")
1514 .process(|ex| async move { Ok(ex) })
1515 .build()
1516 .unwrap();
1517
1518 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1519 }
1520
1521 #[test]
1522 fn test_builder_chain_multiple_steps() {
1523 let definition = RouteBuilder::from("timer:tick")
1524 .route_id("test-route")
1525 .set_header("source", Value::String("timer".into()))
1526 .filter(|ex| ex.input.header("source").is_some())
1527 .to("log:info")
1528 .end_filter()
1529 .to("mock:result")
1530 .build()
1531 .unwrap();
1532
1533 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1537 }
1538
1539 #[test]
1540 fn test_loop_count_builder() {
1541 use camel_api::loop_eip::LoopMode;
1542
1543 let def = RouteBuilder::from("direct:start")
1544 .route_id("loop-test")
1545 .loop_count(3)
1546 .to("mock:inside")
1547 .end_loop()
1548 .to("mock:after")
1549 .build()
1550 .unwrap();
1551
1552 assert_eq!(def.steps().len(), 2);
1553 match &def.steps()[0] {
1554 BuilderStep::Loop { config, steps } => {
1555 assert!(matches!(config.mode, LoopMode::Count(3)));
1556 assert_eq!(steps.len(), 1);
1557 }
1558 other => panic!("Expected Loop, got {:?}", other),
1559 }
1560 assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1561 }
1562
1563 #[test]
1564 fn test_loop_while_builder() {
1565 use camel_api::loop_eip::LoopMode;
1566
1567 let def = RouteBuilder::from("direct:start")
1568 .route_id("loop-while-test")
1569 .loop_while(|_ex| true)
1570 .to("mock:retry")
1571 .end_loop()
1572 .build()
1573 .unwrap();
1574
1575 assert_eq!(def.steps().len(), 1);
1576 match &def.steps()[0] {
1577 BuilderStep::Loop { config, steps } => {
1578 assert!(matches!(config.mode, LoopMode::While(_)));
1579 assert_eq!(steps.len(), 1);
1580 }
1581 other => panic!("Expected Loop, got {:?}", other),
1582 }
1583 }
1584
1585 #[test]
1586 fn test_nested_loop_builder() {
1587 use camel_api::loop_eip::LoopMode;
1588
1589 let def = RouteBuilder::from("direct:start")
1590 .route_id("nested-loop-test")
1591 .loop_count(2)
1592 .to("mock:outer")
1593 .loop_count(3)
1594 .to("mock:inner")
1595 .end_loop()
1596 .end_loop()
1597 .to("mock:after")
1598 .build()
1599 .unwrap();
1600
1601 assert_eq!(def.steps().len(), 2);
1602 match &def.steps()[0] {
1603 BuilderStep::Loop { steps, .. } => {
1604 assert_eq!(steps.len(), 2);
1605 match &steps[1] {
1606 BuilderStep::Loop {
1607 config,
1608 steps: inner_steps,
1609 } => {
1610 assert!(matches!(config.mode, LoopMode::Count(3)));
1611 assert_eq!(inner_steps.len(), 1);
1612 }
1613 other => panic!("Expected nested Loop, got {:?}", other),
1614 }
1615 }
1616 other => panic!("Expected outer Loop, got {:?}", other),
1617 }
1618 }
1619
1620 #[tokio::test]
1625 async fn test_set_header_processor_works() {
1626 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1627 let exchange = Exchange::new(Message::new("test"));
1628 let result = svc.call(exchange).await.unwrap();
1629 assert_eq!(
1630 result.input.header("greeting"),
1631 Some(&Value::String("hello".into()))
1632 );
1633 }
1634
1635 #[tokio::test]
1636 async fn test_filter_processor_passes() {
1637 use camel_api::BoxProcessorExt;
1638 use camel_processor::FilterService;
1639
1640 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1641 let mut svc =
1642 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1643 let exchange = Exchange::new(Message::new("pass"));
1644 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1645 assert_eq!(result.input.body.as_text(), Some("pass"));
1646 }
1647
1648 #[tokio::test]
1649 async fn test_filter_processor_blocks() {
1650 use camel_api::BoxProcessorExt;
1651 use camel_processor::FilterService;
1652
1653 let sub = BoxProcessor::from_fn(|_ex| {
1654 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1655 });
1656 let mut svc =
1657 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1658 let exchange = Exchange::new(Message::new("reject"));
1659 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1660 assert_eq!(result.input.body.as_text(), Some("reject"));
1661 }
1662
1663 #[tokio::test]
1664 async fn test_map_body_processor_works() {
1665 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1666 if let Some(text) = body.as_text() {
1667 Body::Text(text.to_uppercase())
1668 } else {
1669 body
1670 }
1671 });
1672 let exchange = Exchange::new(Message::new("hello"));
1673 let result = mapper.oneshot(exchange).await.unwrap();
1674 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1675 }
1676
1677 #[tokio::test]
1678 async fn test_process_custom_processor_works() {
1679 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1680 ex.set_property("custom", Value::Bool(true));
1681 Ok(ex)
1682 });
1683 let exchange = Exchange::new(Message::default());
1684 let result = processor.oneshot(exchange).await.unwrap();
1685 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1686 }
1687
1688 #[tokio::test]
1693 async fn test_compose_pipeline_runs_steps_in_order() {
1694 use camel_core::route::compose_pipeline;
1695
1696 let processors = vec![
1697 BoxProcessor::new(SetHeader::new(
1698 IdentityProcessor,
1699 "step",
1700 Value::String("one".into()),
1701 )),
1702 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1703 if let Some(text) = body.as_text() {
1704 Body::Text(format!("{}-processed", text))
1705 } else {
1706 body
1707 }
1708 })),
1709 ];
1710
1711 let pipeline = compose_pipeline(processors);
1712 let exchange = Exchange::new(Message::new("hello"));
1713 let result = pipeline.oneshot(exchange).await.unwrap();
1714
1715 assert_eq!(
1716 result.input.header("step"),
1717 Some(&Value::String("one".into()))
1718 );
1719 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1720 }
1721
1722 #[tokio::test]
1723 async fn test_compose_pipeline_empty_is_identity() {
1724 use camel_core::route::compose_pipeline;
1725
1726 let pipeline = compose_pipeline(vec![]);
1727 let exchange = Exchange::new(Message::new("unchanged"));
1728 let result = pipeline.oneshot(exchange).await.unwrap();
1729 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1730 }
1731
1732 #[test]
1737 fn test_builder_circuit_breaker_sets_config() {
1738 use camel_api::circuit_breaker::CircuitBreakerConfig;
1739
1740 let config = CircuitBreakerConfig::new().failure_threshold(5);
1741 let definition = RouteBuilder::from("timer:tick")
1742 .route_id("test-route")
1743 .circuit_breaker(config)
1744 .build()
1745 .unwrap();
1746
1747 let cb = definition
1748 .circuit_breaker_config()
1749 .expect("circuit breaker should be set");
1750 assert_eq!(cb.failure_threshold, 5);
1751 }
1752
1753 #[test]
1754 fn test_builder_circuit_breaker_with_error_handler() {
1755 use camel_api::circuit_breaker::CircuitBreakerConfig;
1756 use camel_api::error_handler::ErrorHandlerConfig;
1757
1758 let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1759 let eh_config = ErrorHandlerConfig::log_only();
1760
1761 let definition = RouteBuilder::from("timer:tick")
1762 .route_id("test-route")
1763 .to("log:info")
1764 .circuit_breaker(cb_config)
1765 .error_handler(eh_config)
1766 .build()
1767 .unwrap();
1768
1769 assert!(
1770 definition.circuit_breaker_config().is_some(),
1771 "circuit breaker config should be set"
1772 );
1773 }
1775
1776 #[test]
1777 fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1778 let definition = RouteBuilder::from("direct:start")
1779 .route_id("test-route")
1780 .dead_letter_channel("log:dlc")
1781 .on_exception(|e| matches!(e, CamelError::Io(_)))
1782 .retry(3)
1783 .handled_by("log:io")
1784 .end_on_exception()
1785 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1786 .retry(1)
1787 .end_on_exception()
1788 .to("mock:out")
1789 .build()
1790 .expect("route should build");
1791
1792 let cfg = definition
1793 .error_handler_config()
1794 .expect("error handler should be set");
1795 assert_eq!(cfg.policies.len(), 2);
1796 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1797 assert_eq!(
1798 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1799 Some(3)
1800 );
1801 assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1802 assert_eq!(
1803 cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1804 Some(1)
1805 );
1806 }
1807
1808 #[test]
1809 fn test_builder_on_exception_mixed_mode_rejected() {
1810 let result = RouteBuilder::from("direct:start")
1811 .route_id("test-route")
1812 .error_handler(ErrorHandlerConfig::log_only())
1813 .on_exception(|_e| true)
1814 .end_on_exception()
1815 .to("mock:out")
1816 .build();
1817
1818 let err = result.err().expect("mixed mode should fail with an error");
1819
1820 assert!(
1821 format!("{err}").contains("mixed error handler modes"),
1822 "unexpected error: {err}"
1823 );
1824 }
1825
1826 #[test]
1827 fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1828 let definition = RouteBuilder::from("direct:start")
1829 .route_id("test-route")
1830 .on_exception(|_e| true)
1831 .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1832 .with_jitter(0.5)
1833 .end_on_exception()
1834 .to("mock:out")
1835 .build()
1836 .expect("route should build");
1837
1838 let cfg = definition
1839 .error_handler_config()
1840 .expect("error handler should be set");
1841 assert_eq!(cfg.policies.len(), 1);
1842 assert!(cfg.policies[0].retry.is_none());
1843 }
1844
1845 #[test]
1846 fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1847 let definition = RouteBuilder::from("direct:start")
1848 .route_id("test-route")
1849 .dead_letter_channel("log:dlc")
1850 .to("mock:out")
1851 .build()
1852 .expect("route should build");
1853
1854 let cfg = definition
1855 .error_handler_config()
1856 .expect("error handler should be set");
1857 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1858 assert!(cfg.policies.is_empty());
1859 }
1860
1861 #[test]
1862 fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1863 let definition = RouteBuilder::from("direct:start")
1864 .route_id("test-route")
1865 .dead_letter_channel("log:first")
1866 .on_exception(|e| matches!(e, CamelError::Io(_)))
1867 .retry(2)
1868 .end_on_exception()
1869 .dead_letter_channel("log:second")
1870 .to("mock:out")
1871 .build()
1872 .expect("route should build");
1873
1874 let cfg = definition
1875 .error_handler_config()
1876 .expect("error handler should be set");
1877 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1878 assert_eq!(cfg.policies.len(), 1);
1879 assert_eq!(
1880 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1881 Some(2)
1882 );
1883 }
1884
1885 #[test]
1886 fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1887 let definition = RouteBuilder::from("direct:start")
1888 .route_id("test-route")
1889 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1890 .retry(1)
1891 .end_on_exception()
1892 .to("mock:out")
1893 .build()
1894 .expect("route should build");
1895
1896 let cfg = definition
1897 .error_handler_config()
1898 .expect("error handler should be set");
1899 assert!(cfg.dlc_uri.is_none());
1900 assert_eq!(cfg.policies.len(), 1);
1901 }
1902
1903 #[test]
1904 fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1905 let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1906 let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1907
1908 let definition = RouteBuilder::from("direct:start")
1909 .route_id("test-route")
1910 .error_handler(first)
1911 .error_handler(second)
1912 .to("mock:out")
1913 .build()
1914 .expect("route should build");
1915
1916 let cfg = definition
1917 .error_handler_config()
1918 .expect("error handler should be set");
1919 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1920 }
1921
1922 #[test]
1925 fn test_split_builder_typestate() {
1926 use camel_api::splitter::{SplitterConfig, split_body_lines};
1927
1928 let definition = RouteBuilder::from("timer:test?period=1000")
1930 .route_id("test-route")
1931 .split(SplitterConfig::new(split_body_lines()))
1932 .to("mock:per-fragment")
1933 .end_split()
1934 .to("mock:final")
1935 .build()
1936 .unwrap();
1937
1938 assert_eq!(definition.steps().len(), 2);
1940 }
1941
1942 #[test]
1943 fn test_split_builder_steps_collected() {
1944 use camel_api::splitter::{SplitterConfig, split_body_lines};
1945
1946 let definition = RouteBuilder::from("timer:test?period=1000")
1947 .route_id("test-route")
1948 .split(SplitterConfig::new(split_body_lines()))
1949 .set_header("fragment", Value::String("yes".into()))
1950 .to("mock:per-fragment")
1951 .end_split()
1952 .build()
1953 .unwrap();
1954
1955 assert_eq!(definition.steps().len(), 1);
1957 match &definition.steps()[0] {
1958 BuilderStep::Split { steps, .. } => {
1959 assert_eq!(steps.len(), 2); }
1961 other => panic!("Expected Split, got {:?}", other),
1962 }
1963 }
1964
1965 #[test]
1966 fn test_split_builder_config_propagated() {
1967 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1968
1969 let definition = RouteBuilder::from("timer:test?period=1000")
1970 .route_id("test-route")
1971 .split(
1972 SplitterConfig::new(split_body_lines())
1973 .parallel(true)
1974 .parallel_limit(4)
1975 .aggregation(AggregationStrategy::CollectAll),
1976 )
1977 .to("mock:per-fragment")
1978 .end_split()
1979 .build()
1980 .unwrap();
1981
1982 match &definition.steps()[0] {
1983 BuilderStep::Split { config, .. } => {
1984 assert!(config.parallel);
1985 assert_eq!(config.parallel_limit, Some(4));
1986 assert!(matches!(
1987 config.aggregation,
1988 AggregationStrategy::CollectAll
1989 ));
1990 }
1991 other => panic!("Expected Split, got {:?}", other),
1992 }
1993 }
1994
1995 #[test]
1996 fn test_aggregate_builder_adds_step() {
1997 use camel_api::aggregator::AggregatorConfig;
1998 use camel_core::route::BuilderStep;
1999
2000 let definition = RouteBuilder::from("timer:tick")
2001 .route_id("test-route")
2002 .aggregate(
2003 AggregatorConfig::correlate_by("key")
2004 .complete_when_size(2)
2005 .build(),
2006 )
2007 .build()
2008 .unwrap();
2009
2010 assert_eq!(definition.steps().len(), 1);
2011 assert!(matches!(
2012 definition.steps()[0],
2013 BuilderStep::Aggregate { .. }
2014 ));
2015 }
2016
2017 #[test]
2018 fn test_aggregate_in_split_builder() {
2019 use camel_api::aggregator::AggregatorConfig;
2020 use camel_api::splitter::{SplitterConfig, split_body_lines};
2021 use camel_core::route::BuilderStep;
2022
2023 let definition = RouteBuilder::from("timer:tick")
2024 .route_id("test-route")
2025 .split(SplitterConfig::new(split_body_lines()))
2026 .aggregate(
2027 AggregatorConfig::correlate_by("key")
2028 .complete_when_size(1)
2029 .build(),
2030 )
2031 .end_split()
2032 .build()
2033 .unwrap();
2034
2035 assert_eq!(definition.steps().len(), 1);
2036 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2037 assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2038 } else {
2039 panic!("expected Split step");
2040 }
2041 }
2042
2043 #[test]
2046 fn test_builder_set_body_static_adds_processor() {
2047 let definition = RouteBuilder::from("timer:tick")
2048 .route_id("test-route")
2049 .set_body("fixed")
2050 .build()
2051 .unwrap();
2052 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2053 }
2054
2055 #[test]
2056 fn test_builder_set_body_fn_adds_processor() {
2057 let definition = RouteBuilder::from("timer:tick")
2058 .route_id("test-route")
2059 .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2060 .build()
2061 .unwrap();
2062 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2063 }
2064
2065 #[test]
2066 fn transform_alias_produces_same_as_set_body() {
2067 let route_transform = RouteBuilder::from("timer:tick")
2068 .route_id("test-route")
2069 .transform("hello")
2070 .build()
2071 .unwrap();
2072
2073 let route_set_body = RouteBuilder::from("timer:tick")
2074 .route_id("test-route")
2075 .set_body("hello")
2076 .build()
2077 .unwrap();
2078
2079 assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2080 }
2081
2082 #[test]
2083 fn test_builder_set_header_fn_adds_processor() {
2084 let definition = RouteBuilder::from("timer:tick")
2085 .route_id("test-route")
2086 .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2087 .build()
2088 .unwrap();
2089 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2090 }
2091
2092 #[tokio::test]
2093 async fn test_set_body_static_processor_works() {
2094 use camel_core::route::compose_pipeline;
2095 let def = RouteBuilder::from("t:t")
2096 .route_id("test-route")
2097 .set_body("replaced")
2098 .build()
2099 .unwrap();
2100 let pipeline = compose_pipeline(
2101 def.steps()
2102 .iter()
2103 .filter_map(|s| {
2104 if let BuilderStep::Processor(p) = s {
2105 Some(p.clone())
2106 } else {
2107 None
2108 }
2109 })
2110 .collect(),
2111 );
2112 let exchange = Exchange::new(Message::new("original"));
2113 let result = pipeline.oneshot(exchange).await.unwrap();
2114 assert_eq!(result.input.body.as_text(), Some("replaced"));
2115 }
2116
2117 #[tokio::test]
2118 async fn test_set_body_fn_processor_works() {
2119 use camel_core::route::compose_pipeline;
2120 let def = RouteBuilder::from("t:t")
2121 .route_id("test-route")
2122 .set_body_fn(|ex: &Exchange| {
2123 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2124 })
2125 .build()
2126 .unwrap();
2127 let pipeline = compose_pipeline(
2128 def.steps()
2129 .iter()
2130 .filter_map(|s| {
2131 if let BuilderStep::Processor(p) = s {
2132 Some(p.clone())
2133 } else {
2134 None
2135 }
2136 })
2137 .collect(),
2138 );
2139 let exchange = Exchange::new(Message::new("hello"));
2140 let result = pipeline.oneshot(exchange).await.unwrap();
2141 assert_eq!(result.input.body.as_text(), Some("HELLO"));
2142 }
2143
2144 #[tokio::test]
2145 async fn test_set_header_fn_processor_works() {
2146 use camel_core::route::compose_pipeline;
2147 let def = RouteBuilder::from("t:t")
2148 .route_id("test-route")
2149 .set_header_fn("echo", |ex: &Exchange| {
2150 ex.input
2151 .body
2152 .as_text()
2153 .map(|t| Value::String(t.into()))
2154 .unwrap_or(Value::Null)
2155 })
2156 .build()
2157 .unwrap();
2158 let pipeline = compose_pipeline(
2159 def.steps()
2160 .iter()
2161 .filter_map(|s| {
2162 if let BuilderStep::Processor(p) = s {
2163 Some(p.clone())
2164 } else {
2165 None
2166 }
2167 })
2168 .collect(),
2169 );
2170 let exchange = Exchange::new(Message::new("ping"));
2171 let result = pipeline.oneshot(exchange).await.unwrap();
2172 assert_eq!(
2173 result.input.header("echo"),
2174 Some(&Value::String("ping".into()))
2175 );
2176 }
2177
2178 #[test]
2181 fn test_filter_builder_typestate() {
2182 let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2183 .route_id("test-route")
2184 .filter(|_ex| true)
2185 .to("mock:inner")
2186 .end_filter()
2187 .to("mock:outer")
2188 .build();
2189 assert!(result.is_ok());
2190 }
2191
2192 #[test]
2193 fn test_filter_builder_steps_collected() {
2194 let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2195 .route_id("test-route")
2196 .filter(|_ex| true)
2197 .to("mock:inner")
2198 .end_filter()
2199 .build()
2200 .unwrap();
2201
2202 assert_eq!(definition.steps().len(), 1);
2203 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2204 }
2205
2206 #[test]
2207 fn test_wire_tap_builder_adds_step() {
2208 let definition = RouteBuilder::from("timer:tick")
2209 .route_id("test-route")
2210 .wire_tap("mock:tap")
2211 .to("mock:result")
2212 .build()
2213 .unwrap();
2214
2215 assert_eq!(definition.steps().len(), 2);
2216 assert!(
2217 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2218 );
2219 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2220 }
2221
2222 #[test]
2225 fn test_multicast_builder_typestate() {
2226 let definition = RouteBuilder::from("timer:tick")
2227 .route_id("test-route")
2228 .multicast()
2229 .to("direct:a")
2230 .to("direct:b")
2231 .end_multicast()
2232 .to("mock:result")
2233 .build()
2234 .unwrap();
2235
2236 assert_eq!(definition.steps().len(), 2); }
2238
2239 #[test]
2240 fn test_multicast_builder_steps_collected() {
2241 let definition = RouteBuilder::from("timer:tick")
2242 .route_id("test-route")
2243 .multicast()
2244 .to("direct:a")
2245 .to("direct:b")
2246 .end_multicast()
2247 .build()
2248 .unwrap();
2249
2250 match &definition.steps()[0] {
2251 BuilderStep::Multicast { steps, .. } => {
2252 assert_eq!(steps.len(), 2);
2253 }
2254 other => panic!("Expected Multicast, got {:?}", other),
2255 }
2256 }
2257
2258 #[test]
2261 fn test_builder_concurrent_sets_concurrency() {
2262 use camel_component_api::ConcurrencyModel;
2263
2264 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2265 .route_id("test-route")
2266 .concurrent(16)
2267 .to("log:info")
2268 .build()
2269 .unwrap();
2270
2271 assert_eq!(
2272 definition.concurrency_override(),
2273 Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2274 );
2275 }
2276
2277 #[test]
2278 fn test_builder_concurrent_zero_means_unbounded() {
2279 use camel_component_api::ConcurrencyModel;
2280
2281 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2282 .route_id("test-route")
2283 .concurrent(0)
2284 .to("log:info")
2285 .build()
2286 .unwrap();
2287
2288 assert_eq!(
2289 definition.concurrency_override(),
2290 Some(&ConcurrencyModel::Concurrent { max: None })
2291 );
2292 }
2293
2294 #[test]
2295 fn test_builder_sequential_sets_concurrency() {
2296 use camel_component_api::ConcurrencyModel;
2297
2298 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2299 .route_id("test-route")
2300 .sequential()
2301 .to("log:info")
2302 .build()
2303 .unwrap();
2304
2305 assert_eq!(
2306 definition.concurrency_override(),
2307 Some(&ConcurrencyModel::Sequential)
2308 );
2309 }
2310
2311 #[test]
2312 fn test_builder_default_concurrency_is_none() {
2313 let definition = RouteBuilder::from("timer:tick")
2314 .route_id("test-route")
2315 .to("log:info")
2316 .build()
2317 .unwrap();
2318
2319 assert_eq!(definition.concurrency_override(), None);
2320 }
2321
2322 #[test]
2325 fn test_builder_route_id_sets_id() {
2326 let definition = RouteBuilder::from("timer:tick")
2327 .route_id("my-route")
2328 .build()
2329 .unwrap();
2330
2331 assert_eq!(definition.route_id(), "my-route");
2332 }
2333
2334 #[test]
2335 fn test_build_without_route_id_fails() {
2336 let result = RouteBuilder::from("timer:tick?period=1000")
2337 .to("log:info")
2338 .build();
2339 let err = match result {
2340 Err(e) => e.to_string(),
2341 Ok(_) => panic!("build() should fail without route_id"),
2342 };
2343 assert!(
2344 err.contains("route_id"),
2345 "error should mention route_id, got: {}",
2346 err
2347 );
2348 }
2349
2350 #[test]
2351 fn test_builder_empty_route_id_rejected() {
2352 let result = RouteBuilder::from("timer:tick").route_id("").build();
2353 let err = result.err().expect("empty route_id should be rejected");
2354 assert!(matches!(err, CamelError::RouteError(_)));
2355 }
2356
2357 #[test]
2358 fn test_builder_whitespace_route_id_rejected() {
2359 let result = RouteBuilder::from("timer:tick").route_id(" ").build();
2360 assert!(result.is_err());
2361 }
2362
2363 #[test]
2364 fn test_builder_auto_startup_false() {
2365 let definition = RouteBuilder::from("timer:tick")
2366 .route_id("test-route")
2367 .auto_startup(false)
2368 .build()
2369 .unwrap();
2370
2371 assert!(!definition.auto_startup());
2372 }
2373
2374 #[test]
2375 fn test_builder_startup_order_custom() {
2376 let definition = RouteBuilder::from("timer:tick")
2377 .route_id("test-route")
2378 .startup_order(50)
2379 .build()
2380 .unwrap();
2381
2382 assert_eq!(definition.startup_order(), 50);
2383 }
2384
2385 #[test]
2386 fn test_builder_defaults() {
2387 let definition = RouteBuilder::from("timer:tick")
2388 .route_id("test-route")
2389 .build()
2390 .unwrap();
2391
2392 assert_eq!(definition.route_id(), "test-route");
2393 assert!(definition.auto_startup());
2394 assert_eq!(definition.startup_order(), 1000);
2395 }
2396
2397 #[test]
2400 fn test_choice_builder_single_when() {
2401 let definition = RouteBuilder::from("timer:tick")
2402 .route_id("test-route")
2403 .choice()
2404 .when(|ex: &Exchange| ex.input.header("type").is_some())
2405 .to("mock:typed")
2406 .end_when()
2407 .end_choice()
2408 .build()
2409 .unwrap();
2410 assert_eq!(definition.steps().len(), 1);
2411 assert!(
2412 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2413 if whens.len() == 1 && otherwise.is_none())
2414 );
2415 }
2416
2417 #[test]
2418 fn test_choice_builder_when_otherwise() {
2419 let definition = RouteBuilder::from("timer:tick")
2420 .route_id("test-route")
2421 .choice()
2422 .when(|ex: &Exchange| ex.input.header("a").is_some())
2423 .to("mock:a")
2424 .end_when()
2425 .otherwise()
2426 .to("mock:fallback")
2427 .end_otherwise()
2428 .end_choice()
2429 .build()
2430 .unwrap();
2431 assert!(
2432 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2433 if whens.len() == 1 && otherwise.is_some())
2434 );
2435 }
2436
2437 #[test]
2438 fn test_choice_builder_multiple_whens() {
2439 let definition = RouteBuilder::from("timer:tick")
2440 .route_id("test-route")
2441 .choice()
2442 .when(|ex: &Exchange| ex.input.header("a").is_some())
2443 .to("mock:a")
2444 .end_when()
2445 .when(|ex: &Exchange| ex.input.header("b").is_some())
2446 .to("mock:b")
2447 .end_when()
2448 .end_choice()
2449 .build()
2450 .unwrap();
2451 assert!(
2452 matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2453 if whens.len() == 2)
2454 );
2455 }
2456
2457 #[test]
2458 fn test_choice_step_after_choice() {
2459 let definition = RouteBuilder::from("timer:tick")
2461 .route_id("test-route")
2462 .choice()
2463 .when(|_ex: &Exchange| true)
2464 .to("mock:inner")
2465 .end_when()
2466 .end_choice()
2467 .to("mock:outer") .build()
2469 .unwrap();
2470 assert_eq!(definition.steps().len(), 2);
2471 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2472 }
2473
2474 #[test]
2477 fn test_throttle_builder_typestate() {
2478 let definition = RouteBuilder::from("timer:tick")
2479 .route_id("test-route")
2480 .throttle(10, std::time::Duration::from_secs(1))
2481 .to("mock:result")
2482 .end_throttle()
2483 .build()
2484 .unwrap();
2485
2486 assert_eq!(definition.steps().len(), 1);
2487 assert!(matches!(
2488 &definition.steps()[0],
2489 BuilderStep::Throttle { .. }
2490 ));
2491 }
2492
2493 #[test]
2494 fn test_throttle_builder_with_strategy() {
2495 let definition = RouteBuilder::from("timer:tick")
2496 .route_id("test-route")
2497 .throttle(10, std::time::Duration::from_secs(1))
2498 .strategy(ThrottleStrategy::Reject)
2499 .to("mock:result")
2500 .end_throttle()
2501 .build()
2502 .unwrap();
2503
2504 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2505 assert_eq!(config.strategy, ThrottleStrategy::Reject);
2506 } else {
2507 panic!("Expected Throttle step");
2508 }
2509 }
2510
2511 #[test]
2512 fn test_throttle_builder_steps_collected() {
2513 let definition = RouteBuilder::from("timer:tick")
2514 .route_id("test-route")
2515 .throttle(5, std::time::Duration::from_secs(1))
2516 .set_header("throttled", Value::Bool(true))
2517 .to("mock:throttled")
2518 .end_throttle()
2519 .build()
2520 .unwrap();
2521
2522 match &definition.steps()[0] {
2523 BuilderStep::Throttle { steps, .. } => {
2524 assert_eq!(steps.len(), 2); }
2526 other => panic!("Expected Throttle, got {:?}", other),
2527 }
2528 }
2529
2530 #[test]
2531 fn test_throttle_step_after_throttle() {
2532 let definition = RouteBuilder::from("timer:tick")
2534 .route_id("test-route")
2535 .throttle(10, std::time::Duration::from_secs(1))
2536 .to("mock:inner")
2537 .end_throttle()
2538 .to("mock:outer")
2539 .build()
2540 .unwrap();
2541
2542 assert_eq!(definition.steps().len(), 2);
2543 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2544 }
2545
2546 #[test]
2549 fn test_load_balance_builder_typestate() {
2550 let definition = RouteBuilder::from("timer:tick")
2551 .route_id("test-route")
2552 .load_balance()
2553 .round_robin()
2554 .to("mock:a")
2555 .to("mock:b")
2556 .end_load_balance()
2557 .build()
2558 .unwrap();
2559
2560 assert_eq!(definition.steps().len(), 1);
2561 assert!(matches!(
2562 &definition.steps()[0],
2563 BuilderStep::LoadBalance { .. }
2564 ));
2565 }
2566
2567 #[test]
2568 fn test_load_balance_builder_with_strategy() {
2569 let definition = RouteBuilder::from("timer:tick")
2570 .route_id("test-route")
2571 .load_balance()
2572 .random()
2573 .to("mock:result")
2574 .end_load_balance()
2575 .build()
2576 .unwrap();
2577
2578 if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2579 assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2580 } else {
2581 panic!("Expected LoadBalance step");
2582 }
2583 }
2584
2585 #[test]
2586 fn test_load_balance_builder_steps_collected() {
2587 let definition = RouteBuilder::from("timer:tick")
2588 .route_id("test-route")
2589 .load_balance()
2590 .set_header("lb", Value::Bool(true))
2591 .to("mock:a")
2592 .end_load_balance()
2593 .build()
2594 .unwrap();
2595
2596 match &definition.steps()[0] {
2597 BuilderStep::LoadBalance { steps, .. } => {
2598 assert_eq!(steps.len(), 2); }
2600 other => panic!("Expected LoadBalance, got {:?}", other),
2601 }
2602 }
2603
2604 #[test]
2605 fn test_load_balance_step_after_load_balance() {
2606 let definition = RouteBuilder::from("timer:tick")
2608 .route_id("test-route")
2609 .load_balance()
2610 .to("mock:inner")
2611 .end_load_balance()
2612 .to("mock:outer")
2613 .build()
2614 .unwrap();
2615
2616 assert_eq!(definition.steps().len(), 2);
2617 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2618 }
2619
2620 #[test]
2623 fn test_dynamic_router_builder() {
2624 let definition = RouteBuilder::from("timer:tick")
2625 .route_id("test-route")
2626 .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2627 .build()
2628 .unwrap();
2629
2630 assert_eq!(definition.steps().len(), 1);
2631 assert!(matches!(
2632 &definition.steps()[0],
2633 BuilderStep::DynamicRouter { .. }
2634 ));
2635 }
2636
2637 #[test]
2638 fn test_dynamic_router_builder_with_config() {
2639 let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2640 .max_iterations(100)
2641 .cache_size(500);
2642
2643 let definition = RouteBuilder::from("timer:tick")
2644 .route_id("test-route")
2645 .dynamic_router_with_config(config)
2646 .build()
2647 .unwrap();
2648
2649 assert_eq!(definition.steps().len(), 1);
2650 if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2651 assert_eq!(config.max_iterations, 100);
2652 assert_eq!(config.cache_size, 500);
2653 } else {
2654 panic!("Expected DynamicRouter step");
2655 }
2656 }
2657
2658 #[test]
2659 fn test_dynamic_router_step_after_router() {
2660 let definition = RouteBuilder::from("timer:tick")
2662 .route_id("test-route")
2663 .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2664 .to("mock:outer")
2665 .build()
2666 .unwrap();
2667
2668 assert_eq!(definition.steps().len(), 2);
2669 assert!(matches!(
2670 &definition.steps()[0],
2671 BuilderStep::DynamicRouter { .. }
2672 ));
2673 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2674 }
2675
2676 #[test]
2677 fn routing_slip_builder_creates_step() {
2678 use camel_api::RoutingSlipExpression;
2679
2680 let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2681
2682 let route = RouteBuilder::from("direct:start")
2683 .route_id("routing-slip-test")
2684 .routing_slip(expression)
2685 .build()
2686 .unwrap();
2687
2688 assert!(
2689 matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2690 "Expected RoutingSlip step"
2691 );
2692 }
2693
2694 #[test]
2695 fn routing_slip_with_config_builder_creates_step() {
2696 use camel_api::RoutingSlipConfig;
2697
2698 let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2699 .uri_delimiter("|")
2700 .cache_size(50)
2701 .ignore_invalid_endpoints(true);
2702
2703 let route = RouteBuilder::from("direct:start")
2704 .route_id("routing-slip-config-test")
2705 .routing_slip_with_config(config)
2706 .build()
2707 .unwrap();
2708
2709 if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2710 assert_eq!(config.uri_delimiter, "|");
2711 assert_eq!(config.cache_size, 50);
2712 assert!(config.ignore_invalid_endpoints);
2713 } else {
2714 panic!("Expected RoutingSlip step");
2715 }
2716 }
2717
2718 #[test]
2719 fn test_builder_marshal_adds_processor_step() {
2720 let definition = RouteBuilder::from("timer:tick")
2721 .route_id("test-route")
2722 .marshal("json")
2723 .build()
2724 .unwrap();
2725 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2726 }
2727
2728 #[test]
2729 fn test_builder_unmarshal_adds_processor_step() {
2730 let definition = RouteBuilder::from("timer:tick")
2731 .route_id("test-route")
2732 .unmarshal("json")
2733 .build()
2734 .unwrap();
2735 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2736 }
2737
2738 #[test]
2739 fn test_builder_stream_cache_adds_processor_step() {
2740 let definition = RouteBuilder::from("timer:tick")
2741 .route_id("test-route")
2742 .stream_cache(1024)
2743 .build()
2744 .unwrap();
2745 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2746 }
2747
2748 #[test]
2749 fn validate_adds_to_step_with_validator_uri() {
2750 let def = RouteBuilder::from("direct:in")
2751 .route_id("test")
2752 .validate("schemas/order.xsd")
2753 .build()
2754 .unwrap();
2755 let steps = def.steps();
2756 assert_eq!(steps.len(), 1);
2757 assert!(
2758 matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2759 "got: {:?}",
2760 steps[0]
2761 );
2762 }
2763
2764 #[test]
2765 #[should_panic(expected = "unknown data format: 'csv'")]
2766 fn test_builder_marshal_panics_on_unknown_format() {
2767 let _ = RouteBuilder::from("timer:tick")
2768 .route_id("test-route")
2769 .marshal("csv")
2770 .build();
2771 }
2772
2773 #[test]
2774 #[should_panic(expected = "unknown data format: 'csv'")]
2775 fn test_builder_unmarshal_panics_on_unknown_format() {
2776 let _ = RouteBuilder::from("timer:tick")
2777 .route_id("test-route")
2778 .unmarshal("csv")
2779 .build();
2780 }
2781
2782 #[test]
2783 fn test_builder_recipient_list_creates_step() {
2784 let route = RouteBuilder::from("direct:start")
2785 .route_id("recipient-list-test")
2786 .recipient_list(Arc::new(|_| "direct:a,direct:b".to_string()))
2787 .build()
2788 .unwrap();
2789
2790 assert!(matches!(
2791 &route.steps()[0],
2792 BuilderStep::RecipientList { .. }
2793 ));
2794 }
2795
2796 #[test]
2797 fn test_builder_recipient_list_with_config_creates_step() {
2798 let config = RecipientListConfig::new(Arc::new(|_| "mock:a".to_string()));
2799
2800 let route = RouteBuilder::from("direct:start")
2801 .route_id("recipient-list-config-test")
2802 .recipient_list_with_config(config)
2803 .build()
2804 .unwrap();
2805
2806 assert!(matches!(
2807 &route.steps()[0],
2808 BuilderStep::RecipientList { .. }
2809 ));
2810 }
2811
2812 #[test]
2813 fn test_builder_script_adds_script_step() {
2814 let route = RouteBuilder::from("direct:start")
2815 .route_id("script-test")
2816 .script("rhai", "headers[\"x\"] = \"y\"")
2817 .build()
2818 .unwrap();
2819
2820 assert!(matches!(
2821 &route.steps()[0],
2822 BuilderStep::Script { language, script }
2823 if language == "rhai" && script == "headers[\"x\"] = \"y\""
2824 ));
2825 }
2826
2827 #[test]
2828 fn test_builder_delay_and_delay_with_header_add_steps() {
2829 let route = RouteBuilder::from("direct:start")
2830 .route_id("delay-test")
2831 .delay(Duration::from_millis(250))
2832 .delay_with_header(Duration::from_millis(500), "x-delay")
2833 .build()
2834 .unwrap();
2835
2836 assert_eq!(route.steps().len(), 2);
2837 assert!(matches!(&route.steps()[0], BuilderStep::Delay { .. }));
2838 assert!(matches!(&route.steps()[1], BuilderStep::Delay { .. }));
2839 }
2840
2841 #[test]
2842 fn test_builder_log_and_stop_add_steps_in_order() {
2843 let route = RouteBuilder::from("direct:start")
2844 .route_id("log-stop-test")
2845 .log("hello", LogLevel::Info)
2846 .stop()
2847 .to("mock:after")
2848 .build()
2849 .unwrap();
2850
2851 assert_eq!(route.steps().len(), 3);
2852 assert!(matches!(
2853 &route.steps()[0],
2854 BuilderStep::Log { message, .. } if message == "hello"
2855 ));
2856 assert!(matches!(&route.steps()[1], BuilderStep::Stop));
2857 assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
2858 }
2859
2860 #[test]
2861 fn test_builder_stream_cache_default_adds_processor_step() {
2862 let route = RouteBuilder::from("direct:start")
2863 .route_id("stream-cache-default-test")
2864 .stream_cache_default()
2865 .build()
2866 .unwrap();
2867
2868 assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
2869 }
2870
2871 #[test]
2872 fn test_validate_preserves_existing_validator_prefix() {
2873 let route = RouteBuilder::from("direct:in")
2874 .route_id("validate-prefix-test")
2875 .validate("validator:schemas/order.xsd")
2876 .build()
2877 .unwrap();
2878
2879 assert!(matches!(
2880 &route.steps()[0],
2881 BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
2882 ));
2883 }
2884
2885 #[test]
2886 fn test_load_balance_builder_weighted_failover_parallel_config() {
2887 let route = RouteBuilder::from("direct:start")
2888 .route_id("lb-weighted-failover-parallel")
2889 .load_balance()
2890 .weighted(vec![
2891 ("direct:a".to_string(), 3),
2892 ("direct:b".to_string(), 1),
2893 ])
2894 .failover()
2895 .parallel(true)
2896 .to("mock:result")
2897 .end_load_balance()
2898 .build()
2899 .unwrap();
2900
2901 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
2902 assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
2903 assert!(config.parallel);
2904 } else {
2905 panic!("Expected LoadBalance step");
2906 }
2907 }
2908
2909 #[test]
2910 fn test_multicast_builder_all_config_setters() {
2911 let route = RouteBuilder::from("direct:start")
2912 .route_id("multicast-config-test")
2913 .multicast()
2914 .parallel(true)
2915 .parallel_limit(4)
2916 .stop_on_exception(true)
2917 .timeout(Duration::from_millis(300))
2918 .aggregation(MulticastStrategy::Original)
2919 .to("mock:a")
2920 .end_multicast()
2921 .build()
2922 .unwrap();
2923
2924 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
2925 assert!(config.parallel);
2926 assert_eq!(config.parallel_limit, Some(4));
2927 assert!(config.stop_on_exception);
2928 assert_eq!(config.timeout, Some(Duration::from_millis(300)));
2929 assert!(matches!(config.aggregation, MulticastStrategy::Original));
2930 } else {
2931 panic!("Expected Multicast step");
2932 }
2933 }
2934
2935 #[test]
2936 fn test_build_canonical_rejects_unsupported_processor_step() {
2937 let err = RouteBuilder::from("direct:start")
2938 .route_id("canonical-reject")
2939 .set_header("k", Value::String("v".into()))
2940 .build_canonical()
2941 .unwrap_err();
2942
2943 assert!(format!("{err}").contains("does not support step `processor`"));
2944 }
2945
2946 #[test]
2949 fn test_load_balance_builder_weighted_strategy() {
2950 let route = RouteBuilder::from("direct:start")
2951 .route_id("lb-weighted")
2952 .load_balance()
2953 .weighted(vec![
2954 ("direct:a".to_string(), 5),
2955 ("direct:b".to_string(), 2),
2956 ("direct:c".to_string(), 1),
2957 ])
2958 .to("mock:result")
2959 .end_load_balance()
2960 .build()
2961 .unwrap();
2962
2963 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
2964 assert!(matches!(config.strategy, LoadBalanceStrategy::Weighted(_)));
2965 } else {
2966 panic!("Expected LoadBalance step");
2967 }
2968 }
2969
2970 #[test]
2971 fn test_load_balance_builder_failover_strategy() {
2972 let route = RouteBuilder::from("direct:start")
2973 .route_id("lb-failover")
2974 .load_balance()
2975 .failover()
2976 .to("mock:primary")
2977 .end_load_balance()
2978 .build()
2979 .unwrap();
2980
2981 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
2982 assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
2983 assert!(!config.parallel);
2984 } else {
2985 panic!("Expected LoadBalance step");
2986 }
2987 }
2988
2989 #[test]
2990 fn test_load_balance_builder_parallel_false_explicit() {
2991 let route = RouteBuilder::from("direct:start")
2992 .route_id("lb-parallel-false")
2993 .load_balance()
2994 .round_robin()
2995 .parallel(false)
2996 .to("mock:result")
2997 .end_load_balance()
2998 .build()
2999 .unwrap();
3000
3001 if let BuilderStep::LoadBalance { config, .. } = &route.steps()[0] {
3002 assert!(!config.parallel);
3003 } else {
3004 panic!("Expected LoadBalance step");
3005 }
3006 }
3007
3008 #[test]
3011 fn test_filter_in_split_builder_typestate() {
3012 use camel_api::splitter::{SplitterConfig, split_body_lines};
3013
3014 let definition = RouteBuilder::from("timer:test")
3015 .route_id("filter-in-split")
3016 .split(SplitterConfig::new(split_body_lines()))
3017 .filter(|_ex| true)
3018 .to("mock:filtered")
3019 .end_filter()
3020 .end_split()
3021 .build()
3022 .unwrap();
3023
3024 assert_eq!(definition.steps().len(), 1);
3025 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3026 assert_eq!(steps.len(), 1);
3027 assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3028 } else {
3029 panic!("Expected Split step");
3030 }
3031 }
3032
3033 #[test]
3034 fn test_filter_in_split_builder_multiple_steps() {
3035 use camel_api::splitter::{SplitterConfig, split_body_lines};
3036
3037 let definition = RouteBuilder::from("timer:test")
3038 .route_id("filter-in-split-multi")
3039 .split(SplitterConfig::new(split_body_lines()))
3040 .to("mock:before-filter")
3041 .filter(|_ex| true)
3042 .to("mock:inside-filter")
3043 .end_filter()
3044 .to("mock:after-filter")
3045 .end_split()
3046 .build()
3047 .unwrap();
3048
3049 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3050 assert_eq!(steps.len(), 3);
3052 } else {
3053 panic!("Expected Split step");
3054 }
3055 }
3056
3057 #[test]
3060 fn test_build_canonical_with_circuit_breaker() {
3061 use camel_api::circuit_breaker::CircuitBreakerConfig;
3062
3063 let spec = RouteBuilder::from("direct:start")
3064 .route_id("canonical-cb")
3065 .circuit_breaker(CircuitBreakerConfig::new().failure_threshold(10))
3066 .to("mock:result")
3067 .build_canonical()
3068 .unwrap();
3069
3070 let cb = spec.circuit_breaker.expect("circuit breaker should be set");
3071 assert_eq!(cb.failure_threshold, 10);
3072 }
3073
3074 #[test]
3075 fn test_build_canonical_rejects_custom_split_aggregation() {
3076 use camel_api::splitter::{SplitterConfig, split_body_lines};
3077
3078 let err = RouteBuilder::from("direct:start")
3079 .route_id("canonical-custom-split")
3080 .split(SplitterConfig::new(split_body_lines()).aggregation(
3081 camel_api::splitter::AggregationStrategy::Custom(Arc::new(|_, ex| ex)),
3082 ))
3083 .to("mock:frag")
3084 .end_split()
3085 .build_canonical()
3086 .unwrap_err();
3087
3088 assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3090 }
3091
3092 #[test]
3093 fn test_build_canonical_rejects_custom_aggregate_strategy() {
3094 let err = RouteBuilder::from("direct:start")
3095 .route_id("canonical-custom-agg")
3096 .aggregate(
3097 AggregatorConfig::correlate_by("key")
3098 .complete_when_size(2)
3099 .strategy(AggregationStrategy::Custom(Arc::new(|_, ex| ex)))
3100 .build(),
3101 )
3102 .build_canonical()
3103 .unwrap_err();
3104
3105 assert!(format!("{err}").contains("custom aggregate strategy"));
3106 }
3107
3108 #[test]
3109 fn test_build_canonical_rejects_fn_correlation_strategy() {
3110 let err = RouteBuilder::from("direct:start")
3111 .route_id("canonical-fn-corr")
3112 .aggregate(AggregatorConfig {
3113 header_name: "key".to_string(),
3114 completion: CompletionMode::Single(CompletionCondition::Size(1)),
3115 correlation: CorrelationStrategy::Fn(Arc::new(|_| Some("key".to_string()))),
3116 strategy: AggregationStrategy::CollectAll,
3117 max_buckets: None,
3118 bucket_ttl: None,
3119 force_completion_on_stop: false,
3120 discard_on_timeout: false,
3121 })
3122 .build_canonical()
3123 .unwrap_err();
3124
3125 assert!(format!("{err}").contains("Fn correlation strategy"));
3126 }
3127
3128 #[test]
3129 fn test_build_canonical_rejects_predicate_completion() {
3130 let err = RouteBuilder::from("direct:start")
3131 .route_id("canonical-pred-completion")
3132 .aggregate(AggregatorConfig {
3133 header_name: "key".to_string(),
3134 completion: CompletionMode::Single(CompletionCondition::Predicate(Arc::new(
3135 |_| false,
3136 ))),
3137 correlation: CorrelationStrategy::HeaderName("key".to_string()),
3138 strategy: AggregationStrategy::CollectAll,
3139 max_buckets: None,
3140 bucket_ttl: None,
3141 force_completion_on_stop: false,
3142 discard_on_timeout: false,
3143 })
3144 .build_canonical()
3145 .unwrap_err();
3146
3147 assert!(format!("{err}").contains("predicate completion"));
3148 }
3149
3150 #[test]
3151 fn test_build_canonical_with_expression_correlation() {
3152 let spec = RouteBuilder::from("direct:start")
3153 .route_id("canonical-expr-corr")
3154 .aggregate(AggregatorConfig {
3155 header_name: "key".to_string(),
3156 completion: CompletionMode::Single(CompletionCondition::Size(1)),
3157 correlation: CorrelationStrategy::Expression {
3158 expr: "header.key".to_string(),
3159 language: "simple".to_string(),
3160 },
3161 strategy: AggregationStrategy::CollectAll,
3162 max_buckets: None,
3163 bucket_ttl: None,
3164 force_completion_on_stop: false,
3165 discard_on_timeout: false,
3166 })
3167 .build_canonical()
3168 .unwrap();
3169
3170 assert!(spec.steps.iter().any(|s| matches!(s, CanonicalStepSpec::Aggregate(a) if a.correlation_key == Some("header.key".to_string()))));
3171 }
3172
3173 #[test]
3174 fn test_build_canonical_split_rejected_with_closure_expression() {
3175 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
3176
3177 let err = RouteBuilder::from("direct:start")
3179 .route_id("canonical-split-last")
3180 .split(
3181 SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::LastWins),
3182 )
3183 .to("mock:frag")
3184 .end_split()
3185 .build_canonical()
3186 .unwrap_err();
3187
3188 assert!(format!("{err}").contains("canonical v1 does not support step `split`"));
3189 }
3190
3191 #[test]
3194 fn test_on_exception_full_chain_retry_backoff_jitter_handled_by() {
3195 let definition = RouteBuilder::from("direct:start")
3196 .route_id("on-exception-full")
3197 .dead_letter_channel("log:dlc")
3198 .on_exception(|e| matches!(e, CamelError::Io(_)))
3199 .retry(5)
3200 .with_backoff(Duration::from_millis(10), 2.0, Duration::from_millis(500))
3201 .with_jitter(0.3)
3202 .handled_by("log:io-handler")
3203 .end_on_exception()
3204 .to("mock:out")
3205 .build()
3206 .unwrap();
3207
3208 let cfg = definition
3209 .error_handler_config()
3210 .expect("error handler should be set");
3211 assert_eq!(cfg.policies.len(), 1);
3212 let policy = &cfg.policies[0];
3213 let retry = policy.retry.as_ref().expect("retry should be set");
3214 assert_eq!(retry.max_attempts, 5);
3215 assert_eq!(retry.initial_delay, Duration::from_millis(10));
3216 assert_eq!(retry.multiplier, 2.0);
3217 assert_eq!(retry.max_delay, Duration::from_millis(500));
3218 assert!((retry.jitter_factor - 0.3).abs() < f64::EPSILON);
3219 assert_eq!(policy.handled_by.as_deref(), Some("log:io-handler"));
3220 }
3221
3222 #[test]
3223 fn test_on_exception_jitter_clamped_to_valid_range() {
3224 let definition = RouteBuilder::from("direct:start")
3225 .route_id("jitter-clamp")
3226 .on_exception(|_e| true)
3227 .retry(1)
3228 .with_jitter(5.0)
3229 .end_on_exception()
3230 .to("mock:out")
3231 .build()
3232 .unwrap();
3233
3234 let cfg = definition.error_handler_config().unwrap();
3235 let retry = cfg.policies[0].retry.as_ref().unwrap();
3236 assert!((retry.jitter_factor - 1.0).abs() < f64::EPSILON);
3237 }
3238
3239 #[test]
3242 fn test_builder_process_fn_adds_processor_step() {
3243 use camel_api::BoxProcessorExt;
3244 let processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
3245 let definition = RouteBuilder::from("timer:tick")
3246 .route_id("process-fn-test")
3247 .process_fn(processor)
3248 .build()
3249 .unwrap();
3250
3251 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3252 }
3253
3254 #[test]
3255 fn test_builder_convert_body_to_adds_processor_step() {
3256 let definition = RouteBuilder::from("timer:tick")
3257 .route_id("convert-body-test")
3258 .convert_body_to(BodyType::Json)
3259 .build()
3260 .unwrap();
3261
3262 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
3263 }
3264
3265 #[test]
3266 fn test_builder_bean_adds_bean_step() {
3267 let definition = RouteBuilder::from("timer:tick")
3268 .route_id("bean-test")
3269 .bean("myBean", "process")
3270 .build()
3271 .unwrap();
3272
3273 assert!(
3274 matches!(&definition.steps()[0], BuilderStep::Bean { name, method }
3275 if name == "myBean" && method == "process")
3276 );
3277 }
3278
3279 #[test]
3282 fn test_throttle_builder_delay_strategy() {
3283 let definition = RouteBuilder::from("timer:tick")
3284 .route_id("throttle-delay")
3285 .throttle(10, Duration::from_secs(1))
3286 .strategy(ThrottleStrategy::Delay)
3287 .to("mock:result")
3288 .end_throttle()
3289 .build()
3290 .unwrap();
3291
3292 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3293 assert_eq!(config.strategy, ThrottleStrategy::Delay);
3294 } else {
3295 panic!("Expected Throttle step");
3296 }
3297 }
3298
3299 #[test]
3300 fn test_throttle_builder_drop_strategy() {
3301 let definition = RouteBuilder::from("timer:tick")
3302 .route_id("throttle-drop")
3303 .throttle(10, Duration::from_secs(1))
3304 .strategy(ThrottleStrategy::Drop)
3305 .to("mock:result")
3306 .end_throttle()
3307 .build()
3308 .unwrap();
3309
3310 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
3311 assert_eq!(config.strategy, ThrottleStrategy::Drop);
3312 } else {
3313 panic!("Expected Throttle step");
3314 }
3315 }
3316
3317 #[test]
3320 fn test_nested_loop_while_builder() {
3321 use camel_api::loop_eip::LoopMode;
3322
3323 let def = RouteBuilder::from("direct:start")
3324 .route_id("nested-loop-while")
3325 .loop_count(2)
3326 .to("mock:outer")
3327 .loop_while(|_ex| true)
3328 .to("mock:inner")
3329 .end_loop()
3330 .end_loop()
3331 .build()
3332 .unwrap();
3333
3334 assert_eq!(def.steps().len(), 1);
3335 if let BuilderStep::Loop { steps, .. } = &def.steps()[0] {
3336 assert_eq!(steps.len(), 2);
3337 if let BuilderStep::Loop { config, .. } = &steps[1] {
3338 assert!(matches!(config.mode, LoopMode::While(_)));
3339 } else {
3340 panic!("Expected inner Loop step");
3341 }
3342 } else {
3343 panic!("Expected outer Loop step");
3344 }
3345 }
3346
3347 #[test]
3350 fn test_choice_builder_multiple_whens_with_otherwise() {
3351 let definition = RouteBuilder::from("timer:tick")
3352 .route_id("choice-multi-otherwise")
3353 .choice()
3354 .when(|ex: &Exchange| ex.input.header("a").is_some())
3355 .to("mock:a")
3356 .end_when()
3357 .when(|ex: &Exchange| ex.input.header("b").is_some())
3358 .to("mock:b")
3359 .end_when()
3360 .when(|ex: &Exchange| ex.input.header("c").is_some())
3361 .to("mock:c")
3362 .end_when()
3363 .otherwise()
3364 .to("mock:fallback")
3365 .end_otherwise()
3366 .end_choice()
3367 .build()
3368 .unwrap();
3369
3370 if let BuilderStep::Choice { whens, otherwise } = &definition.steps()[0] {
3371 assert_eq!(whens.len(), 3);
3372 assert!(otherwise.is_some());
3373 assert_eq!(otherwise.as_ref().unwrap().len(), 1);
3374 } else {
3375 panic!("Expected Choice step");
3376 }
3377 }
3378
3379 #[test]
3382 fn test_multicast_builder_parallel_only() {
3383 let route = RouteBuilder::from("direct:start")
3384 .route_id("multicast-parallel")
3385 .multicast()
3386 .parallel(true)
3387 .to("mock:a")
3388 .end_multicast()
3389 .build()
3390 .unwrap();
3391
3392 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3393 assert!(config.parallel);
3394 assert_eq!(config.parallel_limit, None);
3395 } else {
3396 panic!("Expected Multicast step");
3397 }
3398 }
3399
3400 #[test]
3401 fn test_multicast_builder_timeout_only() {
3402 let route = RouteBuilder::from("direct:start")
3403 .route_id("multicast-timeout")
3404 .multicast()
3405 .timeout(Duration::from_secs(5))
3406 .to("mock:a")
3407 .end_multicast()
3408 .build()
3409 .unwrap();
3410
3411 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3412 assert_eq!(config.timeout, Some(Duration::from_secs(5)));
3413 } else {
3414 panic!("Expected Multicast step");
3415 }
3416 }
3417
3418 #[test]
3419 fn test_multicast_builder_aggregation_collect_all() {
3420 let route = RouteBuilder::from("direct:start")
3421 .route_id("multicast-collect")
3422 .multicast()
3423 .aggregation(MulticastStrategy::CollectAll)
3424 .to("mock:a")
3425 .end_multicast()
3426 .build()
3427 .unwrap();
3428
3429 if let BuilderStep::Multicast { config, .. } = &route.steps()[0] {
3430 assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
3431 } else {
3432 panic!("Expected Multicast step");
3433 }
3434 }
3435
3436 #[test]
3439 fn test_build_canonical_aggregate_any_completion_mode() {
3440 let spec = RouteBuilder::from("direct:start")
3441 .route_id("canonical-any-completion")
3442 .aggregate(
3443 AggregatorConfig::correlate_by("key")
3444 .complete_on_size_or_timeout(10, Duration::from_secs(30))
3445 .build(),
3446 )
3447 .build_canonical()
3448 .unwrap();
3449
3450 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3451 assert_eq!(agg.completion_size, Some(10));
3452 assert_eq!(agg.completion_timeout_ms, Some(30_000));
3453 } else {
3454 panic!("Expected Aggregate step");
3455 }
3456 }
3457
3458 #[test]
3459 fn test_build_canonical_aggregate_timeout_completion() {
3460 let spec = RouteBuilder::from("direct:start")
3461 .route_id("canonical-timeout-completion")
3462 .aggregate(
3463 AggregatorConfig::correlate_by("key")
3464 .complete_on_timeout(Duration::from_millis(500))
3465 .build(),
3466 )
3467 .build_canonical()
3468 .unwrap();
3469
3470 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3471 assert_eq!(agg.completion_size, None);
3472 assert_eq!(agg.completion_timeout_ms, Some(500));
3473 } else {
3474 panic!("Expected Aggregate step");
3475 }
3476 }
3477
3478 #[test]
3481 fn test_build_canonical_aggregate_discard_on_timeout() {
3482 use camel_api::aggregator::AggregatorConfig;
3483
3484 let spec = RouteBuilder::from("direct:start")
3485 .route_id("canonical-discard-timeout")
3486 .aggregate(
3487 AggregatorConfig::correlate_by("key")
3488 .complete_when_size(1)
3489 .discard_on_timeout(true)
3490 .build(),
3491 )
3492 .build_canonical()
3493 .unwrap();
3494
3495 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3496 assert_eq!(agg.discard_on_timeout, Some(true));
3497 } else {
3498 panic!("Expected Aggregate step");
3499 }
3500 }
3501
3502 #[test]
3503 fn test_build_canonical_aggregate_force_completion_on_stop() {
3504 use camel_api::aggregator::AggregatorConfig;
3505
3506 let spec = RouteBuilder::from("direct:start")
3507 .route_id("canonical-force-stop")
3508 .aggregate(
3509 AggregatorConfig::correlate_by("key")
3510 .complete_when_size(1)
3511 .force_completion_on_stop(true)
3512 .build(),
3513 )
3514 .build_canonical()
3515 .unwrap();
3516
3517 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3518 assert_eq!(agg.force_completion_on_stop, Some(true));
3519 } else {
3520 panic!("Expected Aggregate step");
3521 }
3522 }
3523
3524 #[test]
3527 fn test_build_canonical_aggregate_max_buckets_and_ttl() {
3528 use camel_api::aggregator::AggregatorConfig;
3529
3530 let spec = RouteBuilder::from("direct:start")
3531 .route_id("canonical-buckets-ttl")
3532 .aggregate(
3533 AggregatorConfig::correlate_by("key")
3534 .complete_when_size(1)
3535 .max_buckets(100)
3536 .bucket_ttl(Duration::from_secs(60))
3537 .build(),
3538 )
3539 .build_canonical()
3540 .unwrap();
3541
3542 if let CanonicalStepSpec::Aggregate(agg) = &spec.steps[0] {
3543 assert_eq!(agg.max_buckets, Some(100));
3544 assert_eq!(agg.bucket_ttl_ms, Some(60_000));
3545 } else {
3546 panic!("Expected Aggregate step");
3547 }
3548 }
3549
3550 #[test]
3553 fn test_split_builder_with_filter_inside() {
3554 use camel_api::splitter::{SplitterConfig, split_body_lines};
3555
3556 let definition = RouteBuilder::from("timer:test")
3557 .route_id("split-with-filter")
3558 .split(SplitterConfig::new(split_body_lines()))
3559 .filter(|_ex| true)
3560 .to("mock:filtered-frag")
3561 .end_filter()
3562 .end_split()
3563 .build()
3564 .unwrap();
3565
3566 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3567 assert_eq!(steps.len(), 1);
3568 assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
3569 } else {
3570 panic!("Expected Split step");
3571 }
3572 }
3573
3574 #[test]
3577 fn test_wire_tap_multiple_taps() {
3578 let definition = RouteBuilder::from("timer:tick")
3579 .route_id("multi-wire-tap")
3580 .wire_tap("mock:tap1")
3581 .wire_tap("mock:tap2")
3582 .to("mock:result")
3583 .build()
3584 .unwrap();
3585
3586 assert_eq!(definition.steps().len(), 3);
3587 assert!(
3588 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap1")
3589 );
3590 assert!(
3591 matches!(&definition.steps()[1], BuilderStep::WireTap { uri } if uri == "mock:tap2")
3592 );
3593 }
3594
3595 #[test]
3598 fn test_builder_shorthand_then_explicit_mixed_mode() {
3599 let result = RouteBuilder::from("direct:start")
3600 .route_id("mixed-mode-2")
3601 .dead_letter_channel("log:dlc")
3602 .error_handler(ErrorHandlerConfig::log_only())
3603 .to("mock:out")
3604 .build();
3605
3606 let err = result.err().expect("mixed mode should fail");
3607 assert!(format!("{err}").contains("mixed error handler modes"));
3608 }
3609
3610 #[test]
3613 fn test_build_canonical_empty_from_uri_errors() {
3614 let result = RouteBuilder::from("").route_id("test").build_canonical();
3615 assert!(result.is_err());
3616 }
3617
3618 #[test]
3619 fn test_build_canonical_missing_route_id_errors() {
3620 let result = RouteBuilder::from("direct:start").build_canonical();
3621 assert!(result.is_err());
3622 let err = result.unwrap_err().to_string();
3623 assert!(err.contains("route_id"));
3624 }
3625
3626 #[test]
3629 fn test_split_builder_with_aggregate_inside() {
3630 use camel_api::aggregator::AggregatorConfig;
3631 use camel_api::splitter::{SplitterConfig, split_body_lines};
3632
3633 let definition = RouteBuilder::from("timer:test")
3634 .route_id("split-agg")
3635 .split(SplitterConfig::new(split_body_lines()))
3636 .aggregate(
3637 AggregatorConfig::correlate_by("frag-key")
3638 .complete_when_size(3)
3639 .build(),
3640 )
3641 .end_split()
3642 .build()
3643 .unwrap();
3644
3645 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
3646 assert_eq!(steps.len(), 1);
3647 assert!(matches!(&steps[0], BuilderStep::Aggregate { .. }));
3648 } else {
3649 panic!("Expected Split step");
3650 }
3651 }
3652
3653 #[test]
3656 fn test_throttle_builder_with_steps_inside() {
3657 let definition = RouteBuilder::from("timer:tick")
3658 .route_id("throttle-steps")
3659 .throttle(10, Duration::from_secs(1))
3660 .set_header("throttled", Value::Bool(true))
3661 .to("mock:throttled")
3662 .end_throttle()
3663 .build()
3664 .unwrap();
3665
3666 if let BuilderStep::Throttle { steps, .. } = &definition.steps()[0] {
3667 assert_eq!(steps.len(), 2);
3668 } else {
3669 panic!("Expected Throttle step");
3670 }
3671 }
3672
3673 #[test]
3676 fn test_load_balance_builder_with_steps_inside() {
3677 let definition = RouteBuilder::from("timer:tick")
3678 .route_id("lb-steps")
3679 .load_balance()
3680 .round_robin()
3681 .set_header("lb", Value::Bool(true))
3682 .to("mock:lb")
3683 .end_load_balance()
3684 .build()
3685 .unwrap();
3686
3687 if let BuilderStep::LoadBalance { steps, .. } = &definition.steps()[0] {
3688 assert_eq!(steps.len(), 2);
3689 } else {
3690 panic!("Expected LoadBalance step");
3691 }
3692 }
3693
3694 #[test]
3697 fn test_multicast_builder_with_steps_inside() {
3698 let definition = RouteBuilder::from("timer:tick")
3699 .route_id("multicast-steps")
3700 .multicast()
3701 .set_header("mc", Value::Bool(true))
3702 .to("mock:multicast")
3703 .end_multicast()
3704 .build()
3705 .unwrap();
3706
3707 if let BuilderStep::Multicast { steps, .. } = &definition.steps()[0] {
3708 assert_eq!(steps.len(), 2);
3709 } else {
3710 panic!("Expected Multicast step");
3711 }
3712 }
3713
3714 #[test]
3717 fn test_loop_builder_with_steps_inside() {
3718 let definition = RouteBuilder::from("timer:tick")
3719 .route_id("loop-steps")
3720 .loop_count(3)
3721 .set_header("loop", Value::Bool(true))
3722 .to("mock:loop")
3723 .end_loop()
3724 .build()
3725 .unwrap();
3726
3727 if let BuilderStep::Loop { steps, .. } = &definition.steps()[0] {
3728 assert_eq!(steps.len(), 2);
3729 } else {
3730 panic!("Expected Loop step");
3731 }
3732 }
3733
3734 #[test]
3737 fn test_build_canonical_rejects_loop_step() {
3738 let err = RouteBuilder::from("direct:start")
3739 .route_id("canonical-loop")
3740 .loop_count(3)
3741 .to("mock:loop")
3742 .end_loop()
3743 .build_canonical()
3744 .unwrap_err();
3745
3746 assert!(format!("{err}").contains("does not support step `loop`"));
3747 }
3748
3749 #[test]
3750 fn test_build_canonical_rejects_multicast_step() {
3751 let err = RouteBuilder::from("direct:start")
3752 .route_id("canonical-multicast")
3753 .multicast()
3754 .to("mock:a")
3755 .end_multicast()
3756 .build_canonical()
3757 .unwrap_err();
3758
3759 assert!(format!("{err}").contains("does not support step `multicast`"));
3760 }
3761
3762 #[test]
3763 fn test_build_canonical_rejects_throttle_step() {
3764 let err = RouteBuilder::from("direct:start")
3765 .route_id("canonical-throttle")
3766 .throttle(10, Duration::from_secs(1))
3767 .to("mock:result")
3768 .end_throttle()
3769 .build_canonical()
3770 .unwrap_err();
3771
3772 assert!(format!("{err}").contains("does not support step `throttle`"));
3773 }
3774
3775 #[test]
3776 fn test_build_canonical_rejects_load_balancer_step() {
3777 let err = RouteBuilder::from("direct:start")
3778 .route_id("canonical-lb")
3779 .load_balance()
3780 .round_robin()
3781 .to("mock:result")
3782 .end_load_balance()
3783 .build_canonical()
3784 .unwrap_err();
3785
3786 assert!(format!("{err}").contains("does not support step `load_balancer`"));
3787 }
3788
3789 #[test]
3790 fn test_build_canonical_rejects_bean_step() {
3791 let err = RouteBuilder::from("direct:start")
3792 .route_id("canonical-bean")
3793 .bean("myBean", "process")
3794 .build_canonical()
3795 .unwrap_err();
3796
3797 assert!(format!("{err}").contains("does not support step `bean`"));
3798 }
3799
3800 #[test]
3801 fn test_build_canonical_rejects_script_step() {
3802 let err = RouteBuilder::from("direct:start")
3803 .route_id("canonical-script")
3804 .script("rhai", "x = 1")
3805 .build_canonical()
3806 .unwrap_err();
3807
3808 assert!(format!("{err}").contains("does not support step `script`"));
3809 }
3810
3811 #[test]
3812 fn test_build_canonical_accepts_delay_step() {
3813 let spec = RouteBuilder::from("direct:start")
3814 .route_id("canonical-delay")
3815 .delay(Duration::from_millis(100))
3816 .build_canonical()
3817 .unwrap();
3818
3819 assert!(
3820 spec.steps.iter().any(
3821 |s| matches!(s, CanonicalStepSpec::Delay { delay_ms, .. } if *delay_ms == 100)
3822 )
3823 );
3824 }
3825
3826 #[test]
3827 fn test_build_canonical_accepts_wire_tap_step() {
3828 let spec = RouteBuilder::from("direct:start")
3829 .route_id("canonical-wiretap")
3830 .wire_tap("mock:tap")
3831 .build_canonical()
3832 .unwrap();
3833
3834 assert!(
3835 spec.steps
3836 .iter()
3837 .any(|s| matches!(s, CanonicalStepSpec::WireTap { uri } if uri == "mock:tap"))
3838 );
3839 }
3840
3841 #[test]
3842 fn test_build_canonical_rejects_dynamic_router_step() {
3843 let err = RouteBuilder::from("direct:start")
3844 .route_id("canonical-dyn-router")
3845 .dynamic_router(Arc::new(|_| Some("mock:a".to_string())))
3846 .build_canonical()
3847 .unwrap_err();
3848
3849 assert!(format!("{err}").contains("does not support step `dynamic_router`"));
3850 }
3851
3852 #[test]
3853 fn test_build_canonical_rejects_routing_slip_step() {
3854 let err = RouteBuilder::from("direct:start")
3855 .route_id("canonical-routing-slip")
3856 .routing_slip(Arc::new(|_| Some("mock:a".to_string())))
3857 .build_canonical()
3858 .unwrap_err();
3859
3860 assert!(format!("{err}").contains("does not support step `routing_slip`"));
3861 }
3862
3863 #[test]
3864 fn test_build_canonical_rejects_recipient_list_step() {
3865 let err = RouteBuilder::from("direct:start")
3866 .route_id("canonical-recipient")
3867 .recipient_list(Arc::new(|_| "mock:a".to_string()))
3868 .build_canonical()
3869 .unwrap_err();
3870
3871 assert!(format!("{err}").contains("does not support step `recipient_list`"));
3872 }
3873
3874 #[test]
3877 fn test_build_canonical_rejects_any_mode_with_predicate() {
3878 let err = RouteBuilder::from("direct:start")
3879 .route_id("canonical-any-pred")
3880 .aggregate(AggregatorConfig {
3881 header_name: "key".to_string(),
3882 completion: CompletionMode::Any(vec![
3883 CompletionCondition::Size(5),
3884 CompletionCondition::Predicate(Arc::new(|_| false)),
3885 ]),
3886 correlation: CorrelationStrategy::HeaderName("key".to_string()),
3887 strategy: AggregationStrategy::CollectAll,
3888 max_buckets: None,
3889 bucket_ttl: None,
3890 force_completion_on_stop: false,
3891 discard_on_timeout: false,
3892 })
3893 .build_canonical()
3894 .unwrap_err();
3895
3896 assert!(format!("{err}").contains("predicate completion"));
3897 }
3898}