1use camel_api::aggregator::{
2 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
3};
4use camel_api::body::Body;
5use camel_api::body_converter::BodyType;
6use camel_api::circuit_breaker::CircuitBreakerConfig;
7use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
8use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
9use camel_api::load_balancer::LoadBalancerConfig;
10use camel_api::multicast::{MulticastConfig, MulticastStrategy};
11use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
12use camel_api::splitter::SplitterConfig;
13use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
14use camel_api::DelayConfig;
15use camel_api::{
16 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
17 ProcessorFn, Value,
18 runtime::{
19 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
20 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
21 CanonicalWhenSpec,
22 },
23};
24use camel_component_api::ConcurrencyModel;
25use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
26use camel_processor::{
27 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
28 UnmarshalService, builtin_data_format,
29};
30
31pub trait StepAccumulator: Sized {
37 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
38
39 fn to(mut self, endpoint: impl Into<String>) -> Self {
40 self.steps_mut().push(BuilderStep::To(endpoint.into()));
41 self
42 }
43
44 fn process<F, Fut>(mut self, f: F) -> Self
45 where
46 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
47 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
48 {
49 let svc = ProcessorFn::new(f);
50 self.steps_mut()
51 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
52 self
53 }
54
55 fn process_fn(mut self, processor: BoxProcessor) -> Self {
56 self.steps_mut().push(BuilderStep::Processor(processor));
57 self
58 }
59
60 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
61 let svc = SetHeader::new(IdentityProcessor, key, value);
62 self.steps_mut()
63 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
64 self
65 }
66
67 fn map_body<F>(mut self, mapper: F) -> Self
68 where
69 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
70 {
71 let svc = MapBody::new(IdentityProcessor, mapper);
72 self.steps_mut()
73 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
74 self
75 }
76
77 fn set_body<B>(mut self, body: B) -> Self
78 where
79 B: Into<Body> + Clone + Send + Sync + 'static,
80 {
81 let body: Body = body.into();
82 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
83 self.steps_mut()
84 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
85 self
86 }
87
88 fn transform<B>(self, body: B) -> Self
93 where
94 B: Into<Body> + Clone + Send + Sync + 'static,
95 {
96 self.set_body(body)
97 }
98
99 fn set_body_fn<F>(mut self, expr: F) -> Self
100 where
101 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
102 {
103 let svc = SetBody::new(IdentityProcessor, expr);
104 self.steps_mut()
105 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
106 self
107 }
108
109 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
110 where
111 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
112 {
113 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
114 self.steps_mut()
115 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
116 self
117 }
118
119 fn aggregate(mut self, config: AggregatorConfig) -> Self {
120 self.steps_mut().push(BuilderStep::Aggregate { config });
121 self
122 }
123
124 fn stop(mut self) -> Self {
130 self.steps_mut().push(BuilderStep::Stop);
131 self
132 }
133
134 fn delay(mut self, duration: std::time::Duration) -> Self {
135 self.steps_mut().push(BuilderStep::Delay {
136 config: DelayConfig::from_duration(duration),
137 });
138 self
139 }
140
141 fn delay_with_header(
142 mut self,
143 duration: std::time::Duration,
144 header: impl Into<String>,
145 ) -> Self {
146 self.steps_mut().push(BuilderStep::Delay {
147 config: DelayConfig::from_duration_with_header(duration, header),
148 });
149 self
150 }
151
152 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
156 self.steps_mut().push(BuilderStep::Log {
157 level,
158 message: message.into(),
159 });
160 self
161 }
162
163 fn convert_body_to(mut self, target: BodyType) -> Self {
175 let svc = ConvertBodyTo::new(IdentityProcessor, target);
176 self.steps_mut()
177 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
178 self
179 }
180
181 fn marshal(mut self, format: impl Into<String>) -> Self {
191 let name = format.into();
192 let df =
193 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
194 let svc = MarshalService::new(IdentityProcessor, df);
195 self.steps_mut()
196 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
197 self
198 }
199
200 fn unmarshal(mut self, format: impl Into<String>) -> Self {
210 let name = format.into();
211 let df =
212 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
213 let svc = UnmarshalService::new(IdentityProcessor, df);
214 self.steps_mut()
215 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
216 self
217 }
218
219 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
230 self.steps_mut().push(BuilderStep::Script {
231 language: language.into(),
232 script: script.into(),
233 });
234 self
235 }
236}
237
238pub struct RouteBuilder {
250 from_uri: String,
251 steps: Vec<BuilderStep>,
252 error_handler: Option<ErrorHandlerConfig>,
253 error_handler_mode: ErrorHandlerMode,
254 circuit_breaker_config: Option<CircuitBreakerConfig>,
255 concurrency: Option<ConcurrencyModel>,
256 route_id: Option<String>,
257 auto_startup: Option<bool>,
258 startup_order: Option<i32>,
259}
260
261#[derive(Default)]
262enum ErrorHandlerMode {
263 #[default]
264 None,
265 ExplicitConfig,
266 Shorthand {
267 dlc_uri: Option<String>,
268 specs: Vec<OnExceptionSpec>,
269 },
270 Mixed,
271}
272
273#[derive(Clone)]
274struct OnExceptionSpec {
275 matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
276 retry: Option<RedeliveryPolicy>,
277 handled_by: Option<String>,
278}
279
280impl RouteBuilder {
281 pub fn from(endpoint: &str) -> Self {
283 Self {
284 from_uri: endpoint.to_string(),
285 steps: Vec::new(),
286 error_handler: None,
287 error_handler_mode: ErrorHandlerMode::None,
288 circuit_breaker_config: None,
289 concurrency: None,
290 route_id: None,
291 auto_startup: None,
292 startup_order: None,
293 }
294 }
295
296 pub fn filter<F>(self, predicate: F) -> FilterBuilder
300 where
301 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
302 {
303 FilterBuilder {
304 parent: self,
305 predicate: std::sync::Arc::new(predicate),
306 steps: vec![],
307 }
308 }
309
310 pub fn choice(self) -> ChoiceBuilder {
316 ChoiceBuilder {
317 parent: self,
318 whens: vec![],
319 _otherwise: None,
320 }
321 }
322
323 pub fn wire_tap(mut self, endpoint: &str) -> Self {
327 self.steps.push(BuilderStep::WireTap {
328 uri: endpoint.to_string(),
329 });
330 self
331 }
332
333 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
335 self.error_handler_mode = match self.error_handler_mode {
336 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
337 ErrorHandlerMode::ExplicitConfig
338 }
339 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
340 };
341 self.error_handler = Some(config);
342 self
343 }
344
345 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
347 let uri = uri.into();
348 self.error_handler_mode = match self.error_handler_mode {
349 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
350 dlc_uri: Some(uri),
351 specs: Vec::new(),
352 },
353 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
354 dlc_uri: Some(uri),
355 specs,
356 },
357 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
358 };
359 self
360 }
361
362 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
364 where
365 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
366 {
367 self.error_handler_mode = match self.error_handler_mode {
368 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
369 dlc_uri: None,
370 specs: Vec::new(),
371 },
372 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
373 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
374 };
375
376 OnExceptionBuilder {
377 parent: self,
378 policy: OnExceptionSpec {
379 matches: std::sync::Arc::new(matches),
380 retry: None,
381 handled_by: None,
382 },
383 }
384 }
385
386 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
388 self.circuit_breaker_config = Some(config);
389 self
390 }
391
392 pub fn concurrent(mut self, max: usize) -> Self {
406 let max = if max == 0 { None } else { Some(max) };
407 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
408 self
409 }
410
411 pub fn sequential(mut self) -> Self {
416 self.concurrency = Some(ConcurrencyModel::Sequential);
417 self
418 }
419
420 pub fn route_id(mut self, id: impl Into<String>) -> Self {
424 self.route_id = Some(id.into());
425 self
426 }
427
428 pub fn auto_startup(mut self, auto: bool) -> Self {
432 self.auto_startup = Some(auto);
433 self
434 }
435
436 pub fn startup_order(mut self, order: i32) -> Self {
440 self.startup_order = Some(order);
441 self
442 }
443
444 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
450 SplitBuilder {
451 parent: self,
452 config,
453 steps: Vec::new(),
454 }
455 }
456
457 pub fn multicast(self) -> MulticastBuilder {
463 MulticastBuilder {
464 parent: self,
465 steps: Vec::new(),
466 config: MulticastConfig::new(),
467 }
468 }
469
470 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
477 ThrottleBuilder {
478 parent: self,
479 config: ThrottlerConfig::new(max_requests, period),
480 steps: Vec::new(),
481 }
482 }
483
484 pub fn load_balance(self) -> LoadBalancerBuilder {
490 LoadBalancerBuilder {
491 parent: self,
492 config: LoadBalancerConfig::round_robin(),
493 steps: Vec::new(),
494 }
495 }
496
497 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
513 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
514 }
515
516 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
520 self.steps.push(BuilderStep::DynamicRouter { config });
521 self
522 }
523
524 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
525 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
526 }
527
528 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
529 self.steps.push(BuilderStep::RoutingSlip { config });
530 self
531 }
532
533 pub fn build(self) -> Result<RouteDefinition, CamelError> {
535 if self.from_uri.is_empty() {
536 return Err(CamelError::RouteError(
537 "route must have a 'from' URI".to_string(),
538 ));
539 }
540 let route_id = self.route_id.ok_or_else(|| {
541 CamelError::RouteError(
542 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
543 .to_string(),
544 )
545 })?;
546 let resolved_error_handler = match self.error_handler_mode {
547 ErrorHandlerMode::None => self.error_handler,
548 ErrorHandlerMode::ExplicitConfig => self.error_handler,
549 ErrorHandlerMode::Mixed => {
550 return Err(CamelError::RouteError(
551 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
552 ));
553 }
554 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
555 let mut config = if let Some(uri) = dlc_uri {
556 ErrorHandlerConfig::dead_letter_channel(uri)
557 } else {
558 ErrorHandlerConfig::log_only()
559 };
560
561 for spec in specs {
562 let matcher = spec.matches.clone();
563 let mut builder = config.on_exception(move |e| matcher(e));
564
565 if let Some(retry) = spec.retry {
566 builder = builder.retry(retry.max_attempts).with_backoff(
567 retry.initial_delay,
568 retry.multiplier,
569 retry.max_delay,
570 );
571 if retry.jitter_factor > 0.0 {
572 builder = builder.with_jitter(retry.jitter_factor);
573 }
574 }
575
576 if let Some(uri) = spec.handled_by {
577 builder = builder.handled_by(uri);
578 }
579
580 config = builder.build();
581 }
582
583 Some(config)
584 }
585 };
586
587 let definition = RouteDefinition::new(self.from_uri, self.steps);
588 let definition = if let Some(eh) = resolved_error_handler {
589 definition.with_error_handler(eh)
590 } else {
591 definition
592 };
593 let definition = if let Some(cb) = self.circuit_breaker_config {
594 definition.with_circuit_breaker(cb)
595 } else {
596 definition
597 };
598 let definition = if let Some(concurrency) = self.concurrency {
599 definition.with_concurrency(concurrency)
600 } else {
601 definition
602 };
603 let definition = definition.with_route_id(route_id);
604 let definition = if let Some(auto) = self.auto_startup {
605 definition.with_auto_startup(auto)
606 } else {
607 definition
608 };
609 let definition = if let Some(order) = self.startup_order {
610 definition.with_startup_order(order)
611 } else {
612 definition
613 };
614 Ok(definition)
615 }
616
617 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
619 if self.from_uri.is_empty() {
620 return Err(CamelError::RouteError(
621 "route must have a 'from' URI".to_string(),
622 ));
623 }
624 let route_id = self.route_id.ok_or_else(|| {
625 CamelError::RouteError(
626 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
627 .to_string(),
628 )
629 })?;
630
631 let steps = canonicalize_steps(self.steps)?;
632 let circuit_breaker = self
633 .circuit_breaker_config
634 .map(canonicalize_circuit_breaker);
635
636 let spec = CanonicalRouteSpec {
637 route_id,
638 from: self.from_uri,
639 steps,
640 circuit_breaker,
641 version: camel_api::CANONICAL_CONTRACT_VERSION,
642 };
643 spec.validate_contract()?;
644 Ok(spec)
645 }
646}
647
648pub struct OnExceptionBuilder {
649 parent: RouteBuilder,
650 policy: OnExceptionSpec,
651}
652
653impl OnExceptionBuilder {
654 pub fn retry(mut self, max_attempts: u32) -> Self {
655 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
656 self
657 }
658
659 pub fn with_backoff(
660 mut self,
661 initial: std::time::Duration,
662 multiplier: f64,
663 max: std::time::Duration,
664 ) -> Self {
665 if let Some(ref mut retry) = self.policy.retry {
666 retry.initial_delay = initial;
667 retry.multiplier = multiplier;
668 retry.max_delay = max;
669 }
670 self
671 }
672
673 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
674 if let Some(ref mut retry) = self.policy.retry {
675 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
676 }
677 self
678 }
679
680 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
681 self.policy.handled_by = Some(uri.into());
682 self
683 }
684
685 pub fn end_on_exception(mut self) -> RouteBuilder {
686 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
687 specs.push(self.policy);
688 }
689 self.parent
690 }
691}
692
693fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
694 let mut canonical = Vec::with_capacity(steps.len());
695 for step in steps {
696 canonical.push(canonicalize_step(step)?);
697 }
698 Ok(canonical)
699}
700
701fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
702 match step {
703 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
704 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
705 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
706 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
707 BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
708 delay_ms: config.delay_ms,
709 dynamic_header: config.dynamic_header,
710 }),
711 BuilderStep::DeclarativeScript { expression } => {
712 Ok(CanonicalStepSpec::Script { expression })
713 }
714 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
715 predicate,
716 steps: canonicalize_steps(steps)?,
717 }),
718 BuilderStep::DeclarativeChoice { whens, otherwise } => {
719 let mut canonical_whens = Vec::with_capacity(whens.len());
720 for DeclarativeWhenStep { predicate, steps } in whens {
721 canonical_whens.push(CanonicalWhenSpec {
722 predicate,
723 steps: canonicalize_steps(steps)?,
724 });
725 }
726 let otherwise = match otherwise {
727 Some(steps) => Some(canonicalize_steps(steps)?),
728 None => None,
729 };
730 Ok(CanonicalStepSpec::Choice {
731 whens: canonical_whens,
732 otherwise,
733 })
734 }
735 BuilderStep::DeclarativeSplit {
736 expression,
737 aggregation,
738 parallel,
739 parallel_limit,
740 stop_on_exception,
741 steps,
742 } => Ok(CanonicalStepSpec::Split {
743 expression: CanonicalSplitExpressionSpec::Language(expression),
744 aggregation: canonicalize_split_aggregation(aggregation)?,
745 parallel,
746 parallel_limit,
747 stop_on_exception,
748 steps: canonicalize_steps(steps)?,
749 }),
750 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
751 config: canonicalize_aggregate(config)?,
752 }),
753 other => {
754 let step_name = canonical_step_name(&other);
755 let detail = camel_api::canonical_contract_rejection_reason(step_name)
756 .unwrap_or("not included in canonical v1");
757 Err(CamelError::RouteError(format!(
758 "canonical v1 does not support step `{step_name}`: {detail}"
759 )))
760 }
761 }
762}
763
764fn canonicalize_split_aggregation(
765 strategy: camel_api::splitter::AggregationStrategy,
766) -> Result<CanonicalSplitAggregationSpec, CamelError> {
767 match strategy {
768 camel_api::splitter::AggregationStrategy::LastWins => {
769 Ok(CanonicalSplitAggregationSpec::LastWins)
770 }
771 camel_api::splitter::AggregationStrategy::CollectAll => {
772 Ok(CanonicalSplitAggregationSpec::CollectAll)
773 }
774 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
775 "canonical v1 does not support custom split aggregation".to_string(),
776 )),
777 camel_api::splitter::AggregationStrategy::Original => {
778 Ok(CanonicalSplitAggregationSpec::Original)
779 }
780 }
781}
782
783fn extract_completion_fields(
784 mode: &CompletionMode,
785) -> Result<(Option<usize>, Option<u64>), CamelError> {
786 match mode {
787 CompletionMode::Single(cond) => match cond {
788 CompletionCondition::Size(n) => Ok((Some(*n), None)),
789 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
790 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
791 "canonical v1 does not support aggregate predicate completion".to_string(),
792 )),
793 },
794 CompletionMode::Any(conds) => {
795 let mut size = None;
796 let mut timeout_ms = None;
797 for cond in conds {
798 match cond {
799 CompletionCondition::Size(n) => size = Some(*n),
800 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
801 CompletionCondition::Predicate(_) => {
802 return Err(CamelError::RouteError(
803 "canonical v1 does not support aggregate predicate completion"
804 .to_string(),
805 ));
806 }
807 }
808 }
809 Ok((size, timeout_ms))
810 }
811 }
812}
813
814fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
815 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
816
817 let header = match &config.correlation {
818 CorrelationStrategy::HeaderName(h) => h.clone(),
819 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
820 CorrelationStrategy::Fn(_) => {
821 return Err(CamelError::RouteError(
822 "canonical v1 does not support Fn correlation strategy".to_string(),
823 ));
824 }
825 };
826
827 let correlation_key = match &config.correlation {
828 CorrelationStrategy::HeaderName(_) => None,
829 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
830 CorrelationStrategy::Fn(_) => unreachable!(),
831 };
832
833 let strategy = match config.strategy {
834 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
835 AggregationStrategy::Custom(_) => {
836 return Err(CamelError::RouteError(
837 "canonical v1 does not support custom aggregate strategy".to_string(),
838 ));
839 }
840 };
841 let bucket_ttl_ms = config
842 .bucket_ttl
843 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
844
845 Ok(CanonicalAggregateSpec {
846 header,
847 completion_size,
848 completion_timeout_ms,
849 correlation_key,
850 force_completion_on_stop: if config.force_completion_on_stop {
851 Some(true)
852 } else {
853 None
854 },
855 discard_on_timeout: if config.discard_on_timeout {
856 Some(true)
857 } else {
858 None
859 },
860 strategy,
861 max_buckets: config.max_buckets,
862 bucket_ttl_ms,
863 })
864}
865
866fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
867 CanonicalCircuitBreakerSpec {
868 failure_threshold: config.failure_threshold,
869 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
870 }
871}
872
873fn canonical_step_name(step: &BuilderStep) -> &'static str {
874 match step {
875 BuilderStep::Processor(_) => "processor",
876 BuilderStep::To(_) => "to",
877 BuilderStep::Stop => "stop",
878 BuilderStep::Log { .. } => "log",
879 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
880 BuilderStep::DeclarativeSetBody { .. } => "set_body",
881 BuilderStep::DeclarativeFilter { .. } => "filter",
882 BuilderStep::DeclarativeChoice { .. } => "choice",
883 BuilderStep::DeclarativeScript { .. } => "script",
884 BuilderStep::DeclarativeSplit { .. } => "split",
885 BuilderStep::Split { .. } => "split",
886 BuilderStep::Aggregate { .. } => "aggregate",
887 BuilderStep::Filter { .. } => "filter",
888 BuilderStep::Choice { .. } => "choice",
889 BuilderStep::WireTap { .. } => "wire_tap",
890 BuilderStep::Delay { .. } => "delay",
891 BuilderStep::Multicast { .. } => "multicast",
892 BuilderStep::DeclarativeLog { .. } => "log",
893 BuilderStep::Bean { .. } => "bean",
894 BuilderStep::Script { .. } => "script",
895 BuilderStep::Throttle { .. } => "throttle",
896 BuilderStep::LoadBalance { .. } => "load_balancer",
897 BuilderStep::DynamicRouter { .. } => "dynamic_router",
898 BuilderStep::RoutingSlip { .. } => "routing_slip",
899 BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
900 BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
901 }
902}
903
904impl StepAccumulator for RouteBuilder {
905 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
906 &mut self.steps
907 }
908}
909
910pub struct SplitBuilder {
918 parent: RouteBuilder,
919 config: SplitterConfig,
920 steps: Vec<BuilderStep>,
921}
922
923impl SplitBuilder {
924 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
926 where
927 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
928 {
929 FilterInSplitBuilder {
930 parent: self,
931 predicate: std::sync::Arc::new(predicate),
932 steps: vec![],
933 }
934 }
935
936 pub fn end_split(mut self) -> RouteBuilder {
939 let split_step = BuilderStep::Split {
940 config: self.config,
941 steps: self.steps,
942 };
943 self.parent.steps.push(split_step);
944 self.parent
945 }
946}
947
948impl StepAccumulator for SplitBuilder {
949 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
950 &mut self.steps
951 }
952}
953
954pub struct FilterBuilder {
956 parent: RouteBuilder,
957 predicate: FilterPredicate,
958 steps: Vec<BuilderStep>,
959}
960
961impl FilterBuilder {
962 pub fn end_filter(mut self) -> RouteBuilder {
965 let step = BuilderStep::Filter {
966 predicate: self.predicate,
967 steps: self.steps,
968 };
969 self.parent.steps.push(step);
970 self.parent
971 }
972}
973
974impl StepAccumulator for FilterBuilder {
975 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
976 &mut self.steps
977 }
978}
979
980pub struct FilterInSplitBuilder {
982 parent: SplitBuilder,
983 predicate: FilterPredicate,
984 steps: Vec<BuilderStep>,
985}
986
987impl FilterInSplitBuilder {
988 pub fn end_filter(mut self) -> SplitBuilder {
990 let step = BuilderStep::Filter {
991 predicate: self.predicate,
992 steps: self.steps,
993 };
994 self.parent.steps.push(step);
995 self.parent
996 }
997}
998
999impl StepAccumulator for FilterInSplitBuilder {
1000 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1001 &mut self.steps
1002 }
1003}
1004
1005pub struct ChoiceBuilder {
1012 parent: RouteBuilder,
1013 whens: Vec<WhenStep>,
1014 _otherwise: Option<Vec<BuilderStep>>,
1015}
1016
1017impl ChoiceBuilder {
1018 pub fn when<F>(self, predicate: F) -> WhenBuilder
1021 where
1022 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1023 {
1024 WhenBuilder {
1025 parent: self,
1026 predicate: std::sync::Arc::new(predicate),
1027 steps: vec![],
1028 }
1029 }
1030
1031 pub fn otherwise(self) -> OtherwiseBuilder {
1035 OtherwiseBuilder {
1036 parent: self,
1037 steps: vec![],
1038 }
1039 }
1040
1041 pub fn end_choice(mut self) -> RouteBuilder {
1045 let step = BuilderStep::Choice {
1046 whens: self.whens,
1047 otherwise: self._otherwise,
1048 };
1049 self.parent.steps.push(step);
1050 self.parent
1051 }
1052}
1053
1054pub struct WhenBuilder {
1056 parent: ChoiceBuilder,
1057 predicate: camel_api::FilterPredicate,
1058 steps: Vec<BuilderStep>,
1059}
1060
1061impl WhenBuilder {
1062 pub fn end_when(mut self) -> ChoiceBuilder {
1065 self.parent.whens.push(WhenStep {
1066 predicate: self.predicate,
1067 steps: self.steps,
1068 });
1069 self.parent
1070 }
1071}
1072
1073impl StepAccumulator for WhenBuilder {
1074 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1075 &mut self.steps
1076 }
1077}
1078
1079pub struct OtherwiseBuilder {
1081 parent: ChoiceBuilder,
1082 steps: Vec<BuilderStep>,
1083}
1084
1085impl OtherwiseBuilder {
1086 pub fn end_otherwise(self) -> ChoiceBuilder {
1088 let OtherwiseBuilder { mut parent, steps } = self;
1089 parent._otherwise = Some(steps);
1090 parent
1091 }
1092}
1093
1094impl StepAccumulator for OtherwiseBuilder {
1095 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1096 &mut self.steps
1097 }
1098}
1099
1100pub struct MulticastBuilder {
1108 parent: RouteBuilder,
1109 steps: Vec<BuilderStep>,
1110 config: MulticastConfig,
1111}
1112
1113impl MulticastBuilder {
1114 pub fn parallel(mut self, parallel: bool) -> Self {
1115 self.config = self.config.parallel(parallel);
1116 self
1117 }
1118
1119 pub fn parallel_limit(mut self, limit: usize) -> Self {
1120 self.config = self.config.parallel_limit(limit);
1121 self
1122 }
1123
1124 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1125 self.config = self.config.stop_on_exception(stop);
1126 self
1127 }
1128
1129 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1130 self.config = self.config.timeout(duration);
1131 self
1132 }
1133
1134 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1135 self.config = self.config.aggregation(strategy);
1136 self
1137 }
1138
1139 pub fn end_multicast(mut self) -> RouteBuilder {
1140 let step = BuilderStep::Multicast {
1141 steps: self.steps,
1142 config: self.config,
1143 };
1144 self.parent.steps.push(step);
1145 self.parent
1146 }
1147}
1148
1149impl StepAccumulator for MulticastBuilder {
1150 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1151 &mut self.steps
1152 }
1153}
1154
1155pub struct ThrottleBuilder {
1163 parent: RouteBuilder,
1164 config: ThrottlerConfig,
1165 steps: Vec<BuilderStep>,
1166}
1167
1168impl ThrottleBuilder {
1169 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1175 self.config = self.config.strategy(strategy);
1176 self
1177 }
1178
1179 pub fn end_throttle(mut self) -> RouteBuilder {
1182 let step = BuilderStep::Throttle {
1183 config: self.config,
1184 steps: self.steps,
1185 };
1186 self.parent.steps.push(step);
1187 self.parent
1188 }
1189}
1190
1191impl StepAccumulator for ThrottleBuilder {
1192 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1193 &mut self.steps
1194 }
1195}
1196
1197pub struct LoadBalancerBuilder {
1205 parent: RouteBuilder,
1206 config: LoadBalancerConfig,
1207 steps: Vec<BuilderStep>,
1208}
1209
1210impl LoadBalancerBuilder {
1211 pub fn round_robin(mut self) -> Self {
1213 self.config = LoadBalancerConfig::round_robin();
1214 self
1215 }
1216
1217 pub fn random(mut self) -> Self {
1219 self.config = LoadBalancerConfig::random();
1220 self
1221 }
1222
1223 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1228 self.config = LoadBalancerConfig::weighted(weights);
1229 self
1230 }
1231
1232 pub fn failover(mut self) -> Self {
1237 self.config = LoadBalancerConfig::failover();
1238 self
1239 }
1240
1241 pub fn parallel(mut self, parallel: bool) -> Self {
1246 self.config = self.config.parallel(parallel);
1247 self
1248 }
1249
1250 pub fn end_load_balance(mut self) -> RouteBuilder {
1253 let step = BuilderStep::LoadBalance {
1254 config: self.config,
1255 steps: self.steps,
1256 };
1257 self.parent.steps.push(step);
1258 self.parent
1259 }
1260}
1261
1262impl StepAccumulator for LoadBalancerBuilder {
1263 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1264 &mut self.steps
1265 }
1266}
1267
1268#[cfg(test)]
1273mod tests {
1274 use super::*;
1275 use camel_api::error_handler::ErrorHandlerConfig;
1276 use camel_api::load_balancer::LoadBalanceStrategy;
1277 use camel_api::{Exchange, Message};
1278 use camel_core::route::BuilderStep;
1279 use std::sync::Arc;
1280 use std::time::Duration;
1281 use tower::{Service, ServiceExt};
1282
1283 #[test]
1284 fn test_builder_from_creates_definition() {
1285 let definition = RouteBuilder::from("timer:tick")
1286 .route_id("test-route")
1287 .build()
1288 .unwrap();
1289 assert_eq!(definition.from_uri(), "timer:tick");
1290 }
1291
1292 #[test]
1293 fn test_builder_empty_from_uri_errors() {
1294 let result = RouteBuilder::from("").route_id("test-route").build();
1295 assert!(result.is_err());
1296 }
1297
1298 #[test]
1299 fn test_builder_to_adds_step() {
1300 let definition = RouteBuilder::from("timer:tick")
1301 .route_id("test-route")
1302 .to("log:info")
1303 .build()
1304 .unwrap();
1305
1306 assert_eq!(definition.from_uri(), "timer:tick");
1307 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1309 }
1310
1311 #[test]
1312 fn test_builder_filter_adds_filter_step() {
1313 let definition = RouteBuilder::from("timer:tick")
1314 .route_id("test-route")
1315 .filter(|_ex| true)
1316 .to("mock:result")
1317 .end_filter()
1318 .build()
1319 .unwrap();
1320
1321 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1322 }
1323
1324 #[test]
1325 fn test_builder_set_header_adds_processor_step() {
1326 let definition = RouteBuilder::from("timer:tick")
1327 .route_id("test-route")
1328 .set_header("key", Value::String("value".into()))
1329 .build()
1330 .unwrap();
1331
1332 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1333 }
1334
1335 #[test]
1336 fn test_builder_map_body_adds_processor_step() {
1337 let definition = RouteBuilder::from("timer:tick")
1338 .route_id("test-route")
1339 .map_body(|body| body)
1340 .build()
1341 .unwrap();
1342
1343 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1344 }
1345
1346 #[test]
1347 fn test_builder_process_adds_processor_step() {
1348 let definition = RouteBuilder::from("timer:tick")
1349 .route_id("test-route")
1350 .process(|ex| async move { Ok(ex) })
1351 .build()
1352 .unwrap();
1353
1354 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1355 }
1356
1357 #[test]
1358 fn test_builder_chain_multiple_steps() {
1359 let definition = RouteBuilder::from("timer:tick")
1360 .route_id("test-route")
1361 .set_header("source", Value::String("timer".into()))
1362 .filter(|ex| ex.input.header("source").is_some())
1363 .to("log:info")
1364 .end_filter()
1365 .to("mock:result")
1366 .build()
1367 .unwrap();
1368
1369 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1373 }
1374
1375 #[tokio::test]
1380 async fn test_set_header_processor_works() {
1381 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1382 let exchange = Exchange::new(Message::new("test"));
1383 let result = svc.call(exchange).await.unwrap();
1384 assert_eq!(
1385 result.input.header("greeting"),
1386 Some(&Value::String("hello".into()))
1387 );
1388 }
1389
1390 #[tokio::test]
1391 async fn test_filter_processor_passes() {
1392 use camel_api::BoxProcessorExt;
1393 use camel_processor::FilterService;
1394
1395 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1396 let mut svc =
1397 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1398 let exchange = Exchange::new(Message::new("pass"));
1399 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1400 assert_eq!(result.input.body.as_text(), Some("pass"));
1401 }
1402
1403 #[tokio::test]
1404 async fn test_filter_processor_blocks() {
1405 use camel_api::BoxProcessorExt;
1406 use camel_processor::FilterService;
1407
1408 let sub = BoxProcessor::from_fn(|_ex| {
1409 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1410 });
1411 let mut svc =
1412 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1413 let exchange = Exchange::new(Message::new("reject"));
1414 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1415 assert_eq!(result.input.body.as_text(), Some("reject"));
1416 }
1417
1418 #[tokio::test]
1419 async fn test_map_body_processor_works() {
1420 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1421 if let Some(text) = body.as_text() {
1422 Body::Text(text.to_uppercase())
1423 } else {
1424 body
1425 }
1426 });
1427 let exchange = Exchange::new(Message::new("hello"));
1428 let result = mapper.oneshot(exchange).await.unwrap();
1429 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1430 }
1431
1432 #[tokio::test]
1433 async fn test_process_custom_processor_works() {
1434 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1435 ex.set_property("custom", Value::Bool(true));
1436 Ok(ex)
1437 });
1438 let exchange = Exchange::new(Message::default());
1439 let result = processor.oneshot(exchange).await.unwrap();
1440 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1441 }
1442
1443 #[tokio::test]
1448 async fn test_compose_pipeline_runs_steps_in_order() {
1449 use camel_core::route::compose_pipeline;
1450
1451 let processors = vec![
1452 BoxProcessor::new(SetHeader::new(
1453 IdentityProcessor,
1454 "step",
1455 Value::String("one".into()),
1456 )),
1457 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1458 if let Some(text) = body.as_text() {
1459 Body::Text(format!("{}-processed", text))
1460 } else {
1461 body
1462 }
1463 })),
1464 ];
1465
1466 let pipeline = compose_pipeline(processors);
1467 let exchange = Exchange::new(Message::new("hello"));
1468 let result = pipeline.oneshot(exchange).await.unwrap();
1469
1470 assert_eq!(
1471 result.input.header("step"),
1472 Some(&Value::String("one".into()))
1473 );
1474 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1475 }
1476
1477 #[tokio::test]
1478 async fn test_compose_pipeline_empty_is_identity() {
1479 use camel_core::route::compose_pipeline;
1480
1481 let pipeline = compose_pipeline(vec![]);
1482 let exchange = Exchange::new(Message::new("unchanged"));
1483 let result = pipeline.oneshot(exchange).await.unwrap();
1484 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1485 }
1486
1487 #[test]
1492 fn test_builder_circuit_breaker_sets_config() {
1493 use camel_api::circuit_breaker::CircuitBreakerConfig;
1494
1495 let config = CircuitBreakerConfig::new().failure_threshold(5);
1496 let definition = RouteBuilder::from("timer:tick")
1497 .route_id("test-route")
1498 .circuit_breaker(config)
1499 .build()
1500 .unwrap();
1501
1502 let cb = definition
1503 .circuit_breaker_config()
1504 .expect("circuit breaker should be set");
1505 assert_eq!(cb.failure_threshold, 5);
1506 }
1507
1508 #[test]
1509 fn test_builder_circuit_breaker_with_error_handler() {
1510 use camel_api::circuit_breaker::CircuitBreakerConfig;
1511 use camel_api::error_handler::ErrorHandlerConfig;
1512
1513 let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1514 let eh_config = ErrorHandlerConfig::log_only();
1515
1516 let definition = RouteBuilder::from("timer:tick")
1517 .route_id("test-route")
1518 .to("log:info")
1519 .circuit_breaker(cb_config)
1520 .error_handler(eh_config)
1521 .build()
1522 .unwrap();
1523
1524 assert!(
1525 definition.circuit_breaker_config().is_some(),
1526 "circuit breaker config should be set"
1527 );
1528 }
1530
1531 #[test]
1532 fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1533 let definition = RouteBuilder::from("direct:start")
1534 .route_id("test-route")
1535 .dead_letter_channel("log:dlc")
1536 .on_exception(|e| matches!(e, CamelError::Io(_)))
1537 .retry(3)
1538 .handled_by("log:io")
1539 .end_on_exception()
1540 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1541 .retry(1)
1542 .end_on_exception()
1543 .to("mock:out")
1544 .build()
1545 .expect("route should build");
1546
1547 let cfg = definition
1548 .error_handler_config()
1549 .expect("error handler should be set");
1550 assert_eq!(cfg.policies.len(), 2);
1551 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1552 assert_eq!(
1553 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1554 Some(3)
1555 );
1556 assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1557 assert_eq!(
1558 cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1559 Some(1)
1560 );
1561 }
1562
1563 #[test]
1564 fn test_builder_on_exception_mixed_mode_rejected() {
1565 let result = RouteBuilder::from("direct:start")
1566 .route_id("test-route")
1567 .error_handler(ErrorHandlerConfig::log_only())
1568 .on_exception(|_e| true)
1569 .end_on_exception()
1570 .to("mock:out")
1571 .build();
1572
1573 let err = result.err().expect("mixed mode should fail with an error");
1574
1575 assert!(
1576 format!("{err}").contains("mixed error handler modes"),
1577 "unexpected error: {err}"
1578 );
1579 }
1580
1581 #[test]
1582 fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1583 let definition = RouteBuilder::from("direct:start")
1584 .route_id("test-route")
1585 .on_exception(|_e| true)
1586 .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1587 .with_jitter(0.5)
1588 .end_on_exception()
1589 .to("mock:out")
1590 .build()
1591 .expect("route should build");
1592
1593 let cfg = definition
1594 .error_handler_config()
1595 .expect("error handler should be set");
1596 assert_eq!(cfg.policies.len(), 1);
1597 assert!(cfg.policies[0].retry.is_none());
1598 }
1599
1600 #[test]
1601 fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1602 let definition = RouteBuilder::from("direct:start")
1603 .route_id("test-route")
1604 .dead_letter_channel("log:dlc")
1605 .to("mock:out")
1606 .build()
1607 .expect("route should build");
1608
1609 let cfg = definition
1610 .error_handler_config()
1611 .expect("error handler should be set");
1612 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1613 assert!(cfg.policies.is_empty());
1614 }
1615
1616 #[test]
1617 fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1618 let definition = RouteBuilder::from("direct:start")
1619 .route_id("test-route")
1620 .dead_letter_channel("log:first")
1621 .on_exception(|e| matches!(e, CamelError::Io(_)))
1622 .retry(2)
1623 .end_on_exception()
1624 .dead_letter_channel("log:second")
1625 .to("mock:out")
1626 .build()
1627 .expect("route should build");
1628
1629 let cfg = definition
1630 .error_handler_config()
1631 .expect("error handler should be set");
1632 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1633 assert_eq!(cfg.policies.len(), 1);
1634 assert_eq!(
1635 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1636 Some(2)
1637 );
1638 }
1639
1640 #[test]
1641 fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1642 let definition = RouteBuilder::from("direct:start")
1643 .route_id("test-route")
1644 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1645 .retry(1)
1646 .end_on_exception()
1647 .to("mock:out")
1648 .build()
1649 .expect("route should build");
1650
1651 let cfg = definition
1652 .error_handler_config()
1653 .expect("error handler should be set");
1654 assert!(cfg.dlc_uri.is_none());
1655 assert_eq!(cfg.policies.len(), 1);
1656 }
1657
1658 #[test]
1659 fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1660 let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1661 let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1662
1663 let definition = RouteBuilder::from("direct:start")
1664 .route_id("test-route")
1665 .error_handler(first)
1666 .error_handler(second)
1667 .to("mock:out")
1668 .build()
1669 .expect("route should build");
1670
1671 let cfg = definition
1672 .error_handler_config()
1673 .expect("error handler should be set");
1674 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1675 }
1676
1677 #[test]
1680 fn test_split_builder_typestate() {
1681 use camel_api::splitter::{SplitterConfig, split_body_lines};
1682
1683 let definition = RouteBuilder::from("timer:test?period=1000")
1685 .route_id("test-route")
1686 .split(SplitterConfig::new(split_body_lines()))
1687 .to("mock:per-fragment")
1688 .end_split()
1689 .to("mock:final")
1690 .build()
1691 .unwrap();
1692
1693 assert_eq!(definition.steps().len(), 2);
1695 }
1696
1697 #[test]
1698 fn test_split_builder_steps_collected() {
1699 use camel_api::splitter::{SplitterConfig, split_body_lines};
1700
1701 let definition = RouteBuilder::from("timer:test?period=1000")
1702 .route_id("test-route")
1703 .split(SplitterConfig::new(split_body_lines()))
1704 .set_header("fragment", Value::String("yes".into()))
1705 .to("mock:per-fragment")
1706 .end_split()
1707 .build()
1708 .unwrap();
1709
1710 assert_eq!(definition.steps().len(), 1);
1712 match &definition.steps()[0] {
1713 BuilderStep::Split { steps, .. } => {
1714 assert_eq!(steps.len(), 2); }
1716 other => panic!("Expected Split, got {:?}", other),
1717 }
1718 }
1719
1720 #[test]
1721 fn test_split_builder_config_propagated() {
1722 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1723
1724 let definition = RouteBuilder::from("timer:test?period=1000")
1725 .route_id("test-route")
1726 .split(
1727 SplitterConfig::new(split_body_lines())
1728 .parallel(true)
1729 .parallel_limit(4)
1730 .aggregation(AggregationStrategy::CollectAll),
1731 )
1732 .to("mock:per-fragment")
1733 .end_split()
1734 .build()
1735 .unwrap();
1736
1737 match &definition.steps()[0] {
1738 BuilderStep::Split { config, .. } => {
1739 assert!(config.parallel);
1740 assert_eq!(config.parallel_limit, Some(4));
1741 assert!(matches!(
1742 config.aggregation,
1743 AggregationStrategy::CollectAll
1744 ));
1745 }
1746 other => panic!("Expected Split, got {:?}", other),
1747 }
1748 }
1749
1750 #[test]
1751 fn test_aggregate_builder_adds_step() {
1752 use camel_api::aggregator::AggregatorConfig;
1753 use camel_core::route::BuilderStep;
1754
1755 let definition = RouteBuilder::from("timer:tick")
1756 .route_id("test-route")
1757 .aggregate(
1758 AggregatorConfig::correlate_by("key")
1759 .complete_when_size(2)
1760 .build(),
1761 )
1762 .build()
1763 .unwrap();
1764
1765 assert_eq!(definition.steps().len(), 1);
1766 assert!(matches!(
1767 definition.steps()[0],
1768 BuilderStep::Aggregate { .. }
1769 ));
1770 }
1771
1772 #[test]
1773 fn test_aggregate_in_split_builder() {
1774 use camel_api::aggregator::AggregatorConfig;
1775 use camel_api::splitter::{SplitterConfig, split_body_lines};
1776 use camel_core::route::BuilderStep;
1777
1778 let definition = RouteBuilder::from("timer:tick")
1779 .route_id("test-route")
1780 .split(SplitterConfig::new(split_body_lines()))
1781 .aggregate(
1782 AggregatorConfig::correlate_by("key")
1783 .complete_when_size(1)
1784 .build(),
1785 )
1786 .end_split()
1787 .build()
1788 .unwrap();
1789
1790 assert_eq!(definition.steps().len(), 1);
1791 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
1792 assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
1793 } else {
1794 panic!("expected Split step");
1795 }
1796 }
1797
1798 #[test]
1801 fn test_builder_set_body_static_adds_processor() {
1802 let definition = RouteBuilder::from("timer:tick")
1803 .route_id("test-route")
1804 .set_body("fixed")
1805 .build()
1806 .unwrap();
1807 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1808 }
1809
1810 #[test]
1811 fn test_builder_set_body_fn_adds_processor() {
1812 let definition = RouteBuilder::from("timer:tick")
1813 .route_id("test-route")
1814 .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
1815 .build()
1816 .unwrap();
1817 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1818 }
1819
1820 #[test]
1821 fn transform_alias_produces_same_as_set_body() {
1822 let route_transform = RouteBuilder::from("timer:tick")
1823 .route_id("test-route")
1824 .transform("hello")
1825 .build()
1826 .unwrap();
1827
1828 let route_set_body = RouteBuilder::from("timer:tick")
1829 .route_id("test-route")
1830 .set_body("hello")
1831 .build()
1832 .unwrap();
1833
1834 assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
1835 }
1836
1837 #[test]
1838 fn test_builder_set_header_fn_adds_processor() {
1839 let definition = RouteBuilder::from("timer:tick")
1840 .route_id("test-route")
1841 .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
1842 .build()
1843 .unwrap();
1844 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1845 }
1846
1847 #[tokio::test]
1848 async fn test_set_body_static_processor_works() {
1849 use camel_core::route::compose_pipeline;
1850 let def = RouteBuilder::from("t:t")
1851 .route_id("test-route")
1852 .set_body("replaced")
1853 .build()
1854 .unwrap();
1855 let pipeline = compose_pipeline(
1856 def.steps()
1857 .iter()
1858 .filter_map(|s| {
1859 if let BuilderStep::Processor(p) = s {
1860 Some(p.clone())
1861 } else {
1862 None
1863 }
1864 })
1865 .collect(),
1866 );
1867 let exchange = Exchange::new(Message::new("original"));
1868 let result = pipeline.oneshot(exchange).await.unwrap();
1869 assert_eq!(result.input.body.as_text(), Some("replaced"));
1870 }
1871
1872 #[tokio::test]
1873 async fn test_set_body_fn_processor_works() {
1874 use camel_core::route::compose_pipeline;
1875 let def = RouteBuilder::from("t:t")
1876 .route_id("test-route")
1877 .set_body_fn(|ex: &Exchange| {
1878 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
1879 })
1880 .build()
1881 .unwrap();
1882 let pipeline = compose_pipeline(
1883 def.steps()
1884 .iter()
1885 .filter_map(|s| {
1886 if let BuilderStep::Processor(p) = s {
1887 Some(p.clone())
1888 } else {
1889 None
1890 }
1891 })
1892 .collect(),
1893 );
1894 let exchange = Exchange::new(Message::new("hello"));
1895 let result = pipeline.oneshot(exchange).await.unwrap();
1896 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1897 }
1898
1899 #[tokio::test]
1900 async fn test_set_header_fn_processor_works() {
1901 use camel_core::route::compose_pipeline;
1902 let def = RouteBuilder::from("t:t")
1903 .route_id("test-route")
1904 .set_header_fn("echo", |ex: &Exchange| {
1905 ex.input
1906 .body
1907 .as_text()
1908 .map(|t| Value::String(t.into()))
1909 .unwrap_or(Value::Null)
1910 })
1911 .build()
1912 .unwrap();
1913 let pipeline = compose_pipeline(
1914 def.steps()
1915 .iter()
1916 .filter_map(|s| {
1917 if let BuilderStep::Processor(p) = s {
1918 Some(p.clone())
1919 } else {
1920 None
1921 }
1922 })
1923 .collect(),
1924 );
1925 let exchange = Exchange::new(Message::new("ping"));
1926 let result = pipeline.oneshot(exchange).await.unwrap();
1927 assert_eq!(
1928 result.input.header("echo"),
1929 Some(&Value::String("ping".into()))
1930 );
1931 }
1932
1933 #[test]
1936 fn test_filter_builder_typestate() {
1937 let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1938 .route_id("test-route")
1939 .filter(|_ex| true)
1940 .to("mock:inner")
1941 .end_filter()
1942 .to("mock:outer")
1943 .build();
1944 assert!(result.is_ok());
1945 }
1946
1947 #[test]
1948 fn test_filter_builder_steps_collected() {
1949 let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1950 .route_id("test-route")
1951 .filter(|_ex| true)
1952 .to("mock:inner")
1953 .end_filter()
1954 .build()
1955 .unwrap();
1956
1957 assert_eq!(definition.steps().len(), 1);
1958 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1959 }
1960
1961 #[test]
1962 fn test_wire_tap_builder_adds_step() {
1963 let definition = RouteBuilder::from("timer:tick")
1964 .route_id("test-route")
1965 .wire_tap("mock:tap")
1966 .to("mock:result")
1967 .build()
1968 .unwrap();
1969
1970 assert_eq!(definition.steps().len(), 2);
1971 assert!(
1972 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
1973 );
1974 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
1975 }
1976
1977 #[test]
1980 fn test_multicast_builder_typestate() {
1981 let definition = RouteBuilder::from("timer:tick")
1982 .route_id("test-route")
1983 .multicast()
1984 .to("direct:a")
1985 .to("direct:b")
1986 .end_multicast()
1987 .to("mock:result")
1988 .build()
1989 .unwrap();
1990
1991 assert_eq!(definition.steps().len(), 2); }
1993
1994 #[test]
1995 fn test_multicast_builder_steps_collected() {
1996 let definition = RouteBuilder::from("timer:tick")
1997 .route_id("test-route")
1998 .multicast()
1999 .to("direct:a")
2000 .to("direct:b")
2001 .end_multicast()
2002 .build()
2003 .unwrap();
2004
2005 match &definition.steps()[0] {
2006 BuilderStep::Multicast { steps, .. } => {
2007 assert_eq!(steps.len(), 2);
2008 }
2009 other => panic!("Expected Multicast, got {:?}", other),
2010 }
2011 }
2012
2013 #[test]
2016 fn test_builder_concurrent_sets_concurrency() {
2017 use camel_component_api::ConcurrencyModel;
2018
2019 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2020 .route_id("test-route")
2021 .concurrent(16)
2022 .to("log:info")
2023 .build()
2024 .unwrap();
2025
2026 assert_eq!(
2027 definition.concurrency_override(),
2028 Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2029 );
2030 }
2031
2032 #[test]
2033 fn test_builder_concurrent_zero_means_unbounded() {
2034 use camel_component_api::ConcurrencyModel;
2035
2036 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2037 .route_id("test-route")
2038 .concurrent(0)
2039 .to("log:info")
2040 .build()
2041 .unwrap();
2042
2043 assert_eq!(
2044 definition.concurrency_override(),
2045 Some(&ConcurrencyModel::Concurrent { max: None })
2046 );
2047 }
2048
2049 #[test]
2050 fn test_builder_sequential_sets_concurrency() {
2051 use camel_component_api::ConcurrencyModel;
2052
2053 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2054 .route_id("test-route")
2055 .sequential()
2056 .to("log:info")
2057 .build()
2058 .unwrap();
2059
2060 assert_eq!(
2061 definition.concurrency_override(),
2062 Some(&ConcurrencyModel::Sequential)
2063 );
2064 }
2065
2066 #[test]
2067 fn test_builder_default_concurrency_is_none() {
2068 let definition = RouteBuilder::from("timer:tick")
2069 .route_id("test-route")
2070 .to("log:info")
2071 .build()
2072 .unwrap();
2073
2074 assert_eq!(definition.concurrency_override(), None);
2075 }
2076
2077 #[test]
2080 fn test_builder_route_id_sets_id() {
2081 let definition = RouteBuilder::from("timer:tick")
2082 .route_id("my-route")
2083 .build()
2084 .unwrap();
2085
2086 assert_eq!(definition.route_id(), "my-route");
2087 }
2088
2089 #[test]
2090 fn test_build_without_route_id_fails() {
2091 let result = RouteBuilder::from("timer:tick?period=1000")
2092 .to("log:info")
2093 .build();
2094 let err = match result {
2095 Err(e) => e.to_string(),
2096 Ok(_) => panic!("build() should fail without route_id"),
2097 };
2098 assert!(
2099 err.contains("route_id"),
2100 "error should mention route_id, got: {}",
2101 err
2102 );
2103 }
2104
2105 #[test]
2106 fn test_builder_auto_startup_false() {
2107 let definition = RouteBuilder::from("timer:tick")
2108 .route_id("test-route")
2109 .auto_startup(false)
2110 .build()
2111 .unwrap();
2112
2113 assert!(!definition.auto_startup());
2114 }
2115
2116 #[test]
2117 fn test_builder_startup_order_custom() {
2118 let definition = RouteBuilder::from("timer:tick")
2119 .route_id("test-route")
2120 .startup_order(50)
2121 .build()
2122 .unwrap();
2123
2124 assert_eq!(definition.startup_order(), 50);
2125 }
2126
2127 #[test]
2128 fn test_builder_defaults() {
2129 let definition = RouteBuilder::from("timer:tick")
2130 .route_id("test-route")
2131 .build()
2132 .unwrap();
2133
2134 assert_eq!(definition.route_id(), "test-route");
2135 assert!(definition.auto_startup());
2136 assert_eq!(definition.startup_order(), 1000);
2137 }
2138
2139 #[test]
2142 fn test_choice_builder_single_when() {
2143 let definition = RouteBuilder::from("timer:tick")
2144 .route_id("test-route")
2145 .choice()
2146 .when(|ex: &Exchange| ex.input.header("type").is_some())
2147 .to("mock:typed")
2148 .end_when()
2149 .end_choice()
2150 .build()
2151 .unwrap();
2152 assert_eq!(definition.steps().len(), 1);
2153 assert!(
2154 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2155 if whens.len() == 1 && otherwise.is_none())
2156 );
2157 }
2158
2159 #[test]
2160 fn test_choice_builder_when_otherwise() {
2161 let definition = RouteBuilder::from("timer:tick")
2162 .route_id("test-route")
2163 .choice()
2164 .when(|ex: &Exchange| ex.input.header("a").is_some())
2165 .to("mock:a")
2166 .end_when()
2167 .otherwise()
2168 .to("mock:fallback")
2169 .end_otherwise()
2170 .end_choice()
2171 .build()
2172 .unwrap();
2173 assert!(
2174 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2175 if whens.len() == 1 && otherwise.is_some())
2176 );
2177 }
2178
2179 #[test]
2180 fn test_choice_builder_multiple_whens() {
2181 let definition = RouteBuilder::from("timer:tick")
2182 .route_id("test-route")
2183 .choice()
2184 .when(|ex: &Exchange| ex.input.header("a").is_some())
2185 .to("mock:a")
2186 .end_when()
2187 .when(|ex: &Exchange| ex.input.header("b").is_some())
2188 .to("mock:b")
2189 .end_when()
2190 .end_choice()
2191 .build()
2192 .unwrap();
2193 assert!(
2194 matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2195 if whens.len() == 2)
2196 );
2197 }
2198
2199 #[test]
2200 fn test_choice_step_after_choice() {
2201 let definition = RouteBuilder::from("timer:tick")
2203 .route_id("test-route")
2204 .choice()
2205 .when(|_ex: &Exchange| true)
2206 .to("mock:inner")
2207 .end_when()
2208 .end_choice()
2209 .to("mock:outer") .build()
2211 .unwrap();
2212 assert_eq!(definition.steps().len(), 2);
2213 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2214 }
2215
2216 #[test]
2219 fn test_throttle_builder_typestate() {
2220 let definition = RouteBuilder::from("timer:tick")
2221 .route_id("test-route")
2222 .throttle(10, std::time::Duration::from_secs(1))
2223 .to("mock:result")
2224 .end_throttle()
2225 .build()
2226 .unwrap();
2227
2228 assert_eq!(definition.steps().len(), 1);
2229 assert!(matches!(
2230 &definition.steps()[0],
2231 BuilderStep::Throttle { .. }
2232 ));
2233 }
2234
2235 #[test]
2236 fn test_throttle_builder_with_strategy() {
2237 let definition = RouteBuilder::from("timer:tick")
2238 .route_id("test-route")
2239 .throttle(10, std::time::Duration::from_secs(1))
2240 .strategy(ThrottleStrategy::Reject)
2241 .to("mock:result")
2242 .end_throttle()
2243 .build()
2244 .unwrap();
2245
2246 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2247 assert_eq!(config.strategy, ThrottleStrategy::Reject);
2248 } else {
2249 panic!("Expected Throttle step");
2250 }
2251 }
2252
2253 #[test]
2254 fn test_throttle_builder_steps_collected() {
2255 let definition = RouteBuilder::from("timer:tick")
2256 .route_id("test-route")
2257 .throttle(5, std::time::Duration::from_secs(1))
2258 .set_header("throttled", Value::Bool(true))
2259 .to("mock:throttled")
2260 .end_throttle()
2261 .build()
2262 .unwrap();
2263
2264 match &definition.steps()[0] {
2265 BuilderStep::Throttle { steps, .. } => {
2266 assert_eq!(steps.len(), 2); }
2268 other => panic!("Expected Throttle, got {:?}", other),
2269 }
2270 }
2271
2272 #[test]
2273 fn test_throttle_step_after_throttle() {
2274 let definition = RouteBuilder::from("timer:tick")
2276 .route_id("test-route")
2277 .throttle(10, std::time::Duration::from_secs(1))
2278 .to("mock:inner")
2279 .end_throttle()
2280 .to("mock:outer")
2281 .build()
2282 .unwrap();
2283
2284 assert_eq!(definition.steps().len(), 2);
2285 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2286 }
2287
2288 #[test]
2291 fn test_load_balance_builder_typestate() {
2292 let definition = RouteBuilder::from("timer:tick")
2293 .route_id("test-route")
2294 .load_balance()
2295 .round_robin()
2296 .to("mock:a")
2297 .to("mock:b")
2298 .end_load_balance()
2299 .build()
2300 .unwrap();
2301
2302 assert_eq!(definition.steps().len(), 1);
2303 assert!(matches!(
2304 &definition.steps()[0],
2305 BuilderStep::LoadBalance { .. }
2306 ));
2307 }
2308
2309 #[test]
2310 fn test_load_balance_builder_with_strategy() {
2311 let definition = RouteBuilder::from("timer:tick")
2312 .route_id("test-route")
2313 .load_balance()
2314 .random()
2315 .to("mock:result")
2316 .end_load_balance()
2317 .build()
2318 .unwrap();
2319
2320 if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2321 assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2322 } else {
2323 panic!("Expected LoadBalance step");
2324 }
2325 }
2326
2327 #[test]
2328 fn test_load_balance_builder_steps_collected() {
2329 let definition = RouteBuilder::from("timer:tick")
2330 .route_id("test-route")
2331 .load_balance()
2332 .set_header("lb", Value::Bool(true))
2333 .to("mock:a")
2334 .end_load_balance()
2335 .build()
2336 .unwrap();
2337
2338 match &definition.steps()[0] {
2339 BuilderStep::LoadBalance { steps, .. } => {
2340 assert_eq!(steps.len(), 2); }
2342 other => panic!("Expected LoadBalance, got {:?}", other),
2343 }
2344 }
2345
2346 #[test]
2347 fn test_load_balance_step_after_load_balance() {
2348 let definition = RouteBuilder::from("timer:tick")
2350 .route_id("test-route")
2351 .load_balance()
2352 .to("mock:inner")
2353 .end_load_balance()
2354 .to("mock:outer")
2355 .build()
2356 .unwrap();
2357
2358 assert_eq!(definition.steps().len(), 2);
2359 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2360 }
2361
2362 #[test]
2365 fn test_dynamic_router_builder() {
2366 let definition = RouteBuilder::from("timer:tick")
2367 .route_id("test-route")
2368 .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2369 .build()
2370 .unwrap();
2371
2372 assert_eq!(definition.steps().len(), 1);
2373 assert!(matches!(
2374 &definition.steps()[0],
2375 BuilderStep::DynamicRouter { .. }
2376 ));
2377 }
2378
2379 #[test]
2380 fn test_dynamic_router_builder_with_config() {
2381 let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2382 .max_iterations(100)
2383 .cache_size(500);
2384
2385 let definition = RouteBuilder::from("timer:tick")
2386 .route_id("test-route")
2387 .dynamic_router_with_config(config)
2388 .build()
2389 .unwrap();
2390
2391 assert_eq!(definition.steps().len(), 1);
2392 if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2393 assert_eq!(config.max_iterations, 100);
2394 assert_eq!(config.cache_size, 500);
2395 } else {
2396 panic!("Expected DynamicRouter step");
2397 }
2398 }
2399
2400 #[test]
2401 fn test_dynamic_router_step_after_router() {
2402 let definition = RouteBuilder::from("timer:tick")
2404 .route_id("test-route")
2405 .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2406 .to("mock:outer")
2407 .build()
2408 .unwrap();
2409
2410 assert_eq!(definition.steps().len(), 2);
2411 assert!(matches!(
2412 &definition.steps()[0],
2413 BuilderStep::DynamicRouter { .. }
2414 ));
2415 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2416 }
2417
2418 #[test]
2419 fn routing_slip_builder_creates_step() {
2420 use camel_api::RoutingSlipExpression;
2421
2422 let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2423
2424 let route = RouteBuilder::from("direct:start")
2425 .route_id("routing-slip-test")
2426 .routing_slip(expression)
2427 .build()
2428 .unwrap();
2429
2430 assert!(
2431 matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2432 "Expected RoutingSlip step"
2433 );
2434 }
2435
2436 #[test]
2437 fn routing_slip_with_config_builder_creates_step() {
2438 use camel_api::RoutingSlipConfig;
2439
2440 let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2441 .uri_delimiter("|")
2442 .cache_size(50)
2443 .ignore_invalid_endpoints(true);
2444
2445 let route = RouteBuilder::from("direct:start")
2446 .route_id("routing-slip-config-test")
2447 .routing_slip_with_config(config)
2448 .build()
2449 .unwrap();
2450
2451 if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2452 assert_eq!(config.uri_delimiter, "|");
2453 assert_eq!(config.cache_size, 50);
2454 assert!(config.ignore_invalid_endpoints);
2455 } else {
2456 panic!("Expected RoutingSlip step");
2457 }
2458 }
2459
2460 #[test]
2461 fn test_builder_marshal_adds_processor_step() {
2462 let definition = RouteBuilder::from("timer:tick")
2463 .route_id("test-route")
2464 .marshal("json")
2465 .build()
2466 .unwrap();
2467 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2468 }
2469
2470 #[test]
2471 fn test_builder_unmarshal_adds_processor_step() {
2472 let definition = RouteBuilder::from("timer:tick")
2473 .route_id("test-route")
2474 .unmarshal("json")
2475 .build()
2476 .unwrap();
2477 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2478 }
2479
2480 #[test]
2481 #[should_panic(expected = "unknown data format: 'csv'")]
2482 fn test_builder_marshal_panics_on_unknown_format() {
2483 let _ = RouteBuilder::from("timer:tick")
2484 .route_id("test-route")
2485 .marshal("csv")
2486 .build();
2487 }
2488
2489 #[test]
2490 #[should_panic(expected = "unknown data format: 'csv'")]
2491 fn test_builder_unmarshal_panics_on_unknown_format() {
2492 let _ = RouteBuilder::from("timer:tick")
2493 .route_id("test-route")
2494 .unmarshal("csv")
2495 .build();
2496 }
2497}