1use camel_api::DelayConfig;
2use camel_api::aggregator::{
3 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
4};
5use camel_api::body::Body;
6use camel_api::body_converter::BodyType;
7use camel_api::circuit_breaker::CircuitBreakerConfig;
8use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
9use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
10use camel_api::load_balancer::LoadBalancerConfig;
11use camel_api::multicast::{MulticastConfig, MulticastStrategy};
12use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
13use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
14use camel_api::splitter::SplitterConfig;
15use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
16use camel_api::{
17 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
18 ProcessorFn, Value,
19 runtime::{
20 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
21 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
22 CanonicalWhenSpec,
23 },
24};
25use camel_component_api::ConcurrencyModel;
26use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
27use camel_processor::{
28 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
29 UnmarshalService, builtin_data_format,
30};
31
32pub trait StepAccumulator: Sized {
38 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
39
40 fn to(mut self, endpoint: impl Into<String>) -> Self {
41 self.steps_mut().push(BuilderStep::To(endpoint.into()));
42 self
43 }
44
45 fn process<F, Fut>(mut self, f: F) -> Self
46 where
47 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
48 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
49 {
50 let svc = ProcessorFn::new(f);
51 self.steps_mut()
52 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
53 self
54 }
55
56 fn process_fn(mut self, processor: BoxProcessor) -> Self {
57 self.steps_mut().push(BuilderStep::Processor(processor));
58 self
59 }
60
61 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
62 let svc = SetHeader::new(IdentityProcessor, key, value);
63 self.steps_mut()
64 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
65 self
66 }
67
68 fn map_body<F>(mut self, mapper: F) -> Self
69 where
70 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
71 {
72 let svc = MapBody::new(IdentityProcessor, mapper);
73 self.steps_mut()
74 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
75 self
76 }
77
78 fn set_body<B>(mut self, body: B) -> Self
79 where
80 B: Into<Body> + Clone + Send + Sync + 'static,
81 {
82 let body: Body = body.into();
83 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
84 self.steps_mut()
85 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
86 self
87 }
88
89 fn transform<B>(self, body: B) -> Self
94 where
95 B: Into<Body> + Clone + Send + Sync + 'static,
96 {
97 self.set_body(body)
98 }
99
100 fn set_body_fn<F>(mut self, expr: F) -> Self
101 where
102 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
103 {
104 let svc = SetBody::new(IdentityProcessor, expr);
105 self.steps_mut()
106 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
107 self
108 }
109
110 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
111 where
112 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
113 {
114 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
115 self.steps_mut()
116 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
117 self
118 }
119
120 fn aggregate(mut self, config: AggregatorConfig) -> Self {
121 self.steps_mut().push(BuilderStep::Aggregate { config });
122 self
123 }
124
125 fn stop(mut self) -> Self {
131 self.steps_mut().push(BuilderStep::Stop);
132 self
133 }
134
135 fn delay(mut self, duration: std::time::Duration) -> Self {
136 self.steps_mut().push(BuilderStep::Delay {
137 config: DelayConfig::from_duration(duration),
138 });
139 self
140 }
141
142 fn delay_with_header(
143 mut self,
144 duration: std::time::Duration,
145 header: impl Into<String>,
146 ) -> Self {
147 self.steps_mut().push(BuilderStep::Delay {
148 config: DelayConfig::from_duration_with_header(duration, header),
149 });
150 self
151 }
152
153 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
157 self.steps_mut().push(BuilderStep::Log {
158 level,
159 message: message.into(),
160 });
161 self
162 }
163
164 fn convert_body_to(mut self, target: BodyType) -> Self {
176 let svc = ConvertBodyTo::new(IdentityProcessor, target);
177 self.steps_mut()
178 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
179 self
180 }
181
182 fn marshal(mut self, format: impl Into<String>) -> Self {
192 let name = format.into();
193 let df =
194 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
195 let svc = MarshalService::new(IdentityProcessor, df);
196 self.steps_mut()
197 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
198 self
199 }
200
201 fn unmarshal(mut self, format: impl Into<String>) -> Self {
211 let name = format.into();
212 let df =
213 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
214 let svc = UnmarshalService::new(IdentityProcessor, df);
215 self.steps_mut()
216 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
217 self
218 }
219
220 fn validate(mut self, schema_path: impl Into<String>) -> Self {
229 let path = schema_path.into();
230 let uri = if path.starts_with("validator:") {
231 path
232 } else {
233 format!("validator:{path}")
234 };
235 self.steps_mut().push(BuilderStep::To(uri));
236 self
237 }
238
239 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
250 self.steps_mut().push(BuilderStep::Script {
251 language: language.into(),
252 script: script.into(),
253 });
254 self
255 }
256}
257
258pub struct RouteBuilder {
270 from_uri: String,
271 steps: Vec<BuilderStep>,
272 error_handler: Option<ErrorHandlerConfig>,
273 error_handler_mode: ErrorHandlerMode,
274 circuit_breaker_config: Option<CircuitBreakerConfig>,
275 concurrency: Option<ConcurrencyModel>,
276 route_id: Option<String>,
277 auto_startup: Option<bool>,
278 startup_order: Option<i32>,
279}
280
281#[derive(Default)]
282enum ErrorHandlerMode {
283 #[default]
284 None,
285 ExplicitConfig,
286 Shorthand {
287 dlc_uri: Option<String>,
288 specs: Vec<OnExceptionSpec>,
289 },
290 Mixed,
291}
292
293#[derive(Clone)]
294struct OnExceptionSpec {
295 matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
296 retry: Option<RedeliveryPolicy>,
297 handled_by: Option<String>,
298}
299
300impl RouteBuilder {
301 pub fn from(endpoint: &str) -> Self {
303 Self {
304 from_uri: endpoint.to_string(),
305 steps: Vec::new(),
306 error_handler: None,
307 error_handler_mode: ErrorHandlerMode::None,
308 circuit_breaker_config: None,
309 concurrency: None,
310 route_id: None,
311 auto_startup: None,
312 startup_order: None,
313 }
314 }
315
316 pub fn filter<F>(self, predicate: F) -> FilterBuilder
320 where
321 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
322 {
323 FilterBuilder {
324 parent: self,
325 predicate: std::sync::Arc::new(predicate),
326 steps: vec![],
327 }
328 }
329
330 pub fn choice(self) -> ChoiceBuilder {
336 ChoiceBuilder {
337 parent: self,
338 whens: vec![],
339 _otherwise: None,
340 }
341 }
342
343 pub fn wire_tap(mut self, endpoint: &str) -> Self {
347 self.steps.push(BuilderStep::WireTap {
348 uri: endpoint.to_string(),
349 });
350 self
351 }
352
353 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
355 self.error_handler_mode = match self.error_handler_mode {
356 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
357 ErrorHandlerMode::ExplicitConfig
358 }
359 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
360 };
361 self.error_handler = Some(config);
362 self
363 }
364
365 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
367 let uri = uri.into();
368 self.error_handler_mode = match self.error_handler_mode {
369 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
370 dlc_uri: Some(uri),
371 specs: Vec::new(),
372 },
373 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
374 dlc_uri: Some(uri),
375 specs,
376 },
377 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
378 };
379 self
380 }
381
382 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
384 where
385 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
386 {
387 self.error_handler_mode = match self.error_handler_mode {
388 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
389 dlc_uri: None,
390 specs: Vec::new(),
391 },
392 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
393 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
394 };
395
396 OnExceptionBuilder {
397 parent: self,
398 policy: OnExceptionSpec {
399 matches: std::sync::Arc::new(matches),
400 retry: None,
401 handled_by: None,
402 },
403 }
404 }
405
406 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
408 self.circuit_breaker_config = Some(config);
409 self
410 }
411
412 pub fn concurrent(mut self, max: usize) -> Self {
426 let max = if max == 0 { None } else { Some(max) };
427 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
428 self
429 }
430
431 pub fn sequential(mut self) -> Self {
436 self.concurrency = Some(ConcurrencyModel::Sequential);
437 self
438 }
439
440 pub fn route_id(mut self, id: impl Into<String>) -> Self {
444 self.route_id = Some(id.into());
445 self
446 }
447
448 pub fn auto_startup(mut self, auto: bool) -> Self {
452 self.auto_startup = Some(auto);
453 self
454 }
455
456 pub fn startup_order(mut self, order: i32) -> Self {
460 self.startup_order = Some(order);
461 self
462 }
463
464 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
470 SplitBuilder {
471 parent: self,
472 config,
473 steps: Vec::new(),
474 }
475 }
476
477 pub fn multicast(self) -> MulticastBuilder {
483 MulticastBuilder {
484 parent: self,
485 steps: Vec::new(),
486 config: MulticastConfig::new(),
487 }
488 }
489
490 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
497 ThrottleBuilder {
498 parent: self,
499 config: ThrottlerConfig::new(max_requests, period),
500 steps: Vec::new(),
501 }
502 }
503
504 pub fn load_balance(self) -> LoadBalancerBuilder {
510 LoadBalancerBuilder {
511 parent: self,
512 config: LoadBalancerConfig::round_robin(),
513 steps: Vec::new(),
514 }
515 }
516
517 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
533 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
534 }
535
536 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
540 self.steps.push(BuilderStep::DynamicRouter { config });
541 self
542 }
543
544 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
545 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
546 }
547
548 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
549 self.steps.push(BuilderStep::RoutingSlip { config });
550 self
551 }
552
553 pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
554 self.recipient_list_with_config(RecipientListConfig::new(expression))
555 }
556
557 pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
558 self.steps.push(BuilderStep::RecipientList { config });
559 self
560 }
561
562 pub fn build(self) -> Result<RouteDefinition, CamelError> {
564 if self.from_uri.is_empty() {
565 return Err(CamelError::RouteError(
566 "route must have a 'from' URI".to_string(),
567 ));
568 }
569 let route_id = self.route_id.ok_or_else(|| {
570 CamelError::RouteError(
571 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
572 .to_string(),
573 )
574 })?;
575 let resolved_error_handler = match self.error_handler_mode {
576 ErrorHandlerMode::None => self.error_handler,
577 ErrorHandlerMode::ExplicitConfig => self.error_handler,
578 ErrorHandlerMode::Mixed => {
579 return Err(CamelError::RouteError(
580 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
581 ));
582 }
583 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
584 let mut config = if let Some(uri) = dlc_uri {
585 ErrorHandlerConfig::dead_letter_channel(uri)
586 } else {
587 ErrorHandlerConfig::log_only()
588 };
589
590 for spec in specs {
591 let matcher = spec.matches.clone();
592 let mut builder = config.on_exception(move |e| matcher(e));
593
594 if let Some(retry) = spec.retry {
595 builder = builder.retry(retry.max_attempts).with_backoff(
596 retry.initial_delay,
597 retry.multiplier,
598 retry.max_delay,
599 );
600 if retry.jitter_factor > 0.0 {
601 builder = builder.with_jitter(retry.jitter_factor);
602 }
603 }
604
605 if let Some(uri) = spec.handled_by {
606 builder = builder.handled_by(uri);
607 }
608
609 config = builder.build();
610 }
611
612 Some(config)
613 }
614 };
615
616 let definition = RouteDefinition::new(self.from_uri, self.steps);
617 let definition = if let Some(eh) = resolved_error_handler {
618 definition.with_error_handler(eh)
619 } else {
620 definition
621 };
622 let definition = if let Some(cb) = self.circuit_breaker_config {
623 definition.with_circuit_breaker(cb)
624 } else {
625 definition
626 };
627 let definition = if let Some(concurrency) = self.concurrency {
628 definition.with_concurrency(concurrency)
629 } else {
630 definition
631 };
632 let definition = definition.with_route_id(route_id);
633 let definition = if let Some(auto) = self.auto_startup {
634 definition.with_auto_startup(auto)
635 } else {
636 definition
637 };
638 let definition = if let Some(order) = self.startup_order {
639 definition.with_startup_order(order)
640 } else {
641 definition
642 };
643 Ok(definition)
644 }
645
646 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
648 if self.from_uri.is_empty() {
649 return Err(CamelError::RouteError(
650 "route must have a 'from' URI".to_string(),
651 ));
652 }
653 let route_id = self.route_id.ok_or_else(|| {
654 CamelError::RouteError(
655 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
656 .to_string(),
657 )
658 })?;
659
660 let steps = canonicalize_steps(self.steps)?;
661 let circuit_breaker = self
662 .circuit_breaker_config
663 .map(canonicalize_circuit_breaker);
664
665 let spec = CanonicalRouteSpec {
666 route_id,
667 from: self.from_uri,
668 steps,
669 circuit_breaker,
670 version: camel_api::CANONICAL_CONTRACT_VERSION,
671 };
672 spec.validate_contract()?;
673 Ok(spec)
674 }
675}
676
677pub struct OnExceptionBuilder {
678 parent: RouteBuilder,
679 policy: OnExceptionSpec,
680}
681
682impl OnExceptionBuilder {
683 pub fn retry(mut self, max_attempts: u32) -> Self {
684 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
685 self
686 }
687
688 pub fn with_backoff(
689 mut self,
690 initial: std::time::Duration,
691 multiplier: f64,
692 max: std::time::Duration,
693 ) -> Self {
694 if let Some(ref mut retry) = self.policy.retry {
695 retry.initial_delay = initial;
696 retry.multiplier = multiplier;
697 retry.max_delay = max;
698 }
699 self
700 }
701
702 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
703 if let Some(ref mut retry) = self.policy.retry {
704 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
705 }
706 self
707 }
708
709 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
710 self.policy.handled_by = Some(uri.into());
711 self
712 }
713
714 pub fn end_on_exception(mut self) -> RouteBuilder {
715 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
716 specs.push(self.policy);
717 }
718 self.parent
719 }
720}
721
722fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
723 let mut canonical = Vec::with_capacity(steps.len());
724 for step in steps {
725 canonical.push(canonicalize_step(step)?);
726 }
727 Ok(canonical)
728}
729
730fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
731 match step {
732 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
733 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
734 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
735 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
736 BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
737 delay_ms: config.delay_ms,
738 dynamic_header: config.dynamic_header,
739 }),
740 BuilderStep::DeclarativeScript { expression } => {
741 Ok(CanonicalStepSpec::Script { expression })
742 }
743 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
744 predicate,
745 steps: canonicalize_steps(steps)?,
746 }),
747 BuilderStep::DeclarativeChoice { whens, otherwise } => {
748 let mut canonical_whens = Vec::with_capacity(whens.len());
749 for DeclarativeWhenStep { predicate, steps } in whens {
750 canonical_whens.push(CanonicalWhenSpec {
751 predicate,
752 steps: canonicalize_steps(steps)?,
753 });
754 }
755 let otherwise = match otherwise {
756 Some(steps) => Some(canonicalize_steps(steps)?),
757 None => None,
758 };
759 Ok(CanonicalStepSpec::Choice {
760 whens: canonical_whens,
761 otherwise,
762 })
763 }
764 BuilderStep::DeclarativeSplit {
765 expression,
766 aggregation,
767 parallel,
768 parallel_limit,
769 stop_on_exception,
770 steps,
771 } => Ok(CanonicalStepSpec::Split {
772 expression: CanonicalSplitExpressionSpec::Language(expression),
773 aggregation: canonicalize_split_aggregation(aggregation)?,
774 parallel,
775 parallel_limit,
776 stop_on_exception,
777 steps: canonicalize_steps(steps)?,
778 }),
779 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
780 config: canonicalize_aggregate(config)?,
781 }),
782 other => {
783 let step_name = canonical_step_name(&other);
784 let detail = camel_api::canonical_contract_rejection_reason(step_name)
785 .unwrap_or("not included in canonical v1");
786 Err(CamelError::RouteError(format!(
787 "canonical v1 does not support step `{step_name}`: {detail}"
788 )))
789 }
790 }
791}
792
793fn canonicalize_split_aggregation(
794 strategy: camel_api::splitter::AggregationStrategy,
795) -> Result<CanonicalSplitAggregationSpec, CamelError> {
796 match strategy {
797 camel_api::splitter::AggregationStrategy::LastWins => {
798 Ok(CanonicalSplitAggregationSpec::LastWins)
799 }
800 camel_api::splitter::AggregationStrategy::CollectAll => {
801 Ok(CanonicalSplitAggregationSpec::CollectAll)
802 }
803 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
804 "canonical v1 does not support custom split aggregation".to_string(),
805 )),
806 camel_api::splitter::AggregationStrategy::Original => {
807 Ok(CanonicalSplitAggregationSpec::Original)
808 }
809 }
810}
811
812fn extract_completion_fields(
813 mode: &CompletionMode,
814) -> Result<(Option<usize>, Option<u64>), CamelError> {
815 match mode {
816 CompletionMode::Single(cond) => match cond {
817 CompletionCondition::Size(n) => Ok((Some(*n), None)),
818 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
819 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
820 "canonical v1 does not support aggregate predicate completion".to_string(),
821 )),
822 },
823 CompletionMode::Any(conds) => {
824 let mut size = None;
825 let mut timeout_ms = None;
826 for cond in conds {
827 match cond {
828 CompletionCondition::Size(n) => size = Some(*n),
829 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
830 CompletionCondition::Predicate(_) => {
831 return Err(CamelError::RouteError(
832 "canonical v1 does not support aggregate predicate completion"
833 .to_string(),
834 ));
835 }
836 }
837 }
838 Ok((size, timeout_ms))
839 }
840 }
841}
842
843fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
844 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
845
846 let header = match &config.correlation {
847 CorrelationStrategy::HeaderName(h) => h.clone(),
848 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
849 CorrelationStrategy::Fn(_) => {
850 return Err(CamelError::RouteError(
851 "canonical v1 does not support Fn correlation strategy".to_string(),
852 ));
853 }
854 };
855
856 let correlation_key = match &config.correlation {
857 CorrelationStrategy::HeaderName(_) => None,
858 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
859 CorrelationStrategy::Fn(_) => unreachable!(),
860 };
861
862 let strategy = match config.strategy {
863 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
864 AggregationStrategy::Custom(_) => {
865 return Err(CamelError::RouteError(
866 "canonical v1 does not support custom aggregate strategy".to_string(),
867 ));
868 }
869 };
870 let bucket_ttl_ms = config
871 .bucket_ttl
872 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
873
874 Ok(CanonicalAggregateSpec {
875 header,
876 completion_size,
877 completion_timeout_ms,
878 correlation_key,
879 force_completion_on_stop: if config.force_completion_on_stop {
880 Some(true)
881 } else {
882 None
883 },
884 discard_on_timeout: if config.discard_on_timeout {
885 Some(true)
886 } else {
887 None
888 },
889 strategy,
890 max_buckets: config.max_buckets,
891 bucket_ttl_ms,
892 })
893}
894
895fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
896 CanonicalCircuitBreakerSpec {
897 failure_threshold: config.failure_threshold,
898 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
899 }
900}
901
902fn canonical_step_name(step: &BuilderStep) -> &'static str {
903 match step {
904 BuilderStep::Processor(_) => "processor",
905 BuilderStep::To(_) => "to",
906 BuilderStep::Stop => "stop",
907 BuilderStep::Log { .. } => "log",
908 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
909 BuilderStep::DeclarativeSetBody { .. } => "set_body",
910 BuilderStep::DeclarativeFilter { .. } => "filter",
911 BuilderStep::DeclarativeChoice { .. } => "choice",
912 BuilderStep::DeclarativeScript { .. } => "script",
913 BuilderStep::DeclarativeSplit { .. } => "split",
914 BuilderStep::Split { .. } => "split",
915 BuilderStep::Aggregate { .. } => "aggregate",
916 BuilderStep::Filter { .. } => "filter",
917 BuilderStep::Choice { .. } => "choice",
918 BuilderStep::WireTap { .. } => "wire_tap",
919 BuilderStep::Delay { .. } => "delay",
920 BuilderStep::Multicast { .. } => "multicast",
921 BuilderStep::DeclarativeLog { .. } => "log",
922 BuilderStep::Bean { .. } => "bean",
923 BuilderStep::Script { .. } => "script",
924 BuilderStep::Throttle { .. } => "throttle",
925 BuilderStep::LoadBalance { .. } => "load_balancer",
926 BuilderStep::DynamicRouter { .. } => "dynamic_router",
927 BuilderStep::RoutingSlip { .. } => "routing_slip",
928 BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
929 BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
930 BuilderStep::RecipientList { .. } => "recipient_list",
931 BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
932 }
933}
934
935impl StepAccumulator for RouteBuilder {
936 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
937 &mut self.steps
938 }
939}
940
941pub struct SplitBuilder {
949 parent: RouteBuilder,
950 config: SplitterConfig,
951 steps: Vec<BuilderStep>,
952}
953
954impl SplitBuilder {
955 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
957 where
958 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
959 {
960 FilterInSplitBuilder {
961 parent: self,
962 predicate: std::sync::Arc::new(predicate),
963 steps: vec![],
964 }
965 }
966
967 pub fn end_split(mut self) -> RouteBuilder {
970 let split_step = BuilderStep::Split {
971 config: self.config,
972 steps: self.steps,
973 };
974 self.parent.steps.push(split_step);
975 self.parent
976 }
977}
978
979impl StepAccumulator for SplitBuilder {
980 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
981 &mut self.steps
982 }
983}
984
985pub struct FilterBuilder {
987 parent: RouteBuilder,
988 predicate: FilterPredicate,
989 steps: Vec<BuilderStep>,
990}
991
992impl FilterBuilder {
993 pub fn end_filter(mut self) -> RouteBuilder {
996 let step = BuilderStep::Filter {
997 predicate: self.predicate,
998 steps: self.steps,
999 };
1000 self.parent.steps.push(step);
1001 self.parent
1002 }
1003}
1004
1005impl StepAccumulator for FilterBuilder {
1006 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1007 &mut self.steps
1008 }
1009}
1010
1011pub struct FilterInSplitBuilder {
1013 parent: SplitBuilder,
1014 predicate: FilterPredicate,
1015 steps: Vec<BuilderStep>,
1016}
1017
1018impl FilterInSplitBuilder {
1019 pub fn end_filter(mut self) -> SplitBuilder {
1021 let step = BuilderStep::Filter {
1022 predicate: self.predicate,
1023 steps: self.steps,
1024 };
1025 self.parent.steps.push(step);
1026 self.parent
1027 }
1028}
1029
1030impl StepAccumulator for FilterInSplitBuilder {
1031 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1032 &mut self.steps
1033 }
1034}
1035
1036pub struct ChoiceBuilder {
1043 parent: RouteBuilder,
1044 whens: Vec<WhenStep>,
1045 _otherwise: Option<Vec<BuilderStep>>,
1046}
1047
1048impl ChoiceBuilder {
1049 pub fn when<F>(self, predicate: F) -> WhenBuilder
1052 where
1053 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1054 {
1055 WhenBuilder {
1056 parent: self,
1057 predicate: std::sync::Arc::new(predicate),
1058 steps: vec![],
1059 }
1060 }
1061
1062 pub fn otherwise(self) -> OtherwiseBuilder {
1066 OtherwiseBuilder {
1067 parent: self,
1068 steps: vec![],
1069 }
1070 }
1071
1072 pub fn end_choice(mut self) -> RouteBuilder {
1076 let step = BuilderStep::Choice {
1077 whens: self.whens,
1078 otherwise: self._otherwise,
1079 };
1080 self.parent.steps.push(step);
1081 self.parent
1082 }
1083}
1084
1085pub struct WhenBuilder {
1087 parent: ChoiceBuilder,
1088 predicate: camel_api::FilterPredicate,
1089 steps: Vec<BuilderStep>,
1090}
1091
1092impl WhenBuilder {
1093 pub fn end_when(mut self) -> ChoiceBuilder {
1096 self.parent.whens.push(WhenStep {
1097 predicate: self.predicate,
1098 steps: self.steps,
1099 });
1100 self.parent
1101 }
1102}
1103
1104impl StepAccumulator for WhenBuilder {
1105 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1106 &mut self.steps
1107 }
1108}
1109
1110pub struct OtherwiseBuilder {
1112 parent: ChoiceBuilder,
1113 steps: Vec<BuilderStep>,
1114}
1115
1116impl OtherwiseBuilder {
1117 pub fn end_otherwise(self) -> ChoiceBuilder {
1119 let OtherwiseBuilder { mut parent, steps } = self;
1120 parent._otherwise = Some(steps);
1121 parent
1122 }
1123}
1124
1125impl StepAccumulator for OtherwiseBuilder {
1126 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1127 &mut self.steps
1128 }
1129}
1130
1131pub struct MulticastBuilder {
1139 parent: RouteBuilder,
1140 steps: Vec<BuilderStep>,
1141 config: MulticastConfig,
1142}
1143
1144impl MulticastBuilder {
1145 pub fn parallel(mut self, parallel: bool) -> Self {
1146 self.config = self.config.parallel(parallel);
1147 self
1148 }
1149
1150 pub fn parallel_limit(mut self, limit: usize) -> Self {
1151 self.config = self.config.parallel_limit(limit);
1152 self
1153 }
1154
1155 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1156 self.config = self.config.stop_on_exception(stop);
1157 self
1158 }
1159
1160 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1161 self.config = self.config.timeout(duration);
1162 self
1163 }
1164
1165 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1166 self.config = self.config.aggregation(strategy);
1167 self
1168 }
1169
1170 pub fn end_multicast(mut self) -> RouteBuilder {
1171 let step = BuilderStep::Multicast {
1172 steps: self.steps,
1173 config: self.config,
1174 };
1175 self.parent.steps.push(step);
1176 self.parent
1177 }
1178}
1179
1180impl StepAccumulator for MulticastBuilder {
1181 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1182 &mut self.steps
1183 }
1184}
1185
1186pub struct ThrottleBuilder {
1194 parent: RouteBuilder,
1195 config: ThrottlerConfig,
1196 steps: Vec<BuilderStep>,
1197}
1198
1199impl ThrottleBuilder {
1200 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1206 self.config = self.config.strategy(strategy);
1207 self
1208 }
1209
1210 pub fn end_throttle(mut self) -> RouteBuilder {
1213 let step = BuilderStep::Throttle {
1214 config: self.config,
1215 steps: self.steps,
1216 };
1217 self.parent.steps.push(step);
1218 self.parent
1219 }
1220}
1221
1222impl StepAccumulator for ThrottleBuilder {
1223 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1224 &mut self.steps
1225 }
1226}
1227
1228pub struct LoadBalancerBuilder {
1236 parent: RouteBuilder,
1237 config: LoadBalancerConfig,
1238 steps: Vec<BuilderStep>,
1239}
1240
1241impl LoadBalancerBuilder {
1242 pub fn round_robin(mut self) -> Self {
1244 self.config = LoadBalancerConfig::round_robin();
1245 self
1246 }
1247
1248 pub fn random(mut self) -> Self {
1250 self.config = LoadBalancerConfig::random();
1251 self
1252 }
1253
1254 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1259 self.config = LoadBalancerConfig::weighted(weights);
1260 self
1261 }
1262
1263 pub fn failover(mut self) -> Self {
1268 self.config = LoadBalancerConfig::failover();
1269 self
1270 }
1271
1272 pub fn parallel(mut self, parallel: bool) -> Self {
1277 self.config = self.config.parallel(parallel);
1278 self
1279 }
1280
1281 pub fn end_load_balance(mut self) -> RouteBuilder {
1284 let step = BuilderStep::LoadBalance {
1285 config: self.config,
1286 steps: self.steps,
1287 };
1288 self.parent.steps.push(step);
1289 self.parent
1290 }
1291}
1292
1293impl StepAccumulator for LoadBalancerBuilder {
1294 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1295 &mut self.steps
1296 }
1297}
1298
1299#[cfg(test)]
1304mod tests {
1305 use super::*;
1306 use camel_api::error_handler::ErrorHandlerConfig;
1307 use camel_api::load_balancer::LoadBalanceStrategy;
1308 use camel_api::{Exchange, Message};
1309 use camel_core::route::BuilderStep;
1310 use std::sync::Arc;
1311 use std::time::Duration;
1312 use tower::{Service, ServiceExt};
1313
1314 #[test]
1315 fn test_builder_from_creates_definition() {
1316 let definition = RouteBuilder::from("timer:tick")
1317 .route_id("test-route")
1318 .build()
1319 .unwrap();
1320 assert_eq!(definition.from_uri(), "timer:tick");
1321 }
1322
1323 #[test]
1324 fn test_builder_empty_from_uri_errors() {
1325 let result = RouteBuilder::from("").route_id("test-route").build();
1326 assert!(result.is_err());
1327 }
1328
1329 #[test]
1330 fn test_builder_to_adds_step() {
1331 let definition = RouteBuilder::from("timer:tick")
1332 .route_id("test-route")
1333 .to("log:info")
1334 .build()
1335 .unwrap();
1336
1337 assert_eq!(definition.from_uri(), "timer:tick");
1338 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1340 }
1341
1342 #[test]
1343 fn test_builder_filter_adds_filter_step() {
1344 let definition = RouteBuilder::from("timer:tick")
1345 .route_id("test-route")
1346 .filter(|_ex| true)
1347 .to("mock:result")
1348 .end_filter()
1349 .build()
1350 .unwrap();
1351
1352 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1353 }
1354
1355 #[test]
1356 fn test_builder_set_header_adds_processor_step() {
1357 let definition = RouteBuilder::from("timer:tick")
1358 .route_id("test-route")
1359 .set_header("key", Value::String("value".into()))
1360 .build()
1361 .unwrap();
1362
1363 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1364 }
1365
1366 #[test]
1367 fn test_builder_map_body_adds_processor_step() {
1368 let definition = RouteBuilder::from("timer:tick")
1369 .route_id("test-route")
1370 .map_body(|body| body)
1371 .build()
1372 .unwrap();
1373
1374 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1375 }
1376
1377 #[test]
1378 fn test_builder_process_adds_processor_step() {
1379 let definition = RouteBuilder::from("timer:tick")
1380 .route_id("test-route")
1381 .process(|ex| async move { Ok(ex) })
1382 .build()
1383 .unwrap();
1384
1385 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1386 }
1387
1388 #[test]
1389 fn test_builder_chain_multiple_steps() {
1390 let definition = RouteBuilder::from("timer:tick")
1391 .route_id("test-route")
1392 .set_header("source", Value::String("timer".into()))
1393 .filter(|ex| ex.input.header("source").is_some())
1394 .to("log:info")
1395 .end_filter()
1396 .to("mock:result")
1397 .build()
1398 .unwrap();
1399
1400 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1404 }
1405
1406 #[tokio::test]
1411 async fn test_set_header_processor_works() {
1412 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1413 let exchange = Exchange::new(Message::new("test"));
1414 let result = svc.call(exchange).await.unwrap();
1415 assert_eq!(
1416 result.input.header("greeting"),
1417 Some(&Value::String("hello".into()))
1418 );
1419 }
1420
1421 #[tokio::test]
1422 async fn test_filter_processor_passes() {
1423 use camel_api::BoxProcessorExt;
1424 use camel_processor::FilterService;
1425
1426 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1427 let mut svc =
1428 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1429 let exchange = Exchange::new(Message::new("pass"));
1430 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1431 assert_eq!(result.input.body.as_text(), Some("pass"));
1432 }
1433
1434 #[tokio::test]
1435 async fn test_filter_processor_blocks() {
1436 use camel_api::BoxProcessorExt;
1437 use camel_processor::FilterService;
1438
1439 let sub = BoxProcessor::from_fn(|_ex| {
1440 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1441 });
1442 let mut svc =
1443 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1444 let exchange = Exchange::new(Message::new("reject"));
1445 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1446 assert_eq!(result.input.body.as_text(), Some("reject"));
1447 }
1448
1449 #[tokio::test]
1450 async fn test_map_body_processor_works() {
1451 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1452 if let Some(text) = body.as_text() {
1453 Body::Text(text.to_uppercase())
1454 } else {
1455 body
1456 }
1457 });
1458 let exchange = Exchange::new(Message::new("hello"));
1459 let result = mapper.oneshot(exchange).await.unwrap();
1460 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1461 }
1462
1463 #[tokio::test]
1464 async fn test_process_custom_processor_works() {
1465 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1466 ex.set_property("custom", Value::Bool(true));
1467 Ok(ex)
1468 });
1469 let exchange = Exchange::new(Message::default());
1470 let result = processor.oneshot(exchange).await.unwrap();
1471 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1472 }
1473
1474 #[tokio::test]
1479 async fn test_compose_pipeline_runs_steps_in_order() {
1480 use camel_core::route::compose_pipeline;
1481
1482 let processors = vec![
1483 BoxProcessor::new(SetHeader::new(
1484 IdentityProcessor,
1485 "step",
1486 Value::String("one".into()),
1487 )),
1488 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1489 if let Some(text) = body.as_text() {
1490 Body::Text(format!("{}-processed", text))
1491 } else {
1492 body
1493 }
1494 })),
1495 ];
1496
1497 let pipeline = compose_pipeline(processors);
1498 let exchange = Exchange::new(Message::new("hello"));
1499 let result = pipeline.oneshot(exchange).await.unwrap();
1500
1501 assert_eq!(
1502 result.input.header("step"),
1503 Some(&Value::String("one".into()))
1504 );
1505 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1506 }
1507
1508 #[tokio::test]
1509 async fn test_compose_pipeline_empty_is_identity() {
1510 use camel_core::route::compose_pipeline;
1511
1512 let pipeline = compose_pipeline(vec![]);
1513 let exchange = Exchange::new(Message::new("unchanged"));
1514 let result = pipeline.oneshot(exchange).await.unwrap();
1515 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1516 }
1517
1518 #[test]
1523 fn test_builder_circuit_breaker_sets_config() {
1524 use camel_api::circuit_breaker::CircuitBreakerConfig;
1525
1526 let config = CircuitBreakerConfig::new().failure_threshold(5);
1527 let definition = RouteBuilder::from("timer:tick")
1528 .route_id("test-route")
1529 .circuit_breaker(config)
1530 .build()
1531 .unwrap();
1532
1533 let cb = definition
1534 .circuit_breaker_config()
1535 .expect("circuit breaker should be set");
1536 assert_eq!(cb.failure_threshold, 5);
1537 }
1538
1539 #[test]
1540 fn test_builder_circuit_breaker_with_error_handler() {
1541 use camel_api::circuit_breaker::CircuitBreakerConfig;
1542 use camel_api::error_handler::ErrorHandlerConfig;
1543
1544 let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1545 let eh_config = ErrorHandlerConfig::log_only();
1546
1547 let definition = RouteBuilder::from("timer:tick")
1548 .route_id("test-route")
1549 .to("log:info")
1550 .circuit_breaker(cb_config)
1551 .error_handler(eh_config)
1552 .build()
1553 .unwrap();
1554
1555 assert!(
1556 definition.circuit_breaker_config().is_some(),
1557 "circuit breaker config should be set"
1558 );
1559 }
1561
1562 #[test]
1563 fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1564 let definition = RouteBuilder::from("direct:start")
1565 .route_id("test-route")
1566 .dead_letter_channel("log:dlc")
1567 .on_exception(|e| matches!(e, CamelError::Io(_)))
1568 .retry(3)
1569 .handled_by("log:io")
1570 .end_on_exception()
1571 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1572 .retry(1)
1573 .end_on_exception()
1574 .to("mock:out")
1575 .build()
1576 .expect("route should build");
1577
1578 let cfg = definition
1579 .error_handler_config()
1580 .expect("error handler should be set");
1581 assert_eq!(cfg.policies.len(), 2);
1582 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1583 assert_eq!(
1584 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1585 Some(3)
1586 );
1587 assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1588 assert_eq!(
1589 cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1590 Some(1)
1591 );
1592 }
1593
1594 #[test]
1595 fn test_builder_on_exception_mixed_mode_rejected() {
1596 let result = RouteBuilder::from("direct:start")
1597 .route_id("test-route")
1598 .error_handler(ErrorHandlerConfig::log_only())
1599 .on_exception(|_e| true)
1600 .end_on_exception()
1601 .to("mock:out")
1602 .build();
1603
1604 let err = result.err().expect("mixed mode should fail with an error");
1605
1606 assert!(
1607 format!("{err}").contains("mixed error handler modes"),
1608 "unexpected error: {err}"
1609 );
1610 }
1611
1612 #[test]
1613 fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1614 let definition = RouteBuilder::from("direct:start")
1615 .route_id("test-route")
1616 .on_exception(|_e| true)
1617 .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1618 .with_jitter(0.5)
1619 .end_on_exception()
1620 .to("mock:out")
1621 .build()
1622 .expect("route should build");
1623
1624 let cfg = definition
1625 .error_handler_config()
1626 .expect("error handler should be set");
1627 assert_eq!(cfg.policies.len(), 1);
1628 assert!(cfg.policies[0].retry.is_none());
1629 }
1630
1631 #[test]
1632 fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1633 let definition = RouteBuilder::from("direct:start")
1634 .route_id("test-route")
1635 .dead_letter_channel("log:dlc")
1636 .to("mock:out")
1637 .build()
1638 .expect("route should build");
1639
1640 let cfg = definition
1641 .error_handler_config()
1642 .expect("error handler should be set");
1643 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1644 assert!(cfg.policies.is_empty());
1645 }
1646
1647 #[test]
1648 fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1649 let definition = RouteBuilder::from("direct:start")
1650 .route_id("test-route")
1651 .dead_letter_channel("log:first")
1652 .on_exception(|e| matches!(e, CamelError::Io(_)))
1653 .retry(2)
1654 .end_on_exception()
1655 .dead_letter_channel("log:second")
1656 .to("mock:out")
1657 .build()
1658 .expect("route should build");
1659
1660 let cfg = definition
1661 .error_handler_config()
1662 .expect("error handler should be set");
1663 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1664 assert_eq!(cfg.policies.len(), 1);
1665 assert_eq!(
1666 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1667 Some(2)
1668 );
1669 }
1670
1671 #[test]
1672 fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1673 let definition = RouteBuilder::from("direct:start")
1674 .route_id("test-route")
1675 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1676 .retry(1)
1677 .end_on_exception()
1678 .to("mock:out")
1679 .build()
1680 .expect("route should build");
1681
1682 let cfg = definition
1683 .error_handler_config()
1684 .expect("error handler should be set");
1685 assert!(cfg.dlc_uri.is_none());
1686 assert_eq!(cfg.policies.len(), 1);
1687 }
1688
1689 #[test]
1690 fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1691 let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1692 let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1693
1694 let definition = RouteBuilder::from("direct:start")
1695 .route_id("test-route")
1696 .error_handler(first)
1697 .error_handler(second)
1698 .to("mock:out")
1699 .build()
1700 .expect("route should build");
1701
1702 let cfg = definition
1703 .error_handler_config()
1704 .expect("error handler should be set");
1705 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1706 }
1707
1708 #[test]
1711 fn test_split_builder_typestate() {
1712 use camel_api::splitter::{SplitterConfig, split_body_lines};
1713
1714 let definition = RouteBuilder::from("timer:test?period=1000")
1716 .route_id("test-route")
1717 .split(SplitterConfig::new(split_body_lines()))
1718 .to("mock:per-fragment")
1719 .end_split()
1720 .to("mock:final")
1721 .build()
1722 .unwrap();
1723
1724 assert_eq!(definition.steps().len(), 2);
1726 }
1727
1728 #[test]
1729 fn test_split_builder_steps_collected() {
1730 use camel_api::splitter::{SplitterConfig, split_body_lines};
1731
1732 let definition = RouteBuilder::from("timer:test?period=1000")
1733 .route_id("test-route")
1734 .split(SplitterConfig::new(split_body_lines()))
1735 .set_header("fragment", Value::String("yes".into()))
1736 .to("mock:per-fragment")
1737 .end_split()
1738 .build()
1739 .unwrap();
1740
1741 assert_eq!(definition.steps().len(), 1);
1743 match &definition.steps()[0] {
1744 BuilderStep::Split { steps, .. } => {
1745 assert_eq!(steps.len(), 2); }
1747 other => panic!("Expected Split, got {:?}", other),
1748 }
1749 }
1750
1751 #[test]
1752 fn test_split_builder_config_propagated() {
1753 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1754
1755 let definition = RouteBuilder::from("timer:test?period=1000")
1756 .route_id("test-route")
1757 .split(
1758 SplitterConfig::new(split_body_lines())
1759 .parallel(true)
1760 .parallel_limit(4)
1761 .aggregation(AggregationStrategy::CollectAll),
1762 )
1763 .to("mock:per-fragment")
1764 .end_split()
1765 .build()
1766 .unwrap();
1767
1768 match &definition.steps()[0] {
1769 BuilderStep::Split { config, .. } => {
1770 assert!(config.parallel);
1771 assert_eq!(config.parallel_limit, Some(4));
1772 assert!(matches!(
1773 config.aggregation,
1774 AggregationStrategy::CollectAll
1775 ));
1776 }
1777 other => panic!("Expected Split, got {:?}", other),
1778 }
1779 }
1780
1781 #[test]
1782 fn test_aggregate_builder_adds_step() {
1783 use camel_api::aggregator::AggregatorConfig;
1784 use camel_core::route::BuilderStep;
1785
1786 let definition = RouteBuilder::from("timer:tick")
1787 .route_id("test-route")
1788 .aggregate(
1789 AggregatorConfig::correlate_by("key")
1790 .complete_when_size(2)
1791 .build(),
1792 )
1793 .build()
1794 .unwrap();
1795
1796 assert_eq!(definition.steps().len(), 1);
1797 assert!(matches!(
1798 definition.steps()[0],
1799 BuilderStep::Aggregate { .. }
1800 ));
1801 }
1802
1803 #[test]
1804 fn test_aggregate_in_split_builder() {
1805 use camel_api::aggregator::AggregatorConfig;
1806 use camel_api::splitter::{SplitterConfig, split_body_lines};
1807 use camel_core::route::BuilderStep;
1808
1809 let definition = RouteBuilder::from("timer:tick")
1810 .route_id("test-route")
1811 .split(SplitterConfig::new(split_body_lines()))
1812 .aggregate(
1813 AggregatorConfig::correlate_by("key")
1814 .complete_when_size(1)
1815 .build(),
1816 )
1817 .end_split()
1818 .build()
1819 .unwrap();
1820
1821 assert_eq!(definition.steps().len(), 1);
1822 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
1823 assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
1824 } else {
1825 panic!("expected Split step");
1826 }
1827 }
1828
1829 #[test]
1832 fn test_builder_set_body_static_adds_processor() {
1833 let definition = RouteBuilder::from("timer:tick")
1834 .route_id("test-route")
1835 .set_body("fixed")
1836 .build()
1837 .unwrap();
1838 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1839 }
1840
1841 #[test]
1842 fn test_builder_set_body_fn_adds_processor() {
1843 let definition = RouteBuilder::from("timer:tick")
1844 .route_id("test-route")
1845 .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
1846 .build()
1847 .unwrap();
1848 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1849 }
1850
1851 #[test]
1852 fn transform_alias_produces_same_as_set_body() {
1853 let route_transform = RouteBuilder::from("timer:tick")
1854 .route_id("test-route")
1855 .transform("hello")
1856 .build()
1857 .unwrap();
1858
1859 let route_set_body = RouteBuilder::from("timer:tick")
1860 .route_id("test-route")
1861 .set_body("hello")
1862 .build()
1863 .unwrap();
1864
1865 assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
1866 }
1867
1868 #[test]
1869 fn test_builder_set_header_fn_adds_processor() {
1870 let definition = RouteBuilder::from("timer:tick")
1871 .route_id("test-route")
1872 .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
1873 .build()
1874 .unwrap();
1875 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1876 }
1877
1878 #[tokio::test]
1879 async fn test_set_body_static_processor_works() {
1880 use camel_core::route::compose_pipeline;
1881 let def = RouteBuilder::from("t:t")
1882 .route_id("test-route")
1883 .set_body("replaced")
1884 .build()
1885 .unwrap();
1886 let pipeline = compose_pipeline(
1887 def.steps()
1888 .iter()
1889 .filter_map(|s| {
1890 if let BuilderStep::Processor(p) = s {
1891 Some(p.clone())
1892 } else {
1893 None
1894 }
1895 })
1896 .collect(),
1897 );
1898 let exchange = Exchange::new(Message::new("original"));
1899 let result = pipeline.oneshot(exchange).await.unwrap();
1900 assert_eq!(result.input.body.as_text(), Some("replaced"));
1901 }
1902
1903 #[tokio::test]
1904 async fn test_set_body_fn_processor_works() {
1905 use camel_core::route::compose_pipeline;
1906 let def = RouteBuilder::from("t:t")
1907 .route_id("test-route")
1908 .set_body_fn(|ex: &Exchange| {
1909 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
1910 })
1911 .build()
1912 .unwrap();
1913 let pipeline = compose_pipeline(
1914 def.steps()
1915 .iter()
1916 .filter_map(|s| {
1917 if let BuilderStep::Processor(p) = s {
1918 Some(p.clone())
1919 } else {
1920 None
1921 }
1922 })
1923 .collect(),
1924 );
1925 let exchange = Exchange::new(Message::new("hello"));
1926 let result = pipeline.oneshot(exchange).await.unwrap();
1927 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1928 }
1929
1930 #[tokio::test]
1931 async fn test_set_header_fn_processor_works() {
1932 use camel_core::route::compose_pipeline;
1933 let def = RouteBuilder::from("t:t")
1934 .route_id("test-route")
1935 .set_header_fn("echo", |ex: &Exchange| {
1936 ex.input
1937 .body
1938 .as_text()
1939 .map(|t| Value::String(t.into()))
1940 .unwrap_or(Value::Null)
1941 })
1942 .build()
1943 .unwrap();
1944 let pipeline = compose_pipeline(
1945 def.steps()
1946 .iter()
1947 .filter_map(|s| {
1948 if let BuilderStep::Processor(p) = s {
1949 Some(p.clone())
1950 } else {
1951 None
1952 }
1953 })
1954 .collect(),
1955 );
1956 let exchange = Exchange::new(Message::new("ping"));
1957 let result = pipeline.oneshot(exchange).await.unwrap();
1958 assert_eq!(
1959 result.input.header("echo"),
1960 Some(&Value::String("ping".into()))
1961 );
1962 }
1963
1964 #[test]
1967 fn test_filter_builder_typestate() {
1968 let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1969 .route_id("test-route")
1970 .filter(|_ex| true)
1971 .to("mock:inner")
1972 .end_filter()
1973 .to("mock:outer")
1974 .build();
1975 assert!(result.is_ok());
1976 }
1977
1978 #[test]
1979 fn test_filter_builder_steps_collected() {
1980 let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1981 .route_id("test-route")
1982 .filter(|_ex| true)
1983 .to("mock:inner")
1984 .end_filter()
1985 .build()
1986 .unwrap();
1987
1988 assert_eq!(definition.steps().len(), 1);
1989 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1990 }
1991
1992 #[test]
1993 fn test_wire_tap_builder_adds_step() {
1994 let definition = RouteBuilder::from("timer:tick")
1995 .route_id("test-route")
1996 .wire_tap("mock:tap")
1997 .to("mock:result")
1998 .build()
1999 .unwrap();
2000
2001 assert_eq!(definition.steps().len(), 2);
2002 assert!(
2003 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2004 );
2005 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2006 }
2007
2008 #[test]
2011 fn test_multicast_builder_typestate() {
2012 let definition = RouteBuilder::from("timer:tick")
2013 .route_id("test-route")
2014 .multicast()
2015 .to("direct:a")
2016 .to("direct:b")
2017 .end_multicast()
2018 .to("mock:result")
2019 .build()
2020 .unwrap();
2021
2022 assert_eq!(definition.steps().len(), 2); }
2024
2025 #[test]
2026 fn test_multicast_builder_steps_collected() {
2027 let definition = RouteBuilder::from("timer:tick")
2028 .route_id("test-route")
2029 .multicast()
2030 .to("direct:a")
2031 .to("direct:b")
2032 .end_multicast()
2033 .build()
2034 .unwrap();
2035
2036 match &definition.steps()[0] {
2037 BuilderStep::Multicast { steps, .. } => {
2038 assert_eq!(steps.len(), 2);
2039 }
2040 other => panic!("Expected Multicast, got {:?}", other),
2041 }
2042 }
2043
2044 #[test]
2047 fn test_builder_concurrent_sets_concurrency() {
2048 use camel_component_api::ConcurrencyModel;
2049
2050 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2051 .route_id("test-route")
2052 .concurrent(16)
2053 .to("log:info")
2054 .build()
2055 .unwrap();
2056
2057 assert_eq!(
2058 definition.concurrency_override(),
2059 Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2060 );
2061 }
2062
2063 #[test]
2064 fn test_builder_concurrent_zero_means_unbounded() {
2065 use camel_component_api::ConcurrencyModel;
2066
2067 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2068 .route_id("test-route")
2069 .concurrent(0)
2070 .to("log:info")
2071 .build()
2072 .unwrap();
2073
2074 assert_eq!(
2075 definition.concurrency_override(),
2076 Some(&ConcurrencyModel::Concurrent { max: None })
2077 );
2078 }
2079
2080 #[test]
2081 fn test_builder_sequential_sets_concurrency() {
2082 use camel_component_api::ConcurrencyModel;
2083
2084 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2085 .route_id("test-route")
2086 .sequential()
2087 .to("log:info")
2088 .build()
2089 .unwrap();
2090
2091 assert_eq!(
2092 definition.concurrency_override(),
2093 Some(&ConcurrencyModel::Sequential)
2094 );
2095 }
2096
2097 #[test]
2098 fn test_builder_default_concurrency_is_none() {
2099 let definition = RouteBuilder::from("timer:tick")
2100 .route_id("test-route")
2101 .to("log:info")
2102 .build()
2103 .unwrap();
2104
2105 assert_eq!(definition.concurrency_override(), None);
2106 }
2107
2108 #[test]
2111 fn test_builder_route_id_sets_id() {
2112 let definition = RouteBuilder::from("timer:tick")
2113 .route_id("my-route")
2114 .build()
2115 .unwrap();
2116
2117 assert_eq!(definition.route_id(), "my-route");
2118 }
2119
2120 #[test]
2121 fn test_build_without_route_id_fails() {
2122 let result = RouteBuilder::from("timer:tick?period=1000")
2123 .to("log:info")
2124 .build();
2125 let err = match result {
2126 Err(e) => e.to_string(),
2127 Ok(_) => panic!("build() should fail without route_id"),
2128 };
2129 assert!(
2130 err.contains("route_id"),
2131 "error should mention route_id, got: {}",
2132 err
2133 );
2134 }
2135
2136 #[test]
2137 fn test_builder_auto_startup_false() {
2138 let definition = RouteBuilder::from("timer:tick")
2139 .route_id("test-route")
2140 .auto_startup(false)
2141 .build()
2142 .unwrap();
2143
2144 assert!(!definition.auto_startup());
2145 }
2146
2147 #[test]
2148 fn test_builder_startup_order_custom() {
2149 let definition = RouteBuilder::from("timer:tick")
2150 .route_id("test-route")
2151 .startup_order(50)
2152 .build()
2153 .unwrap();
2154
2155 assert_eq!(definition.startup_order(), 50);
2156 }
2157
2158 #[test]
2159 fn test_builder_defaults() {
2160 let definition = RouteBuilder::from("timer:tick")
2161 .route_id("test-route")
2162 .build()
2163 .unwrap();
2164
2165 assert_eq!(definition.route_id(), "test-route");
2166 assert!(definition.auto_startup());
2167 assert_eq!(definition.startup_order(), 1000);
2168 }
2169
2170 #[test]
2173 fn test_choice_builder_single_when() {
2174 let definition = RouteBuilder::from("timer:tick")
2175 .route_id("test-route")
2176 .choice()
2177 .when(|ex: &Exchange| ex.input.header("type").is_some())
2178 .to("mock:typed")
2179 .end_when()
2180 .end_choice()
2181 .build()
2182 .unwrap();
2183 assert_eq!(definition.steps().len(), 1);
2184 assert!(
2185 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2186 if whens.len() == 1 && otherwise.is_none())
2187 );
2188 }
2189
2190 #[test]
2191 fn test_choice_builder_when_otherwise() {
2192 let definition = RouteBuilder::from("timer:tick")
2193 .route_id("test-route")
2194 .choice()
2195 .when(|ex: &Exchange| ex.input.header("a").is_some())
2196 .to("mock:a")
2197 .end_when()
2198 .otherwise()
2199 .to("mock:fallback")
2200 .end_otherwise()
2201 .end_choice()
2202 .build()
2203 .unwrap();
2204 assert!(
2205 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2206 if whens.len() == 1 && otherwise.is_some())
2207 );
2208 }
2209
2210 #[test]
2211 fn test_choice_builder_multiple_whens() {
2212 let definition = RouteBuilder::from("timer:tick")
2213 .route_id("test-route")
2214 .choice()
2215 .when(|ex: &Exchange| ex.input.header("a").is_some())
2216 .to("mock:a")
2217 .end_when()
2218 .when(|ex: &Exchange| ex.input.header("b").is_some())
2219 .to("mock:b")
2220 .end_when()
2221 .end_choice()
2222 .build()
2223 .unwrap();
2224 assert!(
2225 matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2226 if whens.len() == 2)
2227 );
2228 }
2229
2230 #[test]
2231 fn test_choice_step_after_choice() {
2232 let definition = RouteBuilder::from("timer:tick")
2234 .route_id("test-route")
2235 .choice()
2236 .when(|_ex: &Exchange| true)
2237 .to("mock:inner")
2238 .end_when()
2239 .end_choice()
2240 .to("mock:outer") .build()
2242 .unwrap();
2243 assert_eq!(definition.steps().len(), 2);
2244 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2245 }
2246
2247 #[test]
2250 fn test_throttle_builder_typestate() {
2251 let definition = RouteBuilder::from("timer:tick")
2252 .route_id("test-route")
2253 .throttle(10, std::time::Duration::from_secs(1))
2254 .to("mock:result")
2255 .end_throttle()
2256 .build()
2257 .unwrap();
2258
2259 assert_eq!(definition.steps().len(), 1);
2260 assert!(matches!(
2261 &definition.steps()[0],
2262 BuilderStep::Throttle { .. }
2263 ));
2264 }
2265
2266 #[test]
2267 fn test_throttle_builder_with_strategy() {
2268 let definition = RouteBuilder::from("timer:tick")
2269 .route_id("test-route")
2270 .throttle(10, std::time::Duration::from_secs(1))
2271 .strategy(ThrottleStrategy::Reject)
2272 .to("mock:result")
2273 .end_throttle()
2274 .build()
2275 .unwrap();
2276
2277 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2278 assert_eq!(config.strategy, ThrottleStrategy::Reject);
2279 } else {
2280 panic!("Expected Throttle step");
2281 }
2282 }
2283
2284 #[test]
2285 fn test_throttle_builder_steps_collected() {
2286 let definition = RouteBuilder::from("timer:tick")
2287 .route_id("test-route")
2288 .throttle(5, std::time::Duration::from_secs(1))
2289 .set_header("throttled", Value::Bool(true))
2290 .to("mock:throttled")
2291 .end_throttle()
2292 .build()
2293 .unwrap();
2294
2295 match &definition.steps()[0] {
2296 BuilderStep::Throttle { steps, .. } => {
2297 assert_eq!(steps.len(), 2); }
2299 other => panic!("Expected Throttle, got {:?}", other),
2300 }
2301 }
2302
2303 #[test]
2304 fn test_throttle_step_after_throttle() {
2305 let definition = RouteBuilder::from("timer:tick")
2307 .route_id("test-route")
2308 .throttle(10, std::time::Duration::from_secs(1))
2309 .to("mock:inner")
2310 .end_throttle()
2311 .to("mock:outer")
2312 .build()
2313 .unwrap();
2314
2315 assert_eq!(definition.steps().len(), 2);
2316 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2317 }
2318
2319 #[test]
2322 fn test_load_balance_builder_typestate() {
2323 let definition = RouteBuilder::from("timer:tick")
2324 .route_id("test-route")
2325 .load_balance()
2326 .round_robin()
2327 .to("mock:a")
2328 .to("mock:b")
2329 .end_load_balance()
2330 .build()
2331 .unwrap();
2332
2333 assert_eq!(definition.steps().len(), 1);
2334 assert!(matches!(
2335 &definition.steps()[0],
2336 BuilderStep::LoadBalance { .. }
2337 ));
2338 }
2339
2340 #[test]
2341 fn test_load_balance_builder_with_strategy() {
2342 let definition = RouteBuilder::from("timer:tick")
2343 .route_id("test-route")
2344 .load_balance()
2345 .random()
2346 .to("mock:result")
2347 .end_load_balance()
2348 .build()
2349 .unwrap();
2350
2351 if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2352 assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2353 } else {
2354 panic!("Expected LoadBalance step");
2355 }
2356 }
2357
2358 #[test]
2359 fn test_load_balance_builder_steps_collected() {
2360 let definition = RouteBuilder::from("timer:tick")
2361 .route_id("test-route")
2362 .load_balance()
2363 .set_header("lb", Value::Bool(true))
2364 .to("mock:a")
2365 .end_load_balance()
2366 .build()
2367 .unwrap();
2368
2369 match &definition.steps()[0] {
2370 BuilderStep::LoadBalance { steps, .. } => {
2371 assert_eq!(steps.len(), 2); }
2373 other => panic!("Expected LoadBalance, got {:?}", other),
2374 }
2375 }
2376
2377 #[test]
2378 fn test_load_balance_step_after_load_balance() {
2379 let definition = RouteBuilder::from("timer:tick")
2381 .route_id("test-route")
2382 .load_balance()
2383 .to("mock:inner")
2384 .end_load_balance()
2385 .to("mock:outer")
2386 .build()
2387 .unwrap();
2388
2389 assert_eq!(definition.steps().len(), 2);
2390 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2391 }
2392
2393 #[test]
2396 fn test_dynamic_router_builder() {
2397 let definition = RouteBuilder::from("timer:tick")
2398 .route_id("test-route")
2399 .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2400 .build()
2401 .unwrap();
2402
2403 assert_eq!(definition.steps().len(), 1);
2404 assert!(matches!(
2405 &definition.steps()[0],
2406 BuilderStep::DynamicRouter { .. }
2407 ));
2408 }
2409
2410 #[test]
2411 fn test_dynamic_router_builder_with_config() {
2412 let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2413 .max_iterations(100)
2414 .cache_size(500);
2415
2416 let definition = RouteBuilder::from("timer:tick")
2417 .route_id("test-route")
2418 .dynamic_router_with_config(config)
2419 .build()
2420 .unwrap();
2421
2422 assert_eq!(definition.steps().len(), 1);
2423 if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2424 assert_eq!(config.max_iterations, 100);
2425 assert_eq!(config.cache_size, 500);
2426 } else {
2427 panic!("Expected DynamicRouter step");
2428 }
2429 }
2430
2431 #[test]
2432 fn test_dynamic_router_step_after_router() {
2433 let definition = RouteBuilder::from("timer:tick")
2435 .route_id("test-route")
2436 .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2437 .to("mock:outer")
2438 .build()
2439 .unwrap();
2440
2441 assert_eq!(definition.steps().len(), 2);
2442 assert!(matches!(
2443 &definition.steps()[0],
2444 BuilderStep::DynamicRouter { .. }
2445 ));
2446 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2447 }
2448
2449 #[test]
2450 fn routing_slip_builder_creates_step() {
2451 use camel_api::RoutingSlipExpression;
2452
2453 let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2454
2455 let route = RouteBuilder::from("direct:start")
2456 .route_id("routing-slip-test")
2457 .routing_slip(expression)
2458 .build()
2459 .unwrap();
2460
2461 assert!(
2462 matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2463 "Expected RoutingSlip step"
2464 );
2465 }
2466
2467 #[test]
2468 fn routing_slip_with_config_builder_creates_step() {
2469 use camel_api::RoutingSlipConfig;
2470
2471 let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2472 .uri_delimiter("|")
2473 .cache_size(50)
2474 .ignore_invalid_endpoints(true);
2475
2476 let route = RouteBuilder::from("direct:start")
2477 .route_id("routing-slip-config-test")
2478 .routing_slip_with_config(config)
2479 .build()
2480 .unwrap();
2481
2482 if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2483 assert_eq!(config.uri_delimiter, "|");
2484 assert_eq!(config.cache_size, 50);
2485 assert!(config.ignore_invalid_endpoints);
2486 } else {
2487 panic!("Expected RoutingSlip step");
2488 }
2489 }
2490
2491 #[test]
2492 fn test_builder_marshal_adds_processor_step() {
2493 let definition = RouteBuilder::from("timer:tick")
2494 .route_id("test-route")
2495 .marshal("json")
2496 .build()
2497 .unwrap();
2498 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2499 }
2500
2501 #[test]
2502 fn test_builder_unmarshal_adds_processor_step() {
2503 let definition = RouteBuilder::from("timer:tick")
2504 .route_id("test-route")
2505 .unmarshal("json")
2506 .build()
2507 .unwrap();
2508 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2509 }
2510
2511 #[test]
2512 fn validate_adds_to_step_with_validator_uri() {
2513 let def = RouteBuilder::from("direct:in")
2514 .route_id("test")
2515 .validate("schemas/order.xsd")
2516 .build()
2517 .unwrap();
2518 let steps = def.steps();
2519 assert_eq!(steps.len(), 1);
2520 assert!(
2521 matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2522 "got: {:?}",
2523 steps[0]
2524 );
2525 }
2526
2527 #[test]
2528 #[should_panic(expected = "unknown data format: 'csv'")]
2529 fn test_builder_marshal_panics_on_unknown_format() {
2530 let _ = RouteBuilder::from("timer:tick")
2531 .route_id("test-route")
2532 .marshal("csv")
2533 .build();
2534 }
2535
2536 #[test]
2537 #[should_panic(expected = "unknown data format: 'csv'")]
2538 fn test_builder_unmarshal_panics_on_unknown_format() {
2539 let _ = RouteBuilder::from("timer:tick")
2540 .route_id("test-route")
2541 .unmarshal("csv")
2542 .build();
2543 }
2544}