1use camel_api::DelayConfig;
2use camel_api::aggregator::{
3 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
4};
5use camel_api::body::Body;
6use camel_api::body_converter::BodyType;
7use camel_api::circuit_breaker::CircuitBreakerConfig;
8use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
9use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
10use camel_api::load_balancer::LoadBalancerConfig;
11use camel_api::multicast::{MulticastConfig, MulticastStrategy};
12use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
13use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
14use camel_api::splitter::SplitterConfig;
15use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
16use camel_api::{
17 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
18 ProcessorFn, Value,
19 runtime::{
20 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
21 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
22 CanonicalWhenSpec,
23 },
24};
25use camel_component_api::ConcurrencyModel;
26use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
27use camel_processor::{
28 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
29 UnmarshalService, builtin_data_format,
30};
31
32pub trait StepAccumulator: Sized {
38 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
39
40 fn to(mut self, endpoint: impl Into<String>) -> Self {
41 self.steps_mut().push(BuilderStep::To(endpoint.into()));
42 self
43 }
44
45 fn process<F, Fut>(mut self, f: F) -> Self
46 where
47 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
48 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
49 {
50 let svc = ProcessorFn::new(f);
51 self.steps_mut()
52 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
53 self
54 }
55
56 fn process_fn(mut self, processor: BoxProcessor) -> Self {
57 self.steps_mut().push(BuilderStep::Processor(processor));
58 self
59 }
60
61 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
62 let svc = SetHeader::new(IdentityProcessor, key, value);
63 self.steps_mut()
64 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
65 self
66 }
67
68 fn map_body<F>(mut self, mapper: F) -> Self
69 where
70 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
71 {
72 let svc = MapBody::new(IdentityProcessor, mapper);
73 self.steps_mut()
74 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
75 self
76 }
77
78 fn set_body<B>(mut self, body: B) -> Self
79 where
80 B: Into<Body> + Clone + Send + Sync + 'static,
81 {
82 let body: Body = body.into();
83 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
84 self.steps_mut()
85 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
86 self
87 }
88
89 fn transform<B>(self, body: B) -> Self
94 where
95 B: Into<Body> + Clone + Send + Sync + 'static,
96 {
97 self.set_body(body)
98 }
99
100 fn set_body_fn<F>(mut self, expr: F) -> Self
101 where
102 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
103 {
104 let svc = SetBody::new(IdentityProcessor, expr);
105 self.steps_mut()
106 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
107 self
108 }
109
110 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
111 where
112 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
113 {
114 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
115 self.steps_mut()
116 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
117 self
118 }
119
120 fn aggregate(mut self, config: AggregatorConfig) -> Self {
121 self.steps_mut().push(BuilderStep::Aggregate { config });
122 self
123 }
124
125 fn stop(mut self) -> Self {
131 self.steps_mut().push(BuilderStep::Stop);
132 self
133 }
134
135 fn delay(mut self, duration: std::time::Duration) -> Self {
136 self.steps_mut().push(BuilderStep::Delay {
137 config: DelayConfig::from_duration(duration),
138 });
139 self
140 }
141
142 fn delay_with_header(
143 mut self,
144 duration: std::time::Duration,
145 header: impl Into<String>,
146 ) -> Self {
147 self.steps_mut().push(BuilderStep::Delay {
148 config: DelayConfig::from_duration_with_header(duration, header),
149 });
150 self
151 }
152
153 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
157 self.steps_mut().push(BuilderStep::Log {
158 level,
159 message: message.into(),
160 });
161 self
162 }
163
164 fn convert_body_to(mut self, target: BodyType) -> Self {
176 let svc = ConvertBodyTo::new(IdentityProcessor, target);
177 self.steps_mut()
178 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
179 self
180 }
181
182 fn marshal(mut self, format: impl Into<String>) -> Self {
192 let name = format.into();
193 let df =
194 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
195 let svc = MarshalService::new(IdentityProcessor, df);
196 self.steps_mut()
197 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
198 self
199 }
200
201 fn unmarshal(mut self, format: impl Into<String>) -> Self {
211 let name = format.into();
212 let df =
213 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
214 let svc = UnmarshalService::new(IdentityProcessor, df);
215 self.steps_mut()
216 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
217 self
218 }
219
220 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
231 self.steps_mut().push(BuilderStep::Script {
232 language: language.into(),
233 script: script.into(),
234 });
235 self
236 }
237}
238
239pub struct RouteBuilder {
251 from_uri: String,
252 steps: Vec<BuilderStep>,
253 error_handler: Option<ErrorHandlerConfig>,
254 error_handler_mode: ErrorHandlerMode,
255 circuit_breaker_config: Option<CircuitBreakerConfig>,
256 concurrency: Option<ConcurrencyModel>,
257 route_id: Option<String>,
258 auto_startup: Option<bool>,
259 startup_order: Option<i32>,
260}
261
262#[derive(Default)]
263enum ErrorHandlerMode {
264 #[default]
265 None,
266 ExplicitConfig,
267 Shorthand {
268 dlc_uri: Option<String>,
269 specs: Vec<OnExceptionSpec>,
270 },
271 Mixed,
272}
273
274#[derive(Clone)]
275struct OnExceptionSpec {
276 matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
277 retry: Option<RedeliveryPolicy>,
278 handled_by: Option<String>,
279}
280
281impl RouteBuilder {
282 pub fn from(endpoint: &str) -> Self {
284 Self {
285 from_uri: endpoint.to_string(),
286 steps: Vec::new(),
287 error_handler: None,
288 error_handler_mode: ErrorHandlerMode::None,
289 circuit_breaker_config: None,
290 concurrency: None,
291 route_id: None,
292 auto_startup: None,
293 startup_order: None,
294 }
295 }
296
297 pub fn filter<F>(self, predicate: F) -> FilterBuilder
301 where
302 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
303 {
304 FilterBuilder {
305 parent: self,
306 predicate: std::sync::Arc::new(predicate),
307 steps: vec![],
308 }
309 }
310
311 pub fn choice(self) -> ChoiceBuilder {
317 ChoiceBuilder {
318 parent: self,
319 whens: vec![],
320 _otherwise: None,
321 }
322 }
323
324 pub fn wire_tap(mut self, endpoint: &str) -> Self {
328 self.steps.push(BuilderStep::WireTap {
329 uri: endpoint.to_string(),
330 });
331 self
332 }
333
334 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
336 self.error_handler_mode = match self.error_handler_mode {
337 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
338 ErrorHandlerMode::ExplicitConfig
339 }
340 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
341 };
342 self.error_handler = Some(config);
343 self
344 }
345
346 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
348 let uri = uri.into();
349 self.error_handler_mode = match self.error_handler_mode {
350 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
351 dlc_uri: Some(uri),
352 specs: Vec::new(),
353 },
354 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
355 dlc_uri: Some(uri),
356 specs,
357 },
358 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
359 };
360 self
361 }
362
363 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
365 where
366 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
367 {
368 self.error_handler_mode = match self.error_handler_mode {
369 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
370 dlc_uri: None,
371 specs: Vec::new(),
372 },
373 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
374 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
375 };
376
377 OnExceptionBuilder {
378 parent: self,
379 policy: OnExceptionSpec {
380 matches: std::sync::Arc::new(matches),
381 retry: None,
382 handled_by: None,
383 },
384 }
385 }
386
387 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
389 self.circuit_breaker_config = Some(config);
390 self
391 }
392
393 pub fn concurrent(mut self, max: usize) -> Self {
407 let max = if max == 0 { None } else { Some(max) };
408 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
409 self
410 }
411
412 pub fn sequential(mut self) -> Self {
417 self.concurrency = Some(ConcurrencyModel::Sequential);
418 self
419 }
420
421 pub fn route_id(mut self, id: impl Into<String>) -> Self {
425 self.route_id = Some(id.into());
426 self
427 }
428
429 pub fn auto_startup(mut self, auto: bool) -> Self {
433 self.auto_startup = Some(auto);
434 self
435 }
436
437 pub fn startup_order(mut self, order: i32) -> Self {
441 self.startup_order = Some(order);
442 self
443 }
444
445 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
451 SplitBuilder {
452 parent: self,
453 config,
454 steps: Vec::new(),
455 }
456 }
457
458 pub fn multicast(self) -> MulticastBuilder {
464 MulticastBuilder {
465 parent: self,
466 steps: Vec::new(),
467 config: MulticastConfig::new(),
468 }
469 }
470
471 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
478 ThrottleBuilder {
479 parent: self,
480 config: ThrottlerConfig::new(max_requests, period),
481 steps: Vec::new(),
482 }
483 }
484
485 pub fn load_balance(self) -> LoadBalancerBuilder {
491 LoadBalancerBuilder {
492 parent: self,
493 config: LoadBalancerConfig::round_robin(),
494 steps: Vec::new(),
495 }
496 }
497
498 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
514 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
515 }
516
517 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
521 self.steps.push(BuilderStep::DynamicRouter { config });
522 self
523 }
524
525 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
526 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
527 }
528
529 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
530 self.steps.push(BuilderStep::RoutingSlip { config });
531 self
532 }
533
534 pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
535 self.recipient_list_with_config(RecipientListConfig::new(expression))
536 }
537
538 pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
539 self.steps.push(BuilderStep::RecipientList { config });
540 self
541 }
542
543 pub fn build(self) -> Result<RouteDefinition, CamelError> {
545 if self.from_uri.is_empty() {
546 return Err(CamelError::RouteError(
547 "route must have a 'from' URI".to_string(),
548 ));
549 }
550 let route_id = self.route_id.ok_or_else(|| {
551 CamelError::RouteError(
552 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
553 .to_string(),
554 )
555 })?;
556 let resolved_error_handler = match self.error_handler_mode {
557 ErrorHandlerMode::None => self.error_handler,
558 ErrorHandlerMode::ExplicitConfig => self.error_handler,
559 ErrorHandlerMode::Mixed => {
560 return Err(CamelError::RouteError(
561 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
562 ));
563 }
564 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
565 let mut config = if let Some(uri) = dlc_uri {
566 ErrorHandlerConfig::dead_letter_channel(uri)
567 } else {
568 ErrorHandlerConfig::log_only()
569 };
570
571 for spec in specs {
572 let matcher = spec.matches.clone();
573 let mut builder = config.on_exception(move |e| matcher(e));
574
575 if let Some(retry) = spec.retry {
576 builder = builder.retry(retry.max_attempts).with_backoff(
577 retry.initial_delay,
578 retry.multiplier,
579 retry.max_delay,
580 );
581 if retry.jitter_factor > 0.0 {
582 builder = builder.with_jitter(retry.jitter_factor);
583 }
584 }
585
586 if let Some(uri) = spec.handled_by {
587 builder = builder.handled_by(uri);
588 }
589
590 config = builder.build();
591 }
592
593 Some(config)
594 }
595 };
596
597 let definition = RouteDefinition::new(self.from_uri, self.steps);
598 let definition = if let Some(eh) = resolved_error_handler {
599 definition.with_error_handler(eh)
600 } else {
601 definition
602 };
603 let definition = if let Some(cb) = self.circuit_breaker_config {
604 definition.with_circuit_breaker(cb)
605 } else {
606 definition
607 };
608 let definition = if let Some(concurrency) = self.concurrency {
609 definition.with_concurrency(concurrency)
610 } else {
611 definition
612 };
613 let definition = definition.with_route_id(route_id);
614 let definition = if let Some(auto) = self.auto_startup {
615 definition.with_auto_startup(auto)
616 } else {
617 definition
618 };
619 let definition = if let Some(order) = self.startup_order {
620 definition.with_startup_order(order)
621 } else {
622 definition
623 };
624 Ok(definition)
625 }
626
627 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
629 if self.from_uri.is_empty() {
630 return Err(CamelError::RouteError(
631 "route must have a 'from' URI".to_string(),
632 ));
633 }
634 let route_id = self.route_id.ok_or_else(|| {
635 CamelError::RouteError(
636 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
637 .to_string(),
638 )
639 })?;
640
641 let steps = canonicalize_steps(self.steps)?;
642 let circuit_breaker = self
643 .circuit_breaker_config
644 .map(canonicalize_circuit_breaker);
645
646 let spec = CanonicalRouteSpec {
647 route_id,
648 from: self.from_uri,
649 steps,
650 circuit_breaker,
651 version: camel_api::CANONICAL_CONTRACT_VERSION,
652 };
653 spec.validate_contract()?;
654 Ok(spec)
655 }
656}
657
658pub struct OnExceptionBuilder {
659 parent: RouteBuilder,
660 policy: OnExceptionSpec,
661}
662
663impl OnExceptionBuilder {
664 pub fn retry(mut self, max_attempts: u32) -> Self {
665 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
666 self
667 }
668
669 pub fn with_backoff(
670 mut self,
671 initial: std::time::Duration,
672 multiplier: f64,
673 max: std::time::Duration,
674 ) -> Self {
675 if let Some(ref mut retry) = self.policy.retry {
676 retry.initial_delay = initial;
677 retry.multiplier = multiplier;
678 retry.max_delay = max;
679 }
680 self
681 }
682
683 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
684 if let Some(ref mut retry) = self.policy.retry {
685 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
686 }
687 self
688 }
689
690 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
691 self.policy.handled_by = Some(uri.into());
692 self
693 }
694
695 pub fn end_on_exception(mut self) -> RouteBuilder {
696 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
697 specs.push(self.policy);
698 }
699 self.parent
700 }
701}
702
703fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
704 let mut canonical = Vec::with_capacity(steps.len());
705 for step in steps {
706 canonical.push(canonicalize_step(step)?);
707 }
708 Ok(canonical)
709}
710
711fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
712 match step {
713 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
714 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
715 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
716 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
717 BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
718 delay_ms: config.delay_ms,
719 dynamic_header: config.dynamic_header,
720 }),
721 BuilderStep::DeclarativeScript { expression } => {
722 Ok(CanonicalStepSpec::Script { expression })
723 }
724 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
725 predicate,
726 steps: canonicalize_steps(steps)?,
727 }),
728 BuilderStep::DeclarativeChoice { whens, otherwise } => {
729 let mut canonical_whens = Vec::with_capacity(whens.len());
730 for DeclarativeWhenStep { predicate, steps } in whens {
731 canonical_whens.push(CanonicalWhenSpec {
732 predicate,
733 steps: canonicalize_steps(steps)?,
734 });
735 }
736 let otherwise = match otherwise {
737 Some(steps) => Some(canonicalize_steps(steps)?),
738 None => None,
739 };
740 Ok(CanonicalStepSpec::Choice {
741 whens: canonical_whens,
742 otherwise,
743 })
744 }
745 BuilderStep::DeclarativeSplit {
746 expression,
747 aggregation,
748 parallel,
749 parallel_limit,
750 stop_on_exception,
751 steps,
752 } => Ok(CanonicalStepSpec::Split {
753 expression: CanonicalSplitExpressionSpec::Language(expression),
754 aggregation: canonicalize_split_aggregation(aggregation)?,
755 parallel,
756 parallel_limit,
757 stop_on_exception,
758 steps: canonicalize_steps(steps)?,
759 }),
760 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
761 config: canonicalize_aggregate(config)?,
762 }),
763 other => {
764 let step_name = canonical_step_name(&other);
765 let detail = camel_api::canonical_contract_rejection_reason(step_name)
766 .unwrap_or("not included in canonical v1");
767 Err(CamelError::RouteError(format!(
768 "canonical v1 does not support step `{step_name}`: {detail}"
769 )))
770 }
771 }
772}
773
774fn canonicalize_split_aggregation(
775 strategy: camel_api::splitter::AggregationStrategy,
776) -> Result<CanonicalSplitAggregationSpec, CamelError> {
777 match strategy {
778 camel_api::splitter::AggregationStrategy::LastWins => {
779 Ok(CanonicalSplitAggregationSpec::LastWins)
780 }
781 camel_api::splitter::AggregationStrategy::CollectAll => {
782 Ok(CanonicalSplitAggregationSpec::CollectAll)
783 }
784 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
785 "canonical v1 does not support custom split aggregation".to_string(),
786 )),
787 camel_api::splitter::AggregationStrategy::Original => {
788 Ok(CanonicalSplitAggregationSpec::Original)
789 }
790 }
791}
792
793fn extract_completion_fields(
794 mode: &CompletionMode,
795) -> Result<(Option<usize>, Option<u64>), CamelError> {
796 match mode {
797 CompletionMode::Single(cond) => match cond {
798 CompletionCondition::Size(n) => Ok((Some(*n), None)),
799 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
800 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
801 "canonical v1 does not support aggregate predicate completion".to_string(),
802 )),
803 },
804 CompletionMode::Any(conds) => {
805 let mut size = None;
806 let mut timeout_ms = None;
807 for cond in conds {
808 match cond {
809 CompletionCondition::Size(n) => size = Some(*n),
810 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
811 CompletionCondition::Predicate(_) => {
812 return Err(CamelError::RouteError(
813 "canonical v1 does not support aggregate predicate completion"
814 .to_string(),
815 ));
816 }
817 }
818 }
819 Ok((size, timeout_ms))
820 }
821 }
822}
823
824fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
825 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
826
827 let header = match &config.correlation {
828 CorrelationStrategy::HeaderName(h) => h.clone(),
829 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
830 CorrelationStrategy::Fn(_) => {
831 return Err(CamelError::RouteError(
832 "canonical v1 does not support Fn correlation strategy".to_string(),
833 ));
834 }
835 };
836
837 let correlation_key = match &config.correlation {
838 CorrelationStrategy::HeaderName(_) => None,
839 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
840 CorrelationStrategy::Fn(_) => unreachable!(),
841 };
842
843 let strategy = match config.strategy {
844 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
845 AggregationStrategy::Custom(_) => {
846 return Err(CamelError::RouteError(
847 "canonical v1 does not support custom aggregate strategy".to_string(),
848 ));
849 }
850 };
851 let bucket_ttl_ms = config
852 .bucket_ttl
853 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
854
855 Ok(CanonicalAggregateSpec {
856 header,
857 completion_size,
858 completion_timeout_ms,
859 correlation_key,
860 force_completion_on_stop: if config.force_completion_on_stop {
861 Some(true)
862 } else {
863 None
864 },
865 discard_on_timeout: if config.discard_on_timeout {
866 Some(true)
867 } else {
868 None
869 },
870 strategy,
871 max_buckets: config.max_buckets,
872 bucket_ttl_ms,
873 })
874}
875
876fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
877 CanonicalCircuitBreakerSpec {
878 failure_threshold: config.failure_threshold,
879 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
880 }
881}
882
883fn canonical_step_name(step: &BuilderStep) -> &'static str {
884 match step {
885 BuilderStep::Processor(_) => "processor",
886 BuilderStep::To(_) => "to",
887 BuilderStep::Stop => "stop",
888 BuilderStep::Log { .. } => "log",
889 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
890 BuilderStep::DeclarativeSetBody { .. } => "set_body",
891 BuilderStep::DeclarativeFilter { .. } => "filter",
892 BuilderStep::DeclarativeChoice { .. } => "choice",
893 BuilderStep::DeclarativeScript { .. } => "script",
894 BuilderStep::DeclarativeSplit { .. } => "split",
895 BuilderStep::Split { .. } => "split",
896 BuilderStep::Aggregate { .. } => "aggregate",
897 BuilderStep::Filter { .. } => "filter",
898 BuilderStep::Choice { .. } => "choice",
899 BuilderStep::WireTap { .. } => "wire_tap",
900 BuilderStep::Delay { .. } => "delay",
901 BuilderStep::Multicast { .. } => "multicast",
902 BuilderStep::DeclarativeLog { .. } => "log",
903 BuilderStep::Bean { .. } => "bean",
904 BuilderStep::Script { .. } => "script",
905 BuilderStep::Throttle { .. } => "throttle",
906 BuilderStep::LoadBalance { .. } => "load_balancer",
907 BuilderStep::DynamicRouter { .. } => "dynamic_router",
908 BuilderStep::RoutingSlip { .. } => "routing_slip",
909 BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
910 BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
911 BuilderStep::RecipientList { .. } => "recipient_list",
912 BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
913 }
914}
915
916impl StepAccumulator for RouteBuilder {
917 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
918 &mut self.steps
919 }
920}
921
922pub struct SplitBuilder {
930 parent: RouteBuilder,
931 config: SplitterConfig,
932 steps: Vec<BuilderStep>,
933}
934
935impl SplitBuilder {
936 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
938 where
939 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
940 {
941 FilterInSplitBuilder {
942 parent: self,
943 predicate: std::sync::Arc::new(predicate),
944 steps: vec![],
945 }
946 }
947
948 pub fn end_split(mut self) -> RouteBuilder {
951 let split_step = BuilderStep::Split {
952 config: self.config,
953 steps: self.steps,
954 };
955 self.parent.steps.push(split_step);
956 self.parent
957 }
958}
959
960impl StepAccumulator for SplitBuilder {
961 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
962 &mut self.steps
963 }
964}
965
966pub struct FilterBuilder {
968 parent: RouteBuilder,
969 predicate: FilterPredicate,
970 steps: Vec<BuilderStep>,
971}
972
973impl FilterBuilder {
974 pub fn end_filter(mut self) -> RouteBuilder {
977 let step = BuilderStep::Filter {
978 predicate: self.predicate,
979 steps: self.steps,
980 };
981 self.parent.steps.push(step);
982 self.parent
983 }
984}
985
986impl StepAccumulator for FilterBuilder {
987 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
988 &mut self.steps
989 }
990}
991
992pub struct FilterInSplitBuilder {
994 parent: SplitBuilder,
995 predicate: FilterPredicate,
996 steps: Vec<BuilderStep>,
997}
998
999impl FilterInSplitBuilder {
1000 pub fn end_filter(mut self) -> SplitBuilder {
1002 let step = BuilderStep::Filter {
1003 predicate: self.predicate,
1004 steps: self.steps,
1005 };
1006 self.parent.steps.push(step);
1007 self.parent
1008 }
1009}
1010
1011impl StepAccumulator for FilterInSplitBuilder {
1012 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1013 &mut self.steps
1014 }
1015}
1016
1017pub struct ChoiceBuilder {
1024 parent: RouteBuilder,
1025 whens: Vec<WhenStep>,
1026 _otherwise: Option<Vec<BuilderStep>>,
1027}
1028
1029impl ChoiceBuilder {
1030 pub fn when<F>(self, predicate: F) -> WhenBuilder
1033 where
1034 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1035 {
1036 WhenBuilder {
1037 parent: self,
1038 predicate: std::sync::Arc::new(predicate),
1039 steps: vec![],
1040 }
1041 }
1042
1043 pub fn otherwise(self) -> OtherwiseBuilder {
1047 OtherwiseBuilder {
1048 parent: self,
1049 steps: vec![],
1050 }
1051 }
1052
1053 pub fn end_choice(mut self) -> RouteBuilder {
1057 let step = BuilderStep::Choice {
1058 whens: self.whens,
1059 otherwise: self._otherwise,
1060 };
1061 self.parent.steps.push(step);
1062 self.parent
1063 }
1064}
1065
1066pub struct WhenBuilder {
1068 parent: ChoiceBuilder,
1069 predicate: camel_api::FilterPredicate,
1070 steps: Vec<BuilderStep>,
1071}
1072
1073impl WhenBuilder {
1074 pub fn end_when(mut self) -> ChoiceBuilder {
1077 self.parent.whens.push(WhenStep {
1078 predicate: self.predicate,
1079 steps: self.steps,
1080 });
1081 self.parent
1082 }
1083}
1084
1085impl StepAccumulator for WhenBuilder {
1086 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1087 &mut self.steps
1088 }
1089}
1090
1091pub struct OtherwiseBuilder {
1093 parent: ChoiceBuilder,
1094 steps: Vec<BuilderStep>,
1095}
1096
1097impl OtherwiseBuilder {
1098 pub fn end_otherwise(self) -> ChoiceBuilder {
1100 let OtherwiseBuilder { mut parent, steps } = self;
1101 parent._otherwise = Some(steps);
1102 parent
1103 }
1104}
1105
1106impl StepAccumulator for OtherwiseBuilder {
1107 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1108 &mut self.steps
1109 }
1110}
1111
1112pub struct MulticastBuilder {
1120 parent: RouteBuilder,
1121 steps: Vec<BuilderStep>,
1122 config: MulticastConfig,
1123}
1124
1125impl MulticastBuilder {
1126 pub fn parallel(mut self, parallel: bool) -> Self {
1127 self.config = self.config.parallel(parallel);
1128 self
1129 }
1130
1131 pub fn parallel_limit(mut self, limit: usize) -> Self {
1132 self.config = self.config.parallel_limit(limit);
1133 self
1134 }
1135
1136 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1137 self.config = self.config.stop_on_exception(stop);
1138 self
1139 }
1140
1141 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1142 self.config = self.config.timeout(duration);
1143 self
1144 }
1145
1146 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1147 self.config = self.config.aggregation(strategy);
1148 self
1149 }
1150
1151 pub fn end_multicast(mut self) -> RouteBuilder {
1152 let step = BuilderStep::Multicast {
1153 steps: self.steps,
1154 config: self.config,
1155 };
1156 self.parent.steps.push(step);
1157 self.parent
1158 }
1159}
1160
1161impl StepAccumulator for MulticastBuilder {
1162 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1163 &mut self.steps
1164 }
1165}
1166
1167pub struct ThrottleBuilder {
1175 parent: RouteBuilder,
1176 config: ThrottlerConfig,
1177 steps: Vec<BuilderStep>,
1178}
1179
1180impl ThrottleBuilder {
1181 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1187 self.config = self.config.strategy(strategy);
1188 self
1189 }
1190
1191 pub fn end_throttle(mut self) -> RouteBuilder {
1194 let step = BuilderStep::Throttle {
1195 config: self.config,
1196 steps: self.steps,
1197 };
1198 self.parent.steps.push(step);
1199 self.parent
1200 }
1201}
1202
1203impl StepAccumulator for ThrottleBuilder {
1204 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1205 &mut self.steps
1206 }
1207}
1208
1209pub struct LoadBalancerBuilder {
1217 parent: RouteBuilder,
1218 config: LoadBalancerConfig,
1219 steps: Vec<BuilderStep>,
1220}
1221
1222impl LoadBalancerBuilder {
1223 pub fn round_robin(mut self) -> Self {
1225 self.config = LoadBalancerConfig::round_robin();
1226 self
1227 }
1228
1229 pub fn random(mut self) -> Self {
1231 self.config = LoadBalancerConfig::random();
1232 self
1233 }
1234
1235 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1240 self.config = LoadBalancerConfig::weighted(weights);
1241 self
1242 }
1243
1244 pub fn failover(mut self) -> Self {
1249 self.config = LoadBalancerConfig::failover();
1250 self
1251 }
1252
1253 pub fn parallel(mut self, parallel: bool) -> Self {
1258 self.config = self.config.parallel(parallel);
1259 self
1260 }
1261
1262 pub fn end_load_balance(mut self) -> RouteBuilder {
1265 let step = BuilderStep::LoadBalance {
1266 config: self.config,
1267 steps: self.steps,
1268 };
1269 self.parent.steps.push(step);
1270 self.parent
1271 }
1272}
1273
1274impl StepAccumulator for LoadBalancerBuilder {
1275 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1276 &mut self.steps
1277 }
1278}
1279
1280#[cfg(test)]
1285mod tests {
1286 use super::*;
1287 use camel_api::error_handler::ErrorHandlerConfig;
1288 use camel_api::load_balancer::LoadBalanceStrategy;
1289 use camel_api::{Exchange, Message};
1290 use camel_core::route::BuilderStep;
1291 use std::sync::Arc;
1292 use std::time::Duration;
1293 use tower::{Service, ServiceExt};
1294
1295 #[test]
1296 fn test_builder_from_creates_definition() {
1297 let definition = RouteBuilder::from("timer:tick")
1298 .route_id("test-route")
1299 .build()
1300 .unwrap();
1301 assert_eq!(definition.from_uri(), "timer:tick");
1302 }
1303
1304 #[test]
1305 fn test_builder_empty_from_uri_errors() {
1306 let result = RouteBuilder::from("").route_id("test-route").build();
1307 assert!(result.is_err());
1308 }
1309
1310 #[test]
1311 fn test_builder_to_adds_step() {
1312 let definition = RouteBuilder::from("timer:tick")
1313 .route_id("test-route")
1314 .to("log:info")
1315 .build()
1316 .unwrap();
1317
1318 assert_eq!(definition.from_uri(), "timer:tick");
1319 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1321 }
1322
1323 #[test]
1324 fn test_builder_filter_adds_filter_step() {
1325 let definition = RouteBuilder::from("timer:tick")
1326 .route_id("test-route")
1327 .filter(|_ex| true)
1328 .to("mock:result")
1329 .end_filter()
1330 .build()
1331 .unwrap();
1332
1333 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1334 }
1335
1336 #[test]
1337 fn test_builder_set_header_adds_processor_step() {
1338 let definition = RouteBuilder::from("timer:tick")
1339 .route_id("test-route")
1340 .set_header("key", Value::String("value".into()))
1341 .build()
1342 .unwrap();
1343
1344 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1345 }
1346
1347 #[test]
1348 fn test_builder_map_body_adds_processor_step() {
1349 let definition = RouteBuilder::from("timer:tick")
1350 .route_id("test-route")
1351 .map_body(|body| body)
1352 .build()
1353 .unwrap();
1354
1355 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1356 }
1357
1358 #[test]
1359 fn test_builder_process_adds_processor_step() {
1360 let definition = RouteBuilder::from("timer:tick")
1361 .route_id("test-route")
1362 .process(|ex| async move { Ok(ex) })
1363 .build()
1364 .unwrap();
1365
1366 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1367 }
1368
1369 #[test]
1370 fn test_builder_chain_multiple_steps() {
1371 let definition = RouteBuilder::from("timer:tick")
1372 .route_id("test-route")
1373 .set_header("source", Value::String("timer".into()))
1374 .filter(|ex| ex.input.header("source").is_some())
1375 .to("log:info")
1376 .end_filter()
1377 .to("mock:result")
1378 .build()
1379 .unwrap();
1380
1381 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1385 }
1386
1387 #[tokio::test]
1392 async fn test_set_header_processor_works() {
1393 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1394 let exchange = Exchange::new(Message::new("test"));
1395 let result = svc.call(exchange).await.unwrap();
1396 assert_eq!(
1397 result.input.header("greeting"),
1398 Some(&Value::String("hello".into()))
1399 );
1400 }
1401
1402 #[tokio::test]
1403 async fn test_filter_processor_passes() {
1404 use camel_api::BoxProcessorExt;
1405 use camel_processor::FilterService;
1406
1407 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1408 let mut svc =
1409 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1410 let exchange = Exchange::new(Message::new("pass"));
1411 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1412 assert_eq!(result.input.body.as_text(), Some("pass"));
1413 }
1414
1415 #[tokio::test]
1416 async fn test_filter_processor_blocks() {
1417 use camel_api::BoxProcessorExt;
1418 use camel_processor::FilterService;
1419
1420 let sub = BoxProcessor::from_fn(|_ex| {
1421 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1422 });
1423 let mut svc =
1424 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1425 let exchange = Exchange::new(Message::new("reject"));
1426 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1427 assert_eq!(result.input.body.as_text(), Some("reject"));
1428 }
1429
1430 #[tokio::test]
1431 async fn test_map_body_processor_works() {
1432 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1433 if let Some(text) = body.as_text() {
1434 Body::Text(text.to_uppercase())
1435 } else {
1436 body
1437 }
1438 });
1439 let exchange = Exchange::new(Message::new("hello"));
1440 let result = mapper.oneshot(exchange).await.unwrap();
1441 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1442 }
1443
1444 #[tokio::test]
1445 async fn test_process_custom_processor_works() {
1446 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1447 ex.set_property("custom", Value::Bool(true));
1448 Ok(ex)
1449 });
1450 let exchange = Exchange::new(Message::default());
1451 let result = processor.oneshot(exchange).await.unwrap();
1452 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1453 }
1454
1455 #[tokio::test]
1460 async fn test_compose_pipeline_runs_steps_in_order() {
1461 use camel_core::route::compose_pipeline;
1462
1463 let processors = vec![
1464 BoxProcessor::new(SetHeader::new(
1465 IdentityProcessor,
1466 "step",
1467 Value::String("one".into()),
1468 )),
1469 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1470 if let Some(text) = body.as_text() {
1471 Body::Text(format!("{}-processed", text))
1472 } else {
1473 body
1474 }
1475 })),
1476 ];
1477
1478 let pipeline = compose_pipeline(processors);
1479 let exchange = Exchange::new(Message::new("hello"));
1480 let result = pipeline.oneshot(exchange).await.unwrap();
1481
1482 assert_eq!(
1483 result.input.header("step"),
1484 Some(&Value::String("one".into()))
1485 );
1486 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1487 }
1488
1489 #[tokio::test]
1490 async fn test_compose_pipeline_empty_is_identity() {
1491 use camel_core::route::compose_pipeline;
1492
1493 let pipeline = compose_pipeline(vec![]);
1494 let exchange = Exchange::new(Message::new("unchanged"));
1495 let result = pipeline.oneshot(exchange).await.unwrap();
1496 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1497 }
1498
1499 #[test]
1504 fn test_builder_circuit_breaker_sets_config() {
1505 use camel_api::circuit_breaker::CircuitBreakerConfig;
1506
1507 let config = CircuitBreakerConfig::new().failure_threshold(5);
1508 let definition = RouteBuilder::from("timer:tick")
1509 .route_id("test-route")
1510 .circuit_breaker(config)
1511 .build()
1512 .unwrap();
1513
1514 let cb = definition
1515 .circuit_breaker_config()
1516 .expect("circuit breaker should be set");
1517 assert_eq!(cb.failure_threshold, 5);
1518 }
1519
1520 #[test]
1521 fn test_builder_circuit_breaker_with_error_handler() {
1522 use camel_api::circuit_breaker::CircuitBreakerConfig;
1523 use camel_api::error_handler::ErrorHandlerConfig;
1524
1525 let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1526 let eh_config = ErrorHandlerConfig::log_only();
1527
1528 let definition = RouteBuilder::from("timer:tick")
1529 .route_id("test-route")
1530 .to("log:info")
1531 .circuit_breaker(cb_config)
1532 .error_handler(eh_config)
1533 .build()
1534 .unwrap();
1535
1536 assert!(
1537 definition.circuit_breaker_config().is_some(),
1538 "circuit breaker config should be set"
1539 );
1540 }
1542
1543 #[test]
1544 fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1545 let definition = RouteBuilder::from("direct:start")
1546 .route_id("test-route")
1547 .dead_letter_channel("log:dlc")
1548 .on_exception(|e| matches!(e, CamelError::Io(_)))
1549 .retry(3)
1550 .handled_by("log:io")
1551 .end_on_exception()
1552 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1553 .retry(1)
1554 .end_on_exception()
1555 .to("mock:out")
1556 .build()
1557 .expect("route should build");
1558
1559 let cfg = definition
1560 .error_handler_config()
1561 .expect("error handler should be set");
1562 assert_eq!(cfg.policies.len(), 2);
1563 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1564 assert_eq!(
1565 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1566 Some(3)
1567 );
1568 assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1569 assert_eq!(
1570 cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1571 Some(1)
1572 );
1573 }
1574
1575 #[test]
1576 fn test_builder_on_exception_mixed_mode_rejected() {
1577 let result = RouteBuilder::from("direct:start")
1578 .route_id("test-route")
1579 .error_handler(ErrorHandlerConfig::log_only())
1580 .on_exception(|_e| true)
1581 .end_on_exception()
1582 .to("mock:out")
1583 .build();
1584
1585 let err = result.err().expect("mixed mode should fail with an error");
1586
1587 assert!(
1588 format!("{err}").contains("mixed error handler modes"),
1589 "unexpected error: {err}"
1590 );
1591 }
1592
1593 #[test]
1594 fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1595 let definition = RouteBuilder::from("direct:start")
1596 .route_id("test-route")
1597 .on_exception(|_e| true)
1598 .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1599 .with_jitter(0.5)
1600 .end_on_exception()
1601 .to("mock:out")
1602 .build()
1603 .expect("route should build");
1604
1605 let cfg = definition
1606 .error_handler_config()
1607 .expect("error handler should be set");
1608 assert_eq!(cfg.policies.len(), 1);
1609 assert!(cfg.policies[0].retry.is_none());
1610 }
1611
1612 #[test]
1613 fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1614 let definition = RouteBuilder::from("direct:start")
1615 .route_id("test-route")
1616 .dead_letter_channel("log:dlc")
1617 .to("mock:out")
1618 .build()
1619 .expect("route should build");
1620
1621 let cfg = definition
1622 .error_handler_config()
1623 .expect("error handler should be set");
1624 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1625 assert!(cfg.policies.is_empty());
1626 }
1627
1628 #[test]
1629 fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1630 let definition = RouteBuilder::from("direct:start")
1631 .route_id("test-route")
1632 .dead_letter_channel("log:first")
1633 .on_exception(|e| matches!(e, CamelError::Io(_)))
1634 .retry(2)
1635 .end_on_exception()
1636 .dead_letter_channel("log:second")
1637 .to("mock:out")
1638 .build()
1639 .expect("route should build");
1640
1641 let cfg = definition
1642 .error_handler_config()
1643 .expect("error handler should be set");
1644 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1645 assert_eq!(cfg.policies.len(), 1);
1646 assert_eq!(
1647 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1648 Some(2)
1649 );
1650 }
1651
1652 #[test]
1653 fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1654 let definition = RouteBuilder::from("direct:start")
1655 .route_id("test-route")
1656 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1657 .retry(1)
1658 .end_on_exception()
1659 .to("mock:out")
1660 .build()
1661 .expect("route should build");
1662
1663 let cfg = definition
1664 .error_handler_config()
1665 .expect("error handler should be set");
1666 assert!(cfg.dlc_uri.is_none());
1667 assert_eq!(cfg.policies.len(), 1);
1668 }
1669
1670 #[test]
1671 fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1672 let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1673 let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1674
1675 let definition = RouteBuilder::from("direct:start")
1676 .route_id("test-route")
1677 .error_handler(first)
1678 .error_handler(second)
1679 .to("mock:out")
1680 .build()
1681 .expect("route should build");
1682
1683 let cfg = definition
1684 .error_handler_config()
1685 .expect("error handler should be set");
1686 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1687 }
1688
1689 #[test]
1692 fn test_split_builder_typestate() {
1693 use camel_api::splitter::{SplitterConfig, split_body_lines};
1694
1695 let definition = RouteBuilder::from("timer:test?period=1000")
1697 .route_id("test-route")
1698 .split(SplitterConfig::new(split_body_lines()))
1699 .to("mock:per-fragment")
1700 .end_split()
1701 .to("mock:final")
1702 .build()
1703 .unwrap();
1704
1705 assert_eq!(definition.steps().len(), 2);
1707 }
1708
1709 #[test]
1710 fn test_split_builder_steps_collected() {
1711 use camel_api::splitter::{SplitterConfig, split_body_lines};
1712
1713 let definition = RouteBuilder::from("timer:test?period=1000")
1714 .route_id("test-route")
1715 .split(SplitterConfig::new(split_body_lines()))
1716 .set_header("fragment", Value::String("yes".into()))
1717 .to("mock:per-fragment")
1718 .end_split()
1719 .build()
1720 .unwrap();
1721
1722 assert_eq!(definition.steps().len(), 1);
1724 match &definition.steps()[0] {
1725 BuilderStep::Split { steps, .. } => {
1726 assert_eq!(steps.len(), 2); }
1728 other => panic!("Expected Split, got {:?}", other),
1729 }
1730 }
1731
1732 #[test]
1733 fn test_split_builder_config_propagated() {
1734 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1735
1736 let definition = RouteBuilder::from("timer:test?period=1000")
1737 .route_id("test-route")
1738 .split(
1739 SplitterConfig::new(split_body_lines())
1740 .parallel(true)
1741 .parallel_limit(4)
1742 .aggregation(AggregationStrategy::CollectAll),
1743 )
1744 .to("mock:per-fragment")
1745 .end_split()
1746 .build()
1747 .unwrap();
1748
1749 match &definition.steps()[0] {
1750 BuilderStep::Split { config, .. } => {
1751 assert!(config.parallel);
1752 assert_eq!(config.parallel_limit, Some(4));
1753 assert!(matches!(
1754 config.aggregation,
1755 AggregationStrategy::CollectAll
1756 ));
1757 }
1758 other => panic!("Expected Split, got {:?}", other),
1759 }
1760 }
1761
1762 #[test]
1763 fn test_aggregate_builder_adds_step() {
1764 use camel_api::aggregator::AggregatorConfig;
1765 use camel_core::route::BuilderStep;
1766
1767 let definition = RouteBuilder::from("timer:tick")
1768 .route_id("test-route")
1769 .aggregate(
1770 AggregatorConfig::correlate_by("key")
1771 .complete_when_size(2)
1772 .build(),
1773 )
1774 .build()
1775 .unwrap();
1776
1777 assert_eq!(definition.steps().len(), 1);
1778 assert!(matches!(
1779 definition.steps()[0],
1780 BuilderStep::Aggregate { .. }
1781 ));
1782 }
1783
1784 #[test]
1785 fn test_aggregate_in_split_builder() {
1786 use camel_api::aggregator::AggregatorConfig;
1787 use camel_api::splitter::{SplitterConfig, split_body_lines};
1788 use camel_core::route::BuilderStep;
1789
1790 let definition = RouteBuilder::from("timer:tick")
1791 .route_id("test-route")
1792 .split(SplitterConfig::new(split_body_lines()))
1793 .aggregate(
1794 AggregatorConfig::correlate_by("key")
1795 .complete_when_size(1)
1796 .build(),
1797 )
1798 .end_split()
1799 .build()
1800 .unwrap();
1801
1802 assert_eq!(definition.steps().len(), 1);
1803 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
1804 assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
1805 } else {
1806 panic!("expected Split step");
1807 }
1808 }
1809
1810 #[test]
1813 fn test_builder_set_body_static_adds_processor() {
1814 let definition = RouteBuilder::from("timer:tick")
1815 .route_id("test-route")
1816 .set_body("fixed")
1817 .build()
1818 .unwrap();
1819 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1820 }
1821
1822 #[test]
1823 fn test_builder_set_body_fn_adds_processor() {
1824 let definition = RouteBuilder::from("timer:tick")
1825 .route_id("test-route")
1826 .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
1827 .build()
1828 .unwrap();
1829 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1830 }
1831
1832 #[test]
1833 fn transform_alias_produces_same_as_set_body() {
1834 let route_transform = RouteBuilder::from("timer:tick")
1835 .route_id("test-route")
1836 .transform("hello")
1837 .build()
1838 .unwrap();
1839
1840 let route_set_body = RouteBuilder::from("timer:tick")
1841 .route_id("test-route")
1842 .set_body("hello")
1843 .build()
1844 .unwrap();
1845
1846 assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
1847 }
1848
1849 #[test]
1850 fn test_builder_set_header_fn_adds_processor() {
1851 let definition = RouteBuilder::from("timer:tick")
1852 .route_id("test-route")
1853 .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
1854 .build()
1855 .unwrap();
1856 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1857 }
1858
1859 #[tokio::test]
1860 async fn test_set_body_static_processor_works() {
1861 use camel_core::route::compose_pipeline;
1862 let def = RouteBuilder::from("t:t")
1863 .route_id("test-route")
1864 .set_body("replaced")
1865 .build()
1866 .unwrap();
1867 let pipeline = compose_pipeline(
1868 def.steps()
1869 .iter()
1870 .filter_map(|s| {
1871 if let BuilderStep::Processor(p) = s {
1872 Some(p.clone())
1873 } else {
1874 None
1875 }
1876 })
1877 .collect(),
1878 );
1879 let exchange = Exchange::new(Message::new("original"));
1880 let result = pipeline.oneshot(exchange).await.unwrap();
1881 assert_eq!(result.input.body.as_text(), Some("replaced"));
1882 }
1883
1884 #[tokio::test]
1885 async fn test_set_body_fn_processor_works() {
1886 use camel_core::route::compose_pipeline;
1887 let def = RouteBuilder::from("t:t")
1888 .route_id("test-route")
1889 .set_body_fn(|ex: &Exchange| {
1890 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
1891 })
1892 .build()
1893 .unwrap();
1894 let pipeline = compose_pipeline(
1895 def.steps()
1896 .iter()
1897 .filter_map(|s| {
1898 if let BuilderStep::Processor(p) = s {
1899 Some(p.clone())
1900 } else {
1901 None
1902 }
1903 })
1904 .collect(),
1905 );
1906 let exchange = Exchange::new(Message::new("hello"));
1907 let result = pipeline.oneshot(exchange).await.unwrap();
1908 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1909 }
1910
1911 #[tokio::test]
1912 async fn test_set_header_fn_processor_works() {
1913 use camel_core::route::compose_pipeline;
1914 let def = RouteBuilder::from("t:t")
1915 .route_id("test-route")
1916 .set_header_fn("echo", |ex: &Exchange| {
1917 ex.input
1918 .body
1919 .as_text()
1920 .map(|t| Value::String(t.into()))
1921 .unwrap_or(Value::Null)
1922 })
1923 .build()
1924 .unwrap();
1925 let pipeline = compose_pipeline(
1926 def.steps()
1927 .iter()
1928 .filter_map(|s| {
1929 if let BuilderStep::Processor(p) = s {
1930 Some(p.clone())
1931 } else {
1932 None
1933 }
1934 })
1935 .collect(),
1936 );
1937 let exchange = Exchange::new(Message::new("ping"));
1938 let result = pipeline.oneshot(exchange).await.unwrap();
1939 assert_eq!(
1940 result.input.header("echo"),
1941 Some(&Value::String("ping".into()))
1942 );
1943 }
1944
1945 #[test]
1948 fn test_filter_builder_typestate() {
1949 let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1950 .route_id("test-route")
1951 .filter(|_ex| true)
1952 .to("mock:inner")
1953 .end_filter()
1954 .to("mock:outer")
1955 .build();
1956 assert!(result.is_ok());
1957 }
1958
1959 #[test]
1960 fn test_filter_builder_steps_collected() {
1961 let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1962 .route_id("test-route")
1963 .filter(|_ex| true)
1964 .to("mock:inner")
1965 .end_filter()
1966 .build()
1967 .unwrap();
1968
1969 assert_eq!(definition.steps().len(), 1);
1970 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1971 }
1972
1973 #[test]
1974 fn test_wire_tap_builder_adds_step() {
1975 let definition = RouteBuilder::from("timer:tick")
1976 .route_id("test-route")
1977 .wire_tap("mock:tap")
1978 .to("mock:result")
1979 .build()
1980 .unwrap();
1981
1982 assert_eq!(definition.steps().len(), 2);
1983 assert!(
1984 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
1985 );
1986 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
1987 }
1988
1989 #[test]
1992 fn test_multicast_builder_typestate() {
1993 let definition = RouteBuilder::from("timer:tick")
1994 .route_id("test-route")
1995 .multicast()
1996 .to("direct:a")
1997 .to("direct:b")
1998 .end_multicast()
1999 .to("mock:result")
2000 .build()
2001 .unwrap();
2002
2003 assert_eq!(definition.steps().len(), 2); }
2005
2006 #[test]
2007 fn test_multicast_builder_steps_collected() {
2008 let definition = RouteBuilder::from("timer:tick")
2009 .route_id("test-route")
2010 .multicast()
2011 .to("direct:a")
2012 .to("direct:b")
2013 .end_multicast()
2014 .build()
2015 .unwrap();
2016
2017 match &definition.steps()[0] {
2018 BuilderStep::Multicast { steps, .. } => {
2019 assert_eq!(steps.len(), 2);
2020 }
2021 other => panic!("Expected Multicast, got {:?}", other),
2022 }
2023 }
2024
2025 #[test]
2028 fn test_builder_concurrent_sets_concurrency() {
2029 use camel_component_api::ConcurrencyModel;
2030
2031 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2032 .route_id("test-route")
2033 .concurrent(16)
2034 .to("log:info")
2035 .build()
2036 .unwrap();
2037
2038 assert_eq!(
2039 definition.concurrency_override(),
2040 Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2041 );
2042 }
2043
2044 #[test]
2045 fn test_builder_concurrent_zero_means_unbounded() {
2046 use camel_component_api::ConcurrencyModel;
2047
2048 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2049 .route_id("test-route")
2050 .concurrent(0)
2051 .to("log:info")
2052 .build()
2053 .unwrap();
2054
2055 assert_eq!(
2056 definition.concurrency_override(),
2057 Some(&ConcurrencyModel::Concurrent { max: None })
2058 );
2059 }
2060
2061 #[test]
2062 fn test_builder_sequential_sets_concurrency() {
2063 use camel_component_api::ConcurrencyModel;
2064
2065 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2066 .route_id("test-route")
2067 .sequential()
2068 .to("log:info")
2069 .build()
2070 .unwrap();
2071
2072 assert_eq!(
2073 definition.concurrency_override(),
2074 Some(&ConcurrencyModel::Sequential)
2075 );
2076 }
2077
2078 #[test]
2079 fn test_builder_default_concurrency_is_none() {
2080 let definition = RouteBuilder::from("timer:tick")
2081 .route_id("test-route")
2082 .to("log:info")
2083 .build()
2084 .unwrap();
2085
2086 assert_eq!(definition.concurrency_override(), None);
2087 }
2088
2089 #[test]
2092 fn test_builder_route_id_sets_id() {
2093 let definition = RouteBuilder::from("timer:tick")
2094 .route_id("my-route")
2095 .build()
2096 .unwrap();
2097
2098 assert_eq!(definition.route_id(), "my-route");
2099 }
2100
2101 #[test]
2102 fn test_build_without_route_id_fails() {
2103 let result = RouteBuilder::from("timer:tick?period=1000")
2104 .to("log:info")
2105 .build();
2106 let err = match result {
2107 Err(e) => e.to_string(),
2108 Ok(_) => panic!("build() should fail without route_id"),
2109 };
2110 assert!(
2111 err.contains("route_id"),
2112 "error should mention route_id, got: {}",
2113 err
2114 );
2115 }
2116
2117 #[test]
2118 fn test_builder_auto_startup_false() {
2119 let definition = RouteBuilder::from("timer:tick")
2120 .route_id("test-route")
2121 .auto_startup(false)
2122 .build()
2123 .unwrap();
2124
2125 assert!(!definition.auto_startup());
2126 }
2127
2128 #[test]
2129 fn test_builder_startup_order_custom() {
2130 let definition = RouteBuilder::from("timer:tick")
2131 .route_id("test-route")
2132 .startup_order(50)
2133 .build()
2134 .unwrap();
2135
2136 assert_eq!(definition.startup_order(), 50);
2137 }
2138
2139 #[test]
2140 fn test_builder_defaults() {
2141 let definition = RouteBuilder::from("timer:tick")
2142 .route_id("test-route")
2143 .build()
2144 .unwrap();
2145
2146 assert_eq!(definition.route_id(), "test-route");
2147 assert!(definition.auto_startup());
2148 assert_eq!(definition.startup_order(), 1000);
2149 }
2150
2151 #[test]
2154 fn test_choice_builder_single_when() {
2155 let definition = RouteBuilder::from("timer:tick")
2156 .route_id("test-route")
2157 .choice()
2158 .when(|ex: &Exchange| ex.input.header("type").is_some())
2159 .to("mock:typed")
2160 .end_when()
2161 .end_choice()
2162 .build()
2163 .unwrap();
2164 assert_eq!(definition.steps().len(), 1);
2165 assert!(
2166 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2167 if whens.len() == 1 && otherwise.is_none())
2168 );
2169 }
2170
2171 #[test]
2172 fn test_choice_builder_when_otherwise() {
2173 let definition = RouteBuilder::from("timer:tick")
2174 .route_id("test-route")
2175 .choice()
2176 .when(|ex: &Exchange| ex.input.header("a").is_some())
2177 .to("mock:a")
2178 .end_when()
2179 .otherwise()
2180 .to("mock:fallback")
2181 .end_otherwise()
2182 .end_choice()
2183 .build()
2184 .unwrap();
2185 assert!(
2186 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2187 if whens.len() == 1 && otherwise.is_some())
2188 );
2189 }
2190
2191 #[test]
2192 fn test_choice_builder_multiple_whens() {
2193 let definition = RouteBuilder::from("timer:tick")
2194 .route_id("test-route")
2195 .choice()
2196 .when(|ex: &Exchange| ex.input.header("a").is_some())
2197 .to("mock:a")
2198 .end_when()
2199 .when(|ex: &Exchange| ex.input.header("b").is_some())
2200 .to("mock:b")
2201 .end_when()
2202 .end_choice()
2203 .build()
2204 .unwrap();
2205 assert!(
2206 matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2207 if whens.len() == 2)
2208 );
2209 }
2210
2211 #[test]
2212 fn test_choice_step_after_choice() {
2213 let definition = RouteBuilder::from("timer:tick")
2215 .route_id("test-route")
2216 .choice()
2217 .when(|_ex: &Exchange| true)
2218 .to("mock:inner")
2219 .end_when()
2220 .end_choice()
2221 .to("mock:outer") .build()
2223 .unwrap();
2224 assert_eq!(definition.steps().len(), 2);
2225 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2226 }
2227
2228 #[test]
2231 fn test_throttle_builder_typestate() {
2232 let definition = RouteBuilder::from("timer:tick")
2233 .route_id("test-route")
2234 .throttle(10, std::time::Duration::from_secs(1))
2235 .to("mock:result")
2236 .end_throttle()
2237 .build()
2238 .unwrap();
2239
2240 assert_eq!(definition.steps().len(), 1);
2241 assert!(matches!(
2242 &definition.steps()[0],
2243 BuilderStep::Throttle { .. }
2244 ));
2245 }
2246
2247 #[test]
2248 fn test_throttle_builder_with_strategy() {
2249 let definition = RouteBuilder::from("timer:tick")
2250 .route_id("test-route")
2251 .throttle(10, std::time::Duration::from_secs(1))
2252 .strategy(ThrottleStrategy::Reject)
2253 .to("mock:result")
2254 .end_throttle()
2255 .build()
2256 .unwrap();
2257
2258 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2259 assert_eq!(config.strategy, ThrottleStrategy::Reject);
2260 } else {
2261 panic!("Expected Throttle step");
2262 }
2263 }
2264
2265 #[test]
2266 fn test_throttle_builder_steps_collected() {
2267 let definition = RouteBuilder::from("timer:tick")
2268 .route_id("test-route")
2269 .throttle(5, std::time::Duration::from_secs(1))
2270 .set_header("throttled", Value::Bool(true))
2271 .to("mock:throttled")
2272 .end_throttle()
2273 .build()
2274 .unwrap();
2275
2276 match &definition.steps()[0] {
2277 BuilderStep::Throttle { steps, .. } => {
2278 assert_eq!(steps.len(), 2); }
2280 other => panic!("Expected Throttle, got {:?}", other),
2281 }
2282 }
2283
2284 #[test]
2285 fn test_throttle_step_after_throttle() {
2286 let definition = RouteBuilder::from("timer:tick")
2288 .route_id("test-route")
2289 .throttle(10, std::time::Duration::from_secs(1))
2290 .to("mock:inner")
2291 .end_throttle()
2292 .to("mock:outer")
2293 .build()
2294 .unwrap();
2295
2296 assert_eq!(definition.steps().len(), 2);
2297 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2298 }
2299
2300 #[test]
2303 fn test_load_balance_builder_typestate() {
2304 let definition = RouteBuilder::from("timer:tick")
2305 .route_id("test-route")
2306 .load_balance()
2307 .round_robin()
2308 .to("mock:a")
2309 .to("mock:b")
2310 .end_load_balance()
2311 .build()
2312 .unwrap();
2313
2314 assert_eq!(definition.steps().len(), 1);
2315 assert!(matches!(
2316 &definition.steps()[0],
2317 BuilderStep::LoadBalance { .. }
2318 ));
2319 }
2320
2321 #[test]
2322 fn test_load_balance_builder_with_strategy() {
2323 let definition = RouteBuilder::from("timer:tick")
2324 .route_id("test-route")
2325 .load_balance()
2326 .random()
2327 .to("mock:result")
2328 .end_load_balance()
2329 .build()
2330 .unwrap();
2331
2332 if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2333 assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2334 } else {
2335 panic!("Expected LoadBalance step");
2336 }
2337 }
2338
2339 #[test]
2340 fn test_load_balance_builder_steps_collected() {
2341 let definition = RouteBuilder::from("timer:tick")
2342 .route_id("test-route")
2343 .load_balance()
2344 .set_header("lb", Value::Bool(true))
2345 .to("mock:a")
2346 .end_load_balance()
2347 .build()
2348 .unwrap();
2349
2350 match &definition.steps()[0] {
2351 BuilderStep::LoadBalance { steps, .. } => {
2352 assert_eq!(steps.len(), 2); }
2354 other => panic!("Expected LoadBalance, got {:?}", other),
2355 }
2356 }
2357
2358 #[test]
2359 fn test_load_balance_step_after_load_balance() {
2360 let definition = RouteBuilder::from("timer:tick")
2362 .route_id("test-route")
2363 .load_balance()
2364 .to("mock:inner")
2365 .end_load_balance()
2366 .to("mock:outer")
2367 .build()
2368 .unwrap();
2369
2370 assert_eq!(definition.steps().len(), 2);
2371 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2372 }
2373
2374 #[test]
2377 fn test_dynamic_router_builder() {
2378 let definition = RouteBuilder::from("timer:tick")
2379 .route_id("test-route")
2380 .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2381 .build()
2382 .unwrap();
2383
2384 assert_eq!(definition.steps().len(), 1);
2385 assert!(matches!(
2386 &definition.steps()[0],
2387 BuilderStep::DynamicRouter { .. }
2388 ));
2389 }
2390
2391 #[test]
2392 fn test_dynamic_router_builder_with_config() {
2393 let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2394 .max_iterations(100)
2395 .cache_size(500);
2396
2397 let definition = RouteBuilder::from("timer:tick")
2398 .route_id("test-route")
2399 .dynamic_router_with_config(config)
2400 .build()
2401 .unwrap();
2402
2403 assert_eq!(definition.steps().len(), 1);
2404 if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2405 assert_eq!(config.max_iterations, 100);
2406 assert_eq!(config.cache_size, 500);
2407 } else {
2408 panic!("Expected DynamicRouter step");
2409 }
2410 }
2411
2412 #[test]
2413 fn test_dynamic_router_step_after_router() {
2414 let definition = RouteBuilder::from("timer:tick")
2416 .route_id("test-route")
2417 .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2418 .to("mock:outer")
2419 .build()
2420 .unwrap();
2421
2422 assert_eq!(definition.steps().len(), 2);
2423 assert!(matches!(
2424 &definition.steps()[0],
2425 BuilderStep::DynamicRouter { .. }
2426 ));
2427 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2428 }
2429
2430 #[test]
2431 fn routing_slip_builder_creates_step() {
2432 use camel_api::RoutingSlipExpression;
2433
2434 let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2435
2436 let route = RouteBuilder::from("direct:start")
2437 .route_id("routing-slip-test")
2438 .routing_slip(expression)
2439 .build()
2440 .unwrap();
2441
2442 assert!(
2443 matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2444 "Expected RoutingSlip step"
2445 );
2446 }
2447
2448 #[test]
2449 fn routing_slip_with_config_builder_creates_step() {
2450 use camel_api::RoutingSlipConfig;
2451
2452 let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2453 .uri_delimiter("|")
2454 .cache_size(50)
2455 .ignore_invalid_endpoints(true);
2456
2457 let route = RouteBuilder::from("direct:start")
2458 .route_id("routing-slip-config-test")
2459 .routing_slip_with_config(config)
2460 .build()
2461 .unwrap();
2462
2463 if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2464 assert_eq!(config.uri_delimiter, "|");
2465 assert_eq!(config.cache_size, 50);
2466 assert!(config.ignore_invalid_endpoints);
2467 } else {
2468 panic!("Expected RoutingSlip step");
2469 }
2470 }
2471
2472 #[test]
2473 fn test_builder_marshal_adds_processor_step() {
2474 let definition = RouteBuilder::from("timer:tick")
2475 .route_id("test-route")
2476 .marshal("json")
2477 .build()
2478 .unwrap();
2479 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2480 }
2481
2482 #[test]
2483 fn test_builder_unmarshal_adds_processor_step() {
2484 let definition = RouteBuilder::from("timer:tick")
2485 .route_id("test-route")
2486 .unmarshal("json")
2487 .build()
2488 .unwrap();
2489 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2490 }
2491
2492 #[test]
2493 #[should_panic(expected = "unknown data format: 'csv'")]
2494 fn test_builder_marshal_panics_on_unknown_format() {
2495 let _ = RouteBuilder::from("timer:tick")
2496 .route_id("test-route")
2497 .marshal("csv")
2498 .build();
2499 }
2500
2501 #[test]
2502 #[should_panic(expected = "unknown data format: 'csv'")]
2503 fn test_builder_unmarshal_panics_on_unknown_format() {
2504 let _ = RouteBuilder::from("timer:tick")
2505 .route_id("test-route")
2506 .unmarshal("csv")
2507 .build();
2508 }
2509}